# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from jinja2 import Template
from functools import reduce
from datetime import datetime, timedelta
from standardized_process.dags.functions.data_cleaning import *
import json
import os
# Module-level flag; it is not read anywhere in the visible part of this file.
task_loaded = False

# Default arguments handed to the DAG constructor as `default_args`.
default_args = {
    'owner': 'pedro',
    'start_date': datetime(2023, 4, 22, 0, 0, 0),
}

# Read the clock once so that year, month and day always describe the same
# date, even if this module happens to be parsed right around midnight.
_now = datetime.now()
year = _now.year
month = _now.month
day = _now.day

# Values substituted into the JSON configuration template.
mine_site = "Collahuasi"
drilling_source = "Provision"
# Apply Jinja templating to the JSON configuration file.
# NOTE: the path is relative, so it is resolved against the current working
# directory of the process that parses this module.
# The file is decoded explicitly as UTF-8 (the standard JSON encoding) so the
# result does not depend on the host's locale settings.
with open(
    'dags/standardized_process/dags/configuration/Collahuasi/data_cleaning_config.json',
    encoding='utf-8',
) as f:
    template_content = f.read()

# Create a Jinja2 template from the raw text and fill in the site, drilling
# source and date placeholders.
template = Template(template_content)
rendered_config = template.render(
    mine_site=mine_site,
    drilling_source=drilling_source,
    year=year,
    month=month,
    day=day,
)

# Parse the rendered text; the result is indexed by 'task_scheme' further down.
data = json.loads(rendered_config)
# Lookup table from the function names that appear in the configuration's
# task entries to the callables that implement them (presumably provided by
# the data_cleaning star import at the top of the file).
function_mapping = {
    'normalize_headers': normalize_headers,
    'drill_groups_label': drill_groups_label,
    'prepare_columns': prepare_columns,
    'coordinates_limit_filter': coordinates_limit_filter,
    'minesite_limit_filter': minesite_limit_filter,
    'unit_converter': unit_converter,
    'zero_samples_filter': zero_samples_filter,
    'iqr_limit_filter': iqr_limit_filter,
    'z_score_filter': z_score_filter,
    'add_id': add_id,
    'register_per_meter': register_per_meter,
    'get_bench_phase_values': get_bench_phase_values,
}
def chain_tasks(x, y):
    """Return the result of ``x << y``.

    NOTE(review): for Airflow tasks the ``<<`` operator is expected to mark
    the right-hand task as upstream of the left-hand one -- confirm the
    intended direction before use. This helper is not called in the visible
    part of this file.
    """
    linked = x << y
    return linked
# DAG definition: cron schedule '0 15 * * *', a single active run at a time,
# no catch-up of past intervals, and created in the paused state.
dag = DAG(
    "data_cleaning_Collahuasi",
    schedule_interval='0 15 * * *',
    tags=['Standardized Process'],
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
    is_paused_upon_creation=True,
)

# Build one PythonOperator per active entry of the configuration's task
# scheme, keeping the order in which the entries are listed.
list_task = []
for task in data['task_scheme']:
    # The flag is compared against True explicitly (not by truthiness) so
    # that non-boolean values such as the string "false" never enable a task.
    if task['active'] != True:  # noqa: E712
        continue

    # Fail with an actionable message when the configuration names a function
    # that has no entry in function_mapping (still a KeyError, as before).
    function_name = task['function']
    if function_name not in function_mapping:
        raise KeyError(
            f"Task {task['task_id']!r} references unknown function "
            f"{function_name!r}; expected one of {sorted(function_mapping)}"
        )

    drill_report_task = PythonOperator(
        task_id=task['task_id'],
        python_callable=function_mapping[function_name],
        op_kwargs=task['op_kwargs'],
        retries=3,
        dag=dag,
    )
    list_task.append(drill_report_task)

# Set dependencies: each task runs after the one listed before it.
for upstream_task, downstream_task in zip(list_task, list_task[1:]):
    upstream_task >> downstream_task