DAG: SierraGorda_EstimateHardnessLevelDag — Functions that obtain data from the source related to Sierra Gorda

schedule: 30 14 * * *


SierraGorda_EstimateHardnessLevelDag

Toggle wrap
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Python dependencies
import sys
import os
from datetime import datetime 
from utils import estimation_sg_functions

# Default arguments, passed to the DAG below; Airflow applies them as
# defaults to every task constructed with that DAG.
# NOTE(review): start_date is a naive datetime (no tzinfo). Airflow
# interprets naive datetimes in its configured default_timezone (UTC unless
# overridden) — confirm that is the intended reference timezone.
default_args = dict(
    owner='pedro',
    start_date=datetime(2023, 8, 31),
)

# Instantiate the DAG object.
# catchup=False: the scheduler does not backfill runs for past intervals
# since start_date. is_paused_upon_creation=False: the DAG is registered
# un-paused, so it is scheduled as soon as the scheduler parses this file.
sierragorda_estimate_dag = DAG(
    'SierraGorda_EstimateHardnessLevelDag',
    default_args=default_args,
    # NOTE(review): this description mentions obtaining source data, but the
    # tasks attached to this DAG clean data, generate metrics and generate
    # hardness levels — consider updating it (it is shown in the Airflow UI).
    description='Functions that obtain data from the source related to Sierra Gorda',
    # Cron "30 14 * * *" fires ONCE per day, at 14:30 in the DAG's timezone
    # (taken from start_date, or Airflow's default_timezone when start_date
    # is naive).
    # NOTE(review): the previous comment said local time 9:30 and 21:30,
    # i.e. two runs per day, which this expression does not produce —
    # confirm the intended schedule.
    schedule_interval='30 14 * * *',
    catchup=False,
    is_paused_upon_creation=False,
)

"""
    Drilling data ingestion
"""
estimate_task1 = PythonOperator(
    task_id = 'clean_drilling_data',
    python_callable = estimation_sg_functions.clean_daily_sg_data,
    dag = sierragorda_estimate_dag,
)

estimate_task2 = PythonOperator(
    task_id = 'generate_metrics_from_data',
    python_callable = estimation_sg_functions.generate_metrics,
    dag = sierragorda_estimate_dag,
)

estimate_task3 = PythonOperator(
    task_id = 'generate_hardness_levels',
    python_callable = estimation_sg_functions.generate_labels,
    dag = sierragorda_estimate_dag,
)

estimate_task1 >> estimate_task2 >> estimate_task3