# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator
# Python dependencies
import sys
import os
from datetime import timedelta
from datetime import datetime
from datetime import timedelta
# Project-local dependencies
from utils import agregate_functions
from utils.settings import CLEAN_CONTAINER, FECHA_ACTUAL, YEAR, MONTH, DAY, PROCESS_CONTAINER
# Default arguments handed to the DAG below via its `default_args` parameter:
# the owner label and the fixed start date (midnight, 22 April 2023; naive
# datetime, no tzinfo attached).
default_args = dict(
    owner='pedro',
    start_date=datetime(2023, 4, 22),
)
# Instantiate the DAG object.
# The cron expression '45 17 * * *' fires once a day at minute 45 of hour 17.
# NOTE(review): the original inline comment read "local time 17:30", which does
# not match the 17:45 in the expression, and no timezone is configured in this
# file -- confirm the intended trigger time.
collahuasi_aggregate_dag = DAG(
    dag_id='Collahuasi_Aggregation',
    description='Functions that add fields in files related to Collahuasi',
    default_args=default_args,
    schedule_interval='45 17 * * *',
    # Runs for past intervals are not backfilled, and the DAG is registered
    # in the paused state when it is first created.
    catchup=False,
    is_paused_upon_creation=True,
)
"""
Drilling file
"""
# First drilling task: invokes agregate_functions.get_fields_from_mallaid with
# the keyword arguments below. It reads data_drill.json from the clean
# container and is given two destination blob names in the process container:
# one keyed by FECHA_ACTUAL and one keyed by YEAR/MONTH/DAY.
# What the helper does with these arguments is not visible in this file.
# NOTE(review): the f-strings are rendered when this module is executed (DAG
# parse time), not when the task runs -- confirm that the values imported from
# utils.settings are meant to be used that way.
drill_task1 = PythonOperator(
    dag=collahuasi_aggregate_dag,
    task_id='Drill_AddFields_MallaName_Bench_Phase',
    python_callable=agregate_functions.get_fields_from_mallaid,
    op_kwargs=dict(
        origin_container=CLEAN_CONTAINER,
        origin_blob=f"Collahuasi/Drilling/Provision/{FECHA_ACTUAL}/data_drill.json",
        final_container=PROCESS_CONTAINER,
        malla_field='Malla',
        final_blob=f"Collahuasi/Get blast detail/{FECHA_ACTUAL}/blast_detail.json",
        final_blob_dl=f"Collahuasi/Get blast detail/{YEAR}/{MONTH}/{DAY}/blast_detail.json",
    ),
)
# Second drilling task: invokes agregate_functions.get_drilling_time with the
# keyword arguments below. It reads the same data_drill.json blob as the first
# drilling task (not that task's output) and is given two destination blob
# names under "Drilling time" in the process container.
# `hole_id` lists the columns passed to the helper as the hole identifier;
# `start_timestamp` / `end_timestamp` name the start and end columns.
# NOTE(review): the task_id 'Drill_ComunFieldFlag' does not reflect the
# callable's name (get_drilling_time); it is left unchanged here -- confirm
# whether it should be renamed.
drill_task2 = PythonOperator(
    dag=collahuasi_aggregate_dag,
    task_id='Drill_ComunFieldFlag',
    python_callable=agregate_functions.get_drilling_time,
    op_kwargs=dict(
        origin_container=CLEAN_CONTAINER,
        origin_blob=f"Collahuasi/Drilling/Provision/{FECHA_ACTUAL}/data_drill.json",
        final_container=PROCESS_CONTAINER,
        hole_id=[
            'Malla',
            'Pozo',
            'nombreEquipo',
            'PosicionXPlanificada',
            'PosicionYPlanificada',
            'PosicionZPlanificada',
            'fechaInicioPozo',
        ],
        start_timestamp='fechaInicioPozo',
        end_timestamp='fechaFinPozo',
        final_blob=f"Collahuasi/Drilling time/{FECHA_ACTUAL}/drilling_time.json",
        final_blob_dl=f"Collahuasi/Drilling time/{YEAR}/{MONTH}/{DAY}/drilling_time.json",
    ),
)
# Define the task sequence: drill_task1 is upstream of drill_task2.
drill_task1.set_downstream(drill_task2)