# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator
# Python dependencies
import sys
import os
from datetime import timedelta
from datetime import datetime
# Project-local dependencies
from utils import join_functions, homologation_functions
from utils.settings import RAW_CONTAINER, CLEAN_CONTAINER, PROCESS_CONTAINER, FECHA_ACTUAL
# Default arguments handed to the DAG constructor as `default_args`
default_args = {
    'owner': 'pedro',
    # Naive datetime (no tzinfo attached): midnight on 2023-04-22.
    'start_date': datetime(year=2023, month=4, day=22),
}
# Instantiate the DAG object
# NOTE(review): the cron expression below fires at minute 0 of hours 01 and 13
# (its minute field "0,0" lists 0 twice), whereas the original inline note
# gave the intended local times as 08:30 and 20:30. Confirm the intended
# minute and the scheduler-to-local timezone offset before relying on either.
sierra_gorda_homologate_dag = DAG(
    dag_id='SierraGorda_HomologateStorage',
    description='Functions that homologate files related to SierraGorda',
    default_args=default_args,
    schedule_interval='0,0 13,1 * * *',  # intended: 08:30 and 20:30 local time (see note above)
    catchup=False,  # do not backfill runs for intervals already in the past
    is_paused_upon_creation=False,  # DAG starts unpaused when first registered
)
### Terrain file
# Keyword arguments forwarded to join_functions.join_terrain_files: three
# "origin" blob paths under RAW_CONTAINER and one "final" blob path under
# PROCESS_CONTAINER (how they are used is defined by the callable itself).
# The f-strings are evaluated when this module is parsed, so FECHA_ACTUAL is
# fixed at parse time rather than at task run time.
# NOTE(review): presumably FECHA_ACTUAL is the current date -- confirm in
# utils.settings that parse-time evaluation yields the intended folder.
_terrain_join_kwargs = {
    "origin_container": RAW_CONTAINER,
    "origin_hole_position_blob": f"SierraGorda/{FECHA_ACTUAL}/Terrain_HolePosition.json",
    "origin_hole_information_blob": f"SierraGorda/{FECHA_ACTUAL}/Terrain_HoleInformation.json",
    "origin_hole_profile_blob": f"SierraGorda/{FECHA_ACTUAL}/Terrain_HoleProfile.json",
    "final_container": PROCESS_CONTAINER,
    "final_blob": f"SierraGorda/BrightBoard/Terrain/{FECHA_ACTUAL}/TerrainFile.json",
}

# First task of the terrain pipeline: runs join_terrain_files with the
# arguments assembled above.
terrain_task1 = PythonOperator(
    dag=sierra_gorda_homologate_dag,
    task_id='Terrain_joinfiles',
    python_callable=join_functions.join_terrain_files,
    op_kwargs=_terrain_join_kwargs,
)
# Keyword arguments forwarded to
# homologation_functions.homologate_fields_in_terrain_file. Both the "origin"
# and the "final" blob live in PROCESS_CONTAINER; the origin path is the same
# TerrainFile.json path that the Terrain_joinfiles task receives as its
# "final_blob". FECHA_ACTUAL is interpolated when this module is parsed.
_terrain_homologate_kwargs = {
    "origin_container": PROCESS_CONTAINER,
    "origin_blob": f"SierraGorda/BrightBoard/Terrain/{FECHA_ACTUAL}/TerrainFile.json",
    "final_container": PROCESS_CONTAINER,
    "final_blob": f"SierraGorda/BrightBoard/Terrain/{FECHA_ACTUAL}/terrain_{FECHA_ACTUAL}.json",
}

# Second task of the terrain pipeline: runs homologate_fields_in_terrain_file
# with the arguments assembled above.
terrain_task2 = PythonOperator(
    dag=sierra_gorda_homologate_dag,
    task_id='Terrain_homologatefields',
    python_callable=homologation_functions.homologate_fields_in_terrain_file,
    op_kwargs=_terrain_homologate_kwargs,
)
# Define task sequence: the field homologation task runs after the join task.
terrain_task1.set_downstream(terrain_task2)