# DAG: SierraGorda_IngestDag — functions that obtain data from the source related to Sierra Gorda.
#
# Schedule: 30 15 * * *
# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Python dependencies
import sys
import os
from datetime import datetime 
from datetime import timedelta

# Dependencias propias
from utils import ingest_functions
from utils.settings import RAW_CONTAINER, SIERRA_GORDA_NAME, YEAR, MONTH, DAY
from utils.settings import SIERRA_GORDA_API_URL, SIERRA_GORDA_API_TOKEN, SIERRA_GORDA_NAME
from utils.settings import DRILLING_PROCESS_NAME, LOADINGHAULING_PROCESS_NAME, CRUSHING_PROCESS_NAME, FRAGMENTATION_PROCESS_NAME
from utils.settings import TERRAIN_SOURCE_NAME, FLANDERS_SOURCE_NAME, SURFACE_MANAGER_SOURCE_NAME


# Default arguments applied to every task attached to this DAG.
default_args = {
    'owner': 'pedro',
    # NOTE(review): this is a naive datetime, so Airflow interprets it in its
    # configured default_timezone (UTC unless overridden) — confirm intended.
    'start_date': datetime(2023, 8, 31, 0, 0, 0),
}

# Instantiate the DAG object.
sierragorda_ingest_dag = DAG(
    'SierraGorda_IngestDag',
    default_args=default_args,
    description='Functions that obtain data from the source related to Sierra Gorda',
    # Cron "30 15 * * *" fires ONCE per day, at 15:30 in the DAG's timezone
    # (see the start_date note above); it does not produce two daily runs.
    # TODO(review): if runs at 9:30 and 21:30 local time are intended, both
    # hours must be listed in the expression — confirm with the DAG owner.
    schedule_interval='30 15 * * *',
    # Do not create runs for past intervals between start_date and now.
    catchup=False,
    # A newly deployed DAG starts paused and must be enabled manually.
    is_paused_upon_creation=True,
)

# ---------------------------------------------------------------------------
# Drilling data ingestion
# ---------------------------------------------------------------------------

# Terrain source, operator table.
# The op_kwargs below are passed to ingest_functions.get_data_from_api (defined
# elsewhere). Judging by their names, they describe a POST of the query
# "select * from dr_c_operator" to the Sierra Gorda API, with the response
# written to RAW_CONTAINER at destination_blob — TODO confirm against that helper.
drilling_terrain_task1 = PythonOperator(
    task_id='Terrain_GetOperator',
    python_callable=ingest_functions.get_data_from_api,
    op_kwargs={
        'url': SIERRA_GORDA_API_URL,
        'headers': {"Authorization": SIERRA_GORDA_API_TOKEN},
        'body': "select * from dr_c_operator",
        'method': 'POST',
        'destination_container': RAW_CONTAINER,
        # Layout: <site>/<process>/<source>/<YEAR>/<MONTH>/<DAY>/Terrain_Operator.json.
        # NOTE(review): this f-string is evaluated when the DAG file is imported,
        # using YEAR/MONTH/DAY constants from utils.settings. It is not derived
        # from the run's logical date, so re-runs and backfills may land in a
        # different date folder — confirm this is intended.
        'destination_blob': f'{SIERRA_GORDA_NAME}/{DRILLING_PROCESS_NAME}/{TERRAIN_SOURCE_NAME}/{YEAR}/{MONTH}/{DAY}/Terrain_Operator.json',
    },
    dag=sierragorda_ingest_dag,
)