DAG: SierraGorda_HomologateStorage
Functions that homologate files related to SierraGorda

schedule: 0,0 13,1 * * *
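For reference, the cron fields of the schedule above can be expanded with a few lines of plain Python. This is only a sketch: `expand_field` and `daily_run_times` are hypothetical helpers that handle `*` and comma-separated integers, and they are not Airflow's own cron parser. The result suggests two runs per day, at 01:00 and 13:00, which agrees with the execution dates listed further down.

```python
def expand_field(field, lo, hi):
    """Expand one cron field; supports '*' and comma-separated integers only."""
    if field == "*":
        return list(range(lo, hi + 1))
    # A set removes duplicates such as the repeated 0 in "0,0".
    return sorted({int(part) for part in field.split(",")})


def daily_run_times(expr):
    """Return the distinct HH:MM times at which the expression fires on a matching day."""
    minute, hour, _dom, _month, _dow = expr.split()
    minutes = expand_field(minute, 0, 59)
    hours = expand_field(hour, 0, 23)
    return [f"{h:02d}:{m:02d}" for h in hours for m in minutes]


print(daily_run_times("0,0 13,1 * * *"))  # ['01:00', '13:00']
```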


Task Instance: Terrain_homologatefields


Task Instance Details

Dependencies Blocking Task From Getting Scheduled
Dependency: Reason
Task Instance State: Task is in the 'upstream_failed' state which is not a valid state for execution. The task must be cleared in order to be run.
Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 1, 'upstream_failed': 0, 'done': 1}, upstream_task_ids={'Terrain_joinfiles'}
Dagrun Running: Task instance's dagrun was not in the 'running' state but in the state 'failed'.
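The Trigger Rule entry above can be read as a simple check over the upstream state counts. The sketch below reproduces that reasoning in plain Python; `all_success_satisfied` is a hypothetical helper written for illustration and is not Airflow's internal implementation.

```python
def all_success_satisfied(upstream_tasks_state):
    """Return (ok, non_successes) for an 'all_success' style check."""
    non_successes = upstream_tasks_state["total"] - upstream_tasks_state["successes"]
    return non_successes == 0, non_successes


# Counts copied from the Trigger Rule entry above.
state = {"total": 1, "successes": 0, "skipped": 0,
         "failed": 1, "upstream_failed": 0, "done": 1}

ok, non_successes = all_success_satisfied(state)
print(ok, non_successes)  # False 1
```

Because the only upstream task, Terrain_joinfiles, failed, the check does not pass. As the Task Instance State entry notes, the task instance must be cleared before it can run again.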
Attribute: python_callable
def homologate_fields_in_terrain_file(origin_container, origin_blob, final_container, final_blob): 
    """
    Homologate the fields in the Terrain file so that they carry the same names as the fields in Flanders.

    Parameters
    ----------
    origin_container: str
        Name of the container holding the file to homologate.
    origin_blob: str
        Name of the file containing the fields to homologate.
    final_container: str
        Name of the container where the file will be saved.
    final_blob: str
        Name of the file to be saved in the final container.

    Returns
    -------
    None
    """
    # Read terrain hole position file 
    conn_origin_container = BlobClient.from_connection_string(conn_str=BLOB_CONNECT_STRING, container_name=origin_container, blob_name=origin_blob)
    download_origin_blob = conn_origin_container.download_blob()
    df_terrain = pd.read_json(download_origin_blob, orient='table')

    # Parse the timestamps and compute the drill time in seconds
    df_terrain['time_start'] = df_terrain['time_start'].astype('datetime64[ns]')
    df_terrain['time_end_informacion'] = df_terrain['time_end_informacion'].astype('datetime64[ns]')
    df_terrain['drill_time'] = (df_terrain['time_end_informacion'] - df_terrain['time_start']).dt.total_seconds()

    multiples_of_16 = [x for x in range(800, 3000) if x % 16 == 0]
    df_terrain['benchbelow'] = df_terrain.apply(lambda x: calculateBenchBelowTerrrain(x, 'design_z', multiples_of_16), axis=1)

    # Create new fields
    df_terrain['RecordID'] = np.nan
    df_terrain['shift_index'] = np.nan
    df_terrain['Mine_HoleNumber'] = df_terrain['hole_id']
    df_terrain['BenchBelow'] = df_terrain['bench']
    df_terrain['BitNumber'] = ''
    df_terrain['BitUsage'] = np.nan
    df_terrain['AvgHoleEnergy'] = np.nan
    df_terrain['AvgPenetrationRate2'] = np.nan
    df_terrain['HoleDepth'] = df_terrain['depth']
    df_terrain['TargetDepth'] = df_terrain['design_depth']
    df_terrain['PatternHoleNumber'] = ''
    df_terrain['BlastName'] = ''
    df_terrain['DrillPattern'] = ''
    df_terrain['DateTime'] = df_terrain['time_start']
    df_terrain['Drill_Number'] = df_terrain['machine_id']
    df_terrain['OperatorID'] = np.nan
    df_terrain['HoleNumber'] = df_terrain['name']
    df_terrain['DesignNorthing'] = df_terrain['design_y']
    df_terrain['DesignEasting'] = df_terrain['design_x']
    df_terrain['ActualCollarNorthing'] = df_terrain['start_y']
    df_terrain['ActualCollarEasting'] = df_terrain['start_x']
    df_terrain['designelevation'] = df_terrain['design_z']
    df_terrain['actualelevation'] = df_terrain['start_z']
    df_terrain['actualgroundelevation'] = np.nan
    df_terrain['Bench Below'] = df_terrain['benchbelow']
    df_terrain['Hole Difference'] = ''
    df_terrain['Rango'] = ''
    df_terrain['Categoria'] = ''
    df_terrain['Nombre Categoria'] = ''
    df_terrain['Desviacion'] = ''
    df_terrain['Extra Sobreperforación'] = np.nan
    df_terrain['Extra Reperforación'] = np.nan
    df_terrain['Turno'] = ''
    df_terrain['DrillTime'] = df_terrain['drill_time']

    # Type conversions
    df_terrain['Mine_HoleNumber'] = df_terrain['Mine_HoleNumber'].astype(str)
    df_terrain['HoleNumber'] = df_terrain['HoleNumber'].astype(str)
    df_terrain['Bench Below'] = df_terrain['Bench Below'].astype(str)

    lista_campos = [x.strip().replace("'","") for x in COLUMNAS_FLANDERS]
    df_terrain = df_terrain[lista_campos]

    # Write file to blob
    blob = BlobClient.from_connection_string(conn_str=BLOB_CONNECT_STRING, container_name=final_container, blob_name=final_blob)
    json_data = df_terrain.to_json(index=False, orient='table')
    blob.upload_blob(json_data, overwrite=True)
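Two of the derived values in the callable above can be checked in isolation with the standard library. The sketch below uses made-up timestamps (they do not come from any real Terrain file) to show the `drill_time` arithmetic. It also confirms that the `multiples_of_16` list passed to `calculateBenchBelowTerrrain` is the same as a stepped range, since 800 is itself a multiple of 16.

```python
from datetime import datetime

# Illustrative timestamps only; not taken from a real Terrain file.
time_start = datetime(2026, 6, 7, 1, 0, 0)
time_end_informacion = datetime(2026, 6, 7, 1, 12, 30)

# Same arithmetic as the drill_time column: elapsed time in seconds.
drill_time = (time_end_informacion - time_start).total_seconds()
print(drill_time)  # 750.0

# The list built in the callable: multiples of 16 in [800, 3000).
multiples_of_16 = [x for x in range(800, 3000) if x % 16 == 0]

# Because 800 % 16 == 0, a stepped range yields the same values.
assert multiples_of_16 == list(range(800, 3000, 16))
print(multiples_of_16[0], multiples_of_16[-1], len(multiples_of_16))  # 800 2992 138
```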
Task Instance Attributes
Attribute Value
dag_id SierraGorda_HomologateStorage
duration None
end_date 2026-06-07 13:01:12.630174+00:00
execution_date 2026-06-07T01:00:00+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x73b601556040>
hostname
is_premature False
job_id None
key ('SierraGorda_HomologateStorage', 'Terrain_homologatefields', <Pendulum [2026-06-07T01:00:00+00:00]>, 1)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/SierraGorda_HomologateStorage/Terrain_homologatefields/2026-06-07T01:00:00+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2026-06-07T01%3A00%3A00%2B00%3A00&task_id=Terrain_homologatefields&dag_id=SierraGorda_HomologateStorage
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=Terrain_homologatefields&dag_id=SierraGorda_HomologateStorage&execution_date=2026-06-07T01%3A00%3A00%2B00%3A00&upstream=false&downstream=false
max_tries 0
metadata MetaData(bind=None)
next_try_number 1
operator None
pid None
pool default_pool
prev_attempted_tries 0
previous_execution_date_success None
previous_start_date_success None
previous_ti <TaskInstance: SierraGorda_HomologateStorage.Terrain_homologatefields 2026-06-06 13:00:00+00:00 [upstream_failed]>
previous_ti_success None
priority_weight 1
queue default
queued_dttm None
raw False
run_as_user None
start_date 2026-06-07 13:01:12.630164+00:00
state upstream_failed
task <Task(PythonOperator): Terrain_homologatefields>
task_id Terrain_homologatefields
test_mode False
try_number 1
unixname airflow
Task Attributes
Attribute Value
dag <DAG: SierraGorda_HomologateStorage>
dag_id SierraGorda_HomologateStorage
depends_on_past False
deps {<TIDep(Previous Dagrun State)>, <TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>}
do_xcom_push True
downstream_list []
downstream_task_ids set()
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {'origin_container': 'processed', 'origin_blob': 'SierraGorda/BrightBoard/Terrain/2026-06-13/TerrainFile.json', 'final_container': 'processed', 'final_blob': 'SierraGorda/BrightBoard/Terrain/2026-06-13/terrain_2026-06-13.json'}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner pedro
params {}
pool default_pool
priority_weight 1
priority_weight_total 1
provide_context False
queue default
resources None
retries 0
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 0,0 13,1 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-04-22T00:00:00+00:00
subdag None
task_concurrency None
task_id Terrain_homologatefields
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): Terrain_joinfiles>]
upstream_task_ids {'Terrain_joinfiles'}
wait_for_downstream False
weight_rule downstream