DAG: SierraGorda_HomologateStorage — Functions that homologate files related to SierraGorda. ROOT: Terrain_joinfiles

schedule: 0,0 13,1 * * *


SierraGorda_HomologateStorage

Toggle wrap
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# Airflow dependencies
from asyncio import Task
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Python dependencies
import sys
import os

from datetime import timedelta
from datetime import datetime 

# Project-local dependencies
from utils import join_functions, homologation_functions
from utils.settings import RAW_CONTAINER, CLEAN_CONTAINER, PROCESS_CONTAINER, FECHA_ACTUAL


# Default arguments handed to the DAG constructor: the owner label and the
# (naive) start date of the schedule.
default_args = dict(
    owner='pedro',
    start_date=datetime(2023, 4, 22),
)

# Instantiate the DAG object.
#
# Cron fields of the schedule: minute 0 (listed twice), hours 13 and 1,
# every day of every month.
# NOTE(review): the original comment here read "local time 8:30 and 20:30",
# which does not match minute 0 of the expression -- confirm the intended
# run times and the timezone the scheduler applies.
sierra_gorda_homologate_dag = DAG(
    dag_id='SierraGorda_HomologateStorage',
    description='Functions that homologate files related to SierraGorda',
    schedule_interval='0,0 13,1 * * *',
    default_args=default_args,
    # Do not back-fill runs between start_date and now.
    catchup=False,
    # The DAG is registered unpaused when first created.
    is_paused_upon_creation=False,
)

### Terrain File
# Step 1: call join_functions.join_terrain_files with three Terrain_* blobs
# from RAW_CONTAINER as origins and a single TerrainFile.json blob in
# PROCESS_CONTAINER as destination.
# NOTE(review): FECHA_ACTUAL is imported from utils.settings and interpolated
# here at module level, so the blob paths are fixed when this file is parsed,
# not when the task runs -- confirm that is the intended date.
terrain_task1 = PythonOperator(
    dag=sierra_gorda_homologate_dag,
    task_id='Terrain_joinfiles',
    python_callable=join_functions.join_terrain_files,
    op_kwargs=dict(
        origin_container=RAW_CONTAINER,
        origin_hole_position_blob=f"SierraGorda/{FECHA_ACTUAL}/Terrain_HolePosition.json",
        origin_hole_information_blob=f"SierraGorda/{FECHA_ACTUAL}/Terrain_HoleInformation.json",
        origin_hole_profile_blob=f"SierraGorda/{FECHA_ACTUAL}/Terrain_HoleProfile.json",
        final_container=PROCESS_CONTAINER,
        final_blob=f"SierraGorda/BrightBoard/Terrain/{FECHA_ACTUAL}/TerrainFile.json",
    ),
)

# Step 2: call homologation_functions.homologate_fields_in_terrain_file on
# the TerrainFile.json blob in PROCESS_CONTAINER; the destination is a
# terrain_<FECHA_ACTUAL>.json blob in the same container and folder.
# NOTE(review): FECHA_ACTUAL is interpolated at module level (parse time),
# not at task run time -- confirm that is the intended date.
terrain_task2 = PythonOperator(
    dag=sierra_gorda_homologate_dag,
    task_id='Terrain_homologatefields',
    python_callable=homologation_functions.homologate_fields_in_terrain_file,
    op_kwargs=dict(
        origin_container=PROCESS_CONTAINER,
        origin_blob=f"SierraGorda/BrightBoard/Terrain/{FECHA_ACTUAL}/TerrainFile.json",
        final_container=PROCESS_CONTAINER,
        final_blob=f"SierraGorda/BrightBoard/Terrain/{FECHA_ACTUAL}/terrain_{FECHA_ACTUAL}.json",
    ),
)

# Define the task sequence: Terrain_joinfiles runs before
# Terrain_homologatefields.
terrain_task1.set_downstream(terrain_task2)