"""Airflow DAG definition for the Collahuasi BrightBoard pipeline.

Declares the ``Collahuasi_BrightBoard`` DAG with a single PythonOperator task
that calls ``desvf.generar_archivo_perf_procesado_collahuasi``, passing it the
origin container/blob and the destination container/blob as keyword arguments.
"""

# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Python dependencies
import sys
import os
from datetime import datetime, timedelta

# Project-local dependencies
from utils import desviation_function as desvf
from utils.settings import RAW_CONTAINER, CLEAN_CONTAINER, PROCESS_CONTAINER, FECHA_ACTUAL

# Default arguments applied to every task of the DAG
default_args = {
    'owner': 'Carlos',
    'start_date': datetime(2023, 11, 6, 0, 0, 0),
}

# Instantiate the DAG object
# TODO: rename the DAG id below to camel case.
collahuasi_brightboard_dag = DAG(
    'Collahuasi_BrightBoard',
    default_args=default_args,
    description='Functions that clean data in files related to Collahuasi',
    # NOTE(review): the original comment claimed "local time 20:30", which does
    # not match this cron expression (minute 0, hour 15) -- confirm the
    # intended schedule and the timezone it is evaluated in.
    schedule_interval='0 15 * * *',
    catchup=False,
    is_paused_upon_creation=False,
)

# Cross-reference tasks between drilling ("Perforaciones") and Opit data.
# NOTE(review): the task_id reads 'BrighBoard' (missing 't'); it is kept
# byte-identical here because the id is a runtime identifier -- confirm before
# renaming.
brightboard_perf_opit_task1 = PythonOperator(
    task_id='BrighBoard_GenerarArchivoPerf',
    python_callable=desvf.generar_archivo_perf_procesado_collahuasi,
    op_kwargs={
        'origin_perf_file_container': RAW_CONTAINER,
        'origin_perf_file_blob': f"Collahuasi/{FECHA_ACTUAL}/data_drill.json",
        'final_container': PROCESS_CONTAINER,
        'final_blob': f"Collahuasi/BrightBoard/Drills/{FECHA_ACTUAL}/data_drill.json",
    },
    dag=collahuasi_brightboard_dag,
)

# Define task sequence (a single task, so there are no dependencies to chain)
brightboard_perf_opit_task1