1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator
# Python dependencies
import sys
import os
from datetime import timedelta
from datetime import datetime
# Project-local dependencies
from utils import clean_functions, kpi_funcitons, agregate_functions
from utils.settings import RAW_CONTAINER, CLEAN_CONTAINER, VALIDATED_CONTAINER, FECHA_ACTUAL
# Default arguments handed to the DAG constructor (owner and first start date)
default_args = dict(
    owner='pedro',
    start_date=datetime(year=2023, month=4, day=22),  # midnight, naive datetime
)
# Instantiate the DAG object.
# NOTE(review): the original remark on the schedule read "local time 8:30 and
# 20:30", but the cron expression below fires at minute 45 (listed twice) of
# hours 12 and 0 -- confirm which of the two is the intended schedule.
sierra_gorda_agregate_dag = DAG(
    dag_id='SierraGorda_AgregacionStorage',
    description='Functions that agregate data in files related to SierraGorda',
    schedule_interval='45,45 12,0 * * *',
    default_args=default_args,
    is_paused_upon_creation=True,  # the DAG starts paused when first created
    catchup=False,
)
### Flanders HHD file
# Step 1: calls agregate_functions.generate_bit_number_identifier with the
# clean container as origin and the validated container as destination, both
# pointing at the Flanders-HHD.json blob under the FECHA_ACTUAL folder.
flanders_hhd_task1 = PythonOperator(
    dag=sierra_gorda_agregate_dag,
    task_id='FlandersHHD_GenerateBitNumber',
    python_callable=agregate_functions.generate_bit_number_identifier,
    op_kwargs=dict(
        origin_container=CLEAN_CONTAINER,
        origin_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        final_container=VALIDATED_CONTAINER,
        final_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        field="BitNumber",
    ),
)
# Step 2: calls agregate_functions.generate_bit_width; origin and destination
# are the same Flanders-HHD.json blob in the validated container.
flanders_hhd_task2 = PythonOperator(
    dag=sierra_gorda_agregate_dag,
    task_id='FlandersHHD_GenerateBitWidth',
    python_callable=agregate_functions.generate_bit_width,
    op_kwargs=dict(
        origin_container=VALIDATED_CONTAINER,
        origin_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        final_container=VALIDATED_CONTAINER,
        final_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        field="BitNumber",
    ),
)
# Step 3: calls agregate_functions.get_drill_time. The "detalle" (detail)
# input is the Flanders-MWD.json blob in the raw container, the "resumen"
# (summary) input is the Flanders-HHD.json blob in the validated container,
# and the destination is that same validated Flanders-HHD.json blob.
flanders_hhd_task3 = PythonOperator(
    dag=sierra_gorda_agregate_dag,
    task_id='FlandersHHD_CalculateDrillTime',
    python_callable=agregate_functions.get_drill_time,
    op_kwargs=dict(
        origin_detalle_container=RAW_CONTAINER,
        origin_detalle_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-MWD.json",
        origin_resumen_container=VALIDATED_CONTAINER,
        origin_resumen_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        final_container=VALIDATED_CONTAINER,
        final_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        date_field="DateTime",
        list_key=["Drill_Number", "OperatorID", "HoleNumber"],
    ),
)
# Step 4: calls agregate_functions.transform_from_utm_to_latlon on the
# validated Flanders-HHD.json blob (same blob as origin and destination),
# naming the easting and northing fields to use.
flanders_hhd_task4 = PythonOperator(
    dag=sierra_gorda_agregate_dag,
    task_id='FlandersHHD_TransformFromUTM_to_LatLon',
    python_callable=agregate_functions.transform_from_utm_to_latlon,
    op_kwargs=dict(
        origin_container=VALIDATED_CONTAINER,
        origin_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        final_container=VALIDATED_CONTAINER,
        final_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        easting_field="ActualCollarEasting",
        northing_field="ActualCollarNorthing",
    ),
)
# Step 5: calls agregate_functions.transform_from_lat_long_to_miner on the
# validated Flanders-HHD.json blob (same blob as origin and destination),
# naming the latitude and longitude fields to use.
flanders_hhd_task5 = PythonOperator(
    dag=sierra_gorda_agregate_dag,
    task_id='FlandersHHD_TransformFrom_LatLong_to_Miner',
    python_callable=agregate_functions.transform_from_lat_long_to_miner,
    op_kwargs=dict(
        origin_container=VALIDATED_CONTAINER,
        origin_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        final_container=VALIDATED_CONTAINER,
        final_blob=f"SierraGorda/{FECHA_ACTUAL}/Flanders-HHD.json",
        latitud="lat",
        longitud="lon",
    ),
)
# Define the task sequence: each task is set downstream of the previous one,
# giving the linear order task1 -> task2 -> task3 -> task4 -> task5.
flanders_hhd_task1.set_downstream(flanders_hhd_task2)
flanders_hhd_task2.set_downstream(flanders_hhd_task3)
flanders_hhd_task3.set_downstream(flanders_hhd_task4)
flanders_hhd_task4.set_downstream(flanders_hhd_task5)
|