# Airflow dependencies
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Python dependencies
import sys
import os
from datetime import datetime
from utils import estimation_sg_functions

# Default arguments
default_args = {
    'owner': 'pedro',
    'start_date': datetime(2023, 8, 31, 0, 0, 0)
}

# Instantiate the DAG object
sierragorda_estimate_dag = DAG(
    'SierraGorda_EstimateHardnessLevelDag',
    default_args=default_args,
    description='Functions that obtain data from the source related to Sierra Gorda',
    # Original note: local time 9:30 and 21:30.
    # This cron expression fires once a day, at 14:30 in the scheduler's timezone.
    schedule_interval='30 14 * * *',
    catchup=False,
    is_paused_upon_creation=False
)

"""
Drilling data ingestion
"""
# Step 1: clean the daily drilling data
estimate_task1 = PythonOperator(
    task_id='clean_drilling_data',
    python_callable=estimation_sg_functions.clean_daily_sg_data,
    dag=sierragorda_estimate_dag,
)

# Step 2: compute metrics from the cleaned data
estimate_task2 = PythonOperator(
    task_id='generate_metrics_from_data',
    python_callable=estimation_sg_functions.generate_metrics,
    dag=sierragorda_estimate_dag,
)

# Step 3: derive hardness-level labels from the metrics
estimate_task3 = PythonOperator(
    task_id='generate_hardness_levels',
    python_callable=estimation_sg_functions.generate_labels,
    dag=sierragorda_estimate_dag,
)

# Run the three steps strictly in sequence
estimate_task1 >> estimate_task2 >> estimate_task3