DAG: Spence_Reports Functions that generated reports related to Spence BHP

schedule: 30,30 17,1 * * *


Spence_Reports

Toggle wrap
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Python dependencies
import sys
import os

from datetime import timedelta
from datetime import datetime 
from datetime import timedelta

# Dependencias propias
from utils import report_functions
from utils.settings import  FECHA_ACTUAL, YEAR, MONTH, DAY, RAW_CONTAINER

# Default arguments
default_args = {
    'owner': 'pedro',
    'start_date': datetime(2024, 6, 17, 0, 0, 0)
}

# Instanciate the DAG object
spence_report_dag = DAG(
    'Spence_Reports',
    default_args=default_args,
    description='Functions that generated reports related to Spence BHP',
    schedule_interval= '30,30 17,1 * * *',# Hora local 13:30 y 21:30
    catchup=False,
    is_paused_upon_creation=True
)


"""
    Drilling file
"""
drill_report_task1 = PythonOperator(
    task_id = 'generate_drilling_report_data',
    python_callable = report_functions.get_spence_data_for_drilling_report,
    op_kwargs = {'days': 7},
    dag = spence_report_dag,
)


drill_report_task2 = PythonOperator(
    task_id = 'generate_drilling_report',
    python_callable = report_functions.generate_drilling_report,
    op_kwargs = {'fecha_inicio': '2024-06-23',
                 'fecha_termino': '2024-06-17',
                 'feana': 'BHP Spence',
                 'blob_route': 'BHP Spence',
                 'cosmos_faena':'BHP Spence'},
    dag = spence_report_dag,
)
# Define task secuence 
drill_report_task1