# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator
# Python dependencies
import sys
import os
from datetime import timedelta
from datetime import datetime
from datetime import timedelta
# Project-local dependencies
from utils.settings import RAW_CONTAINER, METRICS_CONTAINER, YEAR, MONTH, DAY
from utils.metrics import calculate_estimated_bench_metric
# Default arguments handed to the DAG below through its `default_args`
# parameter.
default_args = dict(
    owner='Pedro',
    # Naive datetime (no tzinfo); the omitted time parts default to midnight.
    start_date=datetime(2024, 4, 26),
)
# Instantiate the DAG object.
# TODO: rename this variable to camel case (translated from the original note).
#
# Schedule: the cron expression '30 12 * * 1' fires at 12:30 on day-of-week 1,
# i.e. every Monday.
# NOTE(review): the previous inline comment said Friday ("Viernes"), and the
# start_date in default_args (2024-04-26) is a Friday. Confirm the intended
# day; a Friday schedule would be '30 12 * * 5'.
sg_metric_dag = DAG(
    dag_id='SG_Metrics',
    description='Functions that calculated metrics related to models in Sierra Gorda',
    default_args=default_args,
    schedule_interval='30 12 * * 1',
    # Do not backfill runs for intervals missed since start_date.
    catchup=False,
    # The DAG is active (unpaused) as soon as it is first registered.
    is_paused_upon_creation=False,
)
"""
Tareas de cruce entre Perforaciones y Opit
"""
# Task: call calculate_estimated_bench_metric (from utils.metrics) with the
# keyword arguments listed in op_kwargs, as part of sg_metric_dag.
metrics_sg_task1 = PythonOperator(
    task_id='Metrics_GenerateScore_BenchEstimation',
    python_callable=calculate_estimated_bench_metric,
    op_kwargs={
        "app_points_container": RAW_CONTAINER,
        # Plain string: nothing is interpolated here, so no f-prefix is needed.
        "app_points_blob": "SierraGorda/dxf/puntos_app_2024_01.json",
        # NOTE(review): the value deliberately keeps its embedded single
        # quotes ("'SG'"); presumably it is spliced into a query downstream.
        # Confirm against calculate_estimated_bench_metric.
        "mine_opit": "'SG'",
        "mine_dxf_polygon": "SierraGorda",
        "classifier_model_path": "utils/models/SG/clasificador.pkl",
        "final_container": METRICS_CONTAINER,
        # YEAR / MONTH / DAY are imported from utils.settings and are
        # interpolated when this module is loaded, not when the task runs.
        "final_blob": f"SierraGorda/Estimate bench/{YEAR}/{MONTH}/{DAY}/BenchEstimation_v1.json",
    },
    dag=sg_metric_dag,
)
# Define the task sequence. There is a single task, so the bare reference
# below declares no upstream/downstream dependency (no >> or << is used).
metrics_sg_task1