1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator
# Python dependencies
import sys
import os
from datetime import timedelta
from datetime import datetime
# Project-local dependencies
from utils import parameters_function, join_functions
from utils.settings import RAW_CONTAINER, PROCESS_CONTAINER, FECHA_ACTUAL
# Default arguments handed to the DAG through its `default_args` parameter
default_args = dict(
    owner='pedro',
    # Naive (no tzinfo) midnight of 2023-04-22
    start_date=datetime(year=2023, month=4, day=22),
)
# Instantiate the DAG object.
# The cron expression fires at minute 45 of hours 0 and 12 (the minute field
# lists 45 twice, which is redundant), in whatever timezone this Airflow
# deployment uses to evaluate schedules.
# NOTE(review): the previous note said "local time 8:30 and 20:30", which does
# not match minute 45 -- confirm whether the comment or the cron is intended.
sierra_gorda_parameters_dag = DAG(
    dag_id='SierraGorda_ParametersStorage',
    description='Functions that calculate parameters like fragmentation in SierraGorda',
    schedule_interval='45,45 12,0 * * *',
    default_args=default_args,
    is_paused_upon_creation=False,  # DAG is created in the unpaused state
    catchup=False,  # no backfill of runs missed since start_date
)
# Step 1: call join_functions.generar_cruce_wenco_haul_shovel with three input
# blobs from the raw container (shovel cycles, truck cycles, load transactions)
# located under the FECHA_ACTUAL folder, and a destination blob
# (WencoShovelHaul.json) in the process container.
# Presumably the callable joins the three inputs into the destination blob --
# confirm against utils.join_functions.
paramters_fragmentation_task1 = PythonOperator(
    dag=sierra_gorda_parameters_dag,
    task_id='fragmentation_joinWencoFiles',
    python_callable=join_functions.generar_cruce_wenco_haul_shovel,
    op_kwargs=dict(
        # Inputs
        origin_shovel_file_container=RAW_CONTAINER,
        origin_shovel_file_blob=f"SierraGorda/{FECHA_ACTUAL}/Wenco-Shovel-cycle.json",
        origin_haul_file_container=RAW_CONTAINER,
        origin_haul_file_blob=f"SierraGorda/{FECHA_ACTUAL}/Wenco-Truck-cycle.json",
        origin_load_file_container=RAW_CONTAINER,
        origin_load_file_blob=f"SierraGorda/{FECHA_ACTUAL}/LoadTrans.json",
        # Output
        final_container=PROCESS_CONTAINER,
        final_blob=f"SierraGorda/Fragmentacion/{FECHA_ACTUAL}/WencoShovelHaul.json",
    ),
)
# Step 2: call parameters_function.agregar_banco_a_wenco with the
# WencoShovelHaul.json blob addressed by step 1 and the historical
# opit.parquet blob from the raw container; the destination blob is
# WencoShovelHaul_banco.json in the process container.
# Presumably this adds the bench ("banco") to each Wenco record -- confirm
# against utils.parameters_function.
paramters_fragmentation_task2 = PythonOperator(
    dag=sierra_gorda_parameters_dag,
    task_id='fragmentation_AddBenchToWenco',
    python_callable=parameters_function.agregar_banco_a_wenco,
    op_kwargs=dict(
        # Inputs
        origin_wenco_file_container=PROCESS_CONTAINER,
        origin_opit_container=RAW_CONTAINER,
        origin_wenco_file_blob=f"SierraGorda/Fragmentacion/{FECHA_ACTUAL}/WencoShovelHaul.json",
        origin_opit_blob="SierraGorda/Historico/opit.parquet",
        # Output
        final_container=PROCESS_CONTAINER,
        final_blob=f"SierraGorda/Fragmentacion/{FECHA_ACTUAL}/WencoShovelHaul_banco.json",
    ),
)
# Step 3: call parameters_function.agregar_tasa_de_excavacion with the
# WencoShovelHaul_banco.json blob addressed by step 2; the destination blob is
# cruce_wenco_shovel_banco_tasa_excavacion.json in the process container.
# Presumably this adds an excavation rate to the data -- confirm against
# utils.parameters_function.
paramters_fragmentation_task3 = PythonOperator(
    dag=sierra_gorda_parameters_dag,
    task_id='fragmentation_AddExcavationRate',
    python_callable=parameters_function.agregar_tasa_de_excavacion,
    op_kwargs=dict(
        # Input
        origin_wenco_file_container=PROCESS_CONTAINER,
        origin_wenco_file_blob=f"SierraGorda/Fragmentacion/{FECHA_ACTUAL}/WencoShovelHaul_banco.json",
        # Output
        final_container=PROCESS_CONTAINER,
        final_blob=f"SierraGorda/Fragmentacion/{FECHA_ACTUAL}/cruce_wenco_shovel_banco_tasa_excavacion.json",
    ),
)
# Step 4: call parameters_function.cruce_con_fragmentacion with the blob
# addressed by step 3 and the Split.json blob from the raw container; the
# destination blob is cruce_wenco_shovel_banco_tasa_excavacion_fragmentacion.json
# in the process container.
# Presumably this joins the fragmentation (Split) data onto the Wenco data --
# confirm against utils.parameters_function.
paramters_fragmentation_task4 = PythonOperator(
    dag=sierra_gorda_parameters_dag,
    task_id='fragmentation_AddFragmentation',
    python_callable=parameters_function.cruce_con_fragmentacion,
    op_kwargs=dict(
        # Inputs
        origin_wenco_file_container=PROCESS_CONTAINER,
        # NOTE(review): "contianer" looks like a typo of "container", but the
        # key must equal the callable's parameter name -- check the signature
        # of cruce_con_fragmentacion before renaming.
        origin_split_contianer=RAW_CONTAINER,
        origin_split_blob=f"SierraGorda/{FECHA_ACTUAL}/Split.json",
        origin_wenco_file_blob=f"SierraGorda/Fragmentacion/{FECHA_ACTUAL}/cruce_wenco_shovel_banco_tasa_excavacion.json",
        # Output
        final_container=PROCESS_CONTAINER,
        final_blob=f"SierraGorda/Fragmentacion/{FECHA_ACTUAL}/cruce_wenco_shovel_banco_tasa_excavacion_fragmentacion.json",
    ),
)
# Step 5: call parameters_function.subir_a_sql with the blob addressed by
# step 4 and the historical opit.parquet blob from the raw container.
# No destination blob is passed; presumably the callable uploads the result to
# a SQL database -- confirm against utils.parameters_function.
paramters_fragmentation_task5 = PythonOperator(
    dag=sierra_gorda_parameters_dag,
    task_id='fragmentation_uploadToSQL',
    python_callable=parameters_function.subir_a_sql,
    op_kwargs=dict(
        origin_wenco_file_container=PROCESS_CONTAINER,
        origin_opit_container=RAW_CONTAINER,
        origin_opit_blob="SierraGorda/Historico/opit.parquet",
        origin_wenco_file_blob=f"SierraGorda/Fragmentacion/{FECHA_ACTUAL}/cruce_wenco_shovel_banco_tasa_excavacion_fragmentacion.json",
    ),
)
# Define the task sequence: a strictly linear chain, each task being the
# direct upstream of the next one.
(
    paramters_fragmentation_task1
    >> paramters_fragmentation_task2
    >> paramters_fragmentation_task3
    >> paramters_fragmentation_task4
    >> paramters_fragmentation_task5
)
|