# DAG: report_drilling_status_Spence
# schedule: 0 15 * * *

# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable

from jinja2 import Template
from functools import reduce
from datetime import datetime, timedelta
from standardized_process.dags.functions.drilling_status import *

import json 
import os 

# Module-level flag; it is assigned here and not read in the visible part of
# this file.
task_loaded = False

# Default arguments handed to the DAG constructor.
default_args = {
    'owner': 'pedro',
    'start_date': datetime(2024, 4, 29, 0, 0, 0)
}

# Take ONE clock snapshot at DAG-parse time. Calling datetime.now() separately
# for each field could straddle midnight and yield a year/month/day/start_date
# combination that never existed.
_parse_time = datetime.now()
year = _parse_time.year
month = _parse_time.month
day = _parse_time.day

# Values injected into the JSON configuration template below.
mine_site = "BHP Spence"
drilling_source = "Surface Manager"
start_date = _parse_time
days_interval = 8
# Wider distribution list, currently disabled.
#recipients_bcc = "jorge.alvarez@enaex.com;leonardo.deferrari@enaex.com;sebastian.navarro@enaex.com;rodrigo.echegaray@enaex.com;jaritza.ramirez@enaex.com;aldo.abarzua@enaex.com;miguel.alarcon@enaex.com;nacordero@collahuasi.cl;Ginunez@collahuasi.cl;diego.caceres@enaex.com;sebastian.cerda@enaex.com;sebastian.seria.ext@enaex.com;pedro.delgado@auctus.cl;naalbornoz@collahuasi.cl"
# NOTE(review): only a single address is active - confirm this is intended
# for the scheduled run and not a leftover from testing.
recipients_bcc = "pedro.delgado@auctus.cl"

# Read the JSON configuration file as raw text so Jinja templating can be
# applied to it before parsing. The encoding is explicit so decoding does not
# depend on the locale of the machine parsing the DAG.
# NOTE(review): the path is relative, so it resolves against the process
# working directory - confirm that matches the deployment layout.
with open('dags/standardized_process/dags/configuration/report_drilling_status_config.json', encoding='utf-8') as f:
    template_content = f.read()

# Build a Jinja2 template from the file content, render it with the values
# defined above, and parse the rendered text as JSON.
template = Template(template_content)
rendered_config = template.render(
    mine_site=mine_site,
    drilling_source=drilling_source,
    start_date=start_date,
    days_interval=days_interval,
    recipients_bcc=recipients_bcc,
)
data = json.loads(rendered_config)

# Maps the function names referenced by the configuration to the callables
# that implement them.
function_mapping = {
    'generate_tables': generate_tables,
    'generate_report': generate_report
}

def chain_tasks(x, y):
    """Apply the ``<<`` operator to ``x`` and ``y`` and return the result.

    NOTE(review): presumably meant as a two-argument reducer for chaining
    Airflow tasks, where ``x << y`` is expected to make ``y`` upstream of
    ``x`` - confirm against the Airflow version in use. It is not called in
    the visible part of this file.
    """
    shifted = x << y
    return shifted

# DAG object for this report; cron expression '0 15 * * *', no catch-up,
# at most one active run, and not paused when first created.
dag = DAG(
    "report_drilling_status_Spence",
    schedule_interval='0 15 * * *',
    tags=['Standardized Process'],
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
    is_paused_upon_creation=False,
)

# One PythonOperator per entry of the rendered configuration whose 'active'
# field equals True, kept in configuration order. The callable is looked up
# by name in function_mapping, so an unknown name raises KeyError here.
list_task = [
    PythonOperator(
        task_id=entry['task_id'],
        python_callable=function_mapping[entry['function']],
        op_kwargs=entry['op_kwargs'],
        retries=3,
        dag=dag,
    )
    for entry in data['task_scheme']
    if entry['active'] == True  # noqa: E712 - strict comparison kept on purpose
]

# Set dependencies: link each task to the next one in the list, forming a
# single linear chain (no links are made for zero or one task).
for current_task, next_task in zip(list_task, list_task[1:]):
    current_task >> next_task