# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator
# Python dependencies
import sys
import os
from datetime import datetime
from datetime import timedelta
# Project-local dependencies
from utils import ingest_functions
from utils.settings import RAW_CONTAINER, SIERRA_GORDA_NAME, YEAR, MONTH, DAY
from utils.settings import SIERRA_GORDA_API_URL, SIERRA_GORDA_API_TOKEN, SIERRA_GORDA_NAME
from utils.settings import DRILLING_PROCESS_NAME, LOADINGHAULING_PROCESS_NAME, CRUSHING_PROCESS_NAME, FRAGMENTATION_PROCESS_NAME
from utils.settings import TERRAIN_SOURCE_NAME, FLANDERS_SOURCE_NAME, SURFACE_MANAGER_SOURCE_NAME
# Default arguments passed to the DAG below; Airflow hands them to every task
# attached to that DAG unless a task overrides them explicitly.
default_args = dict(
    owner='pedro',
    # Naive datetime (no tzinfo): Airflow interprets it in its configured
    # default timezone. NOTE(review): confirm that zone is the intended one.
    start_date=datetime(2023, 8, 31),
)
# Instantiate the DAG object that all Sierra Gorda ingestion tasks attach to.
sierragorda_ingest_dag = DAG(
    'SierraGorda_IngestDag',
    default_args=default_args,
    description='Functions that obtain data from the source related to Sierra Gorda',
    # Cron "30 15 * * *" fires ONCE per day, at 15:30 in the DAG's timezone
    # (Airflow's configured default timezone, since start_date is naive).
    # NOTE(review): a previous comment here claimed "local time 9:30 and
    # 21:30", i.e. two runs per day, which this expression does not do.
    # If two daily runs are really intended, something like "30 3,15 * * *"
    # would be required. However, the task's destination blob path appears to
    # be only day-granular ({YEAR}/{MONTH}/{DAY}), so a second run would
    # likely target the same blob. Confirm the intended schedule before
    # changing it.
    schedule_interval='30 15 * * *',
    # Do not create backfill runs for intervals missed while the DAG was off.
    catchup=False,
    # The DAG starts paused when first registered and must be enabled manually.
    is_paused_upon_creation=True
)
# ---------------------------------------------------------------------------
# Drilling data ingestion
# ---------------------------------------------------------------------------
# Sends the query "select * from dr_c_operator" to the Sierra Gorda API via
# POST, passing RAW_CONTAINER and a dated blob path as the destination. What
# get_data_from_api does with them is defined in utils.ingest_functions.
drilling_terrain_task1 = PythonOperator(
    task_id='Terrain_GetOperator',
    # PythonOperator calls this with the entries of op_kwargs as keyword args.
    python_callable=ingest_functions.get_data_from_api,
    op_kwargs=dict(
        url=SIERRA_GORDA_API_URL,
        headers=dict(Authorization=SIERRA_GORDA_API_TOKEN),
        body="select * from dr_c_operator",
        method='POST',
        destination_container=RAW_CONTAINER,
        # NOTE(review): YEAR/MONTH/DAY are imported from utils.settings and
        # baked into this f-string when the DAG file is parsed, not when the
        # task runs. Confirm settings derives them so they match the run date.
        destination_blob=f'{SIERRA_GORDA_NAME}/{DRILLING_PROCESS_NAME}/{TERRAIN_SOURCE_NAME}/{YEAR}/{MONTH}/{DAY}/Terrain_Operator.json',
    ),
    dag=sierragorda_ingest_dag,
)