DAG: Collahuasi_LimpiezaStorage Functions that clean data in files related to Collahuasi

schedule: 30 17 * * *


Collahuasi_LimpiezaStorage

Toggle wrap
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Python dependencies
import sys
import os

from datetime import timedelta
from datetime import datetime 
from datetime import timedelta

# Dependencias propias
from utils import clean_functions, kpi_funcitons
from utils.settings import RAW_CONTAINER, CLEAN_CONTAINER, FECHA_ACTUAL, YEAR, MONTH, DAY

# Default arguments
default_args = {
    'owner': 'pedro',
    'start_date': datetime(2023, 4, 22, 0, 0, 0)
}

# Instanciate the DAG object
collahuasi_clean_dag = DAG(
    'Collahuasi_LimpiezaStorage',
    default_args=default_args,
    description='Functions that clean data in files related to Collahuasi',
    schedule_interval= '30 17 * * *',# Hora local 17:30
    catchup=False,
    is_paused_upon_creation=True
)

drill_task1 = PythonOperator(
    task_id = 'Drill_FlagEmptyValues',
    python_callable = clean_functions.flag_empty_value,
    op_kwargs = {'origin_container':RAW_CONTAINER, 
                'origin_blob': f"Collahuasi/{FECHA_ACTUAL}/data_drill.json",
                'final_container': CLEAN_CONTAINER,
                'final_blob': f"Collahuasi/Drilling/Provision/{FECHA_ACTUAL}/data_drill.json",
                'final_blob_dl': f"Collahuasi/Drilling/Provision/{YEAR}/{MONTH}/{DAY}/data_drill.json",
                'field_list': ['fechaInicioPozo', 'fechaFinPozo', 'nombreEquipo', 'Malla',
                                'Pozo', 'PosicionXPlanificada', 'PosicionYPlanificada',
                                'PosicionZPlanificada', 'posicionXPerforado', 'posicionYPerforado',
                                'posicionZPerforado', 'Profundidad', 'PenetrationRate',
                                'RPM', 'BitAirPressure', 'PullDown', 'Torque']
                },
    dag = collahuasi_clean_dag,
)

drill_task2 = PythonOperator(
    task_id = 'Drill_ComunFieldFlag',
    python_callable = clean_functions.comun_field_flag,
    op_kwargs = {'origin_container':CLEAN_CONTAINER, 
                'origin_blob': f"Collahuasi/Drilling/Provision/{FECHA_ACTUAL}/data_drill.json",
                'final_container': CLEAN_CONTAINER,
                'final_blob': f"Collahuasi/Drilling/Provision/{FECHA_ACTUAL}/data_drill.json",
                'final_blob_dl': f"Collahuasi/Drilling/Provision/{YEAR}/{MONTH}/{DAY}/data_drill.json",
                'flag_type': "I",
                'limit': 0,
                'fields': ['PosicionXPlanificada', 'PosicionYPlanificada',
                'PosicionZPlanificada', 'posicionXPerforado', 'posicionYPerforado',
                'posicionZPerforado', 'Profundidad', 'PenetrationRate',
                'RPM', 'BitAirPressure', 'PullDown', 'Torque']
                },
    dag = collahuasi_clean_dag,
)

# Define task secuence 
drill_task1 >> drill_task2