DAG: SierraGorda_ParametersStorage Functions that calculate parameters like fragmentation in SierraGorda

schedule: 45,45 12,0 * * *


Task Instance: fragmentation_AddFragmentation


Task Instance Details

Dependencies Blocking Task From Getting Scheduled
Dependency Reason
Trigger Rule Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 1, 'done': 1}, upstream_task_ids={'fragmentation_AddExcavationRate'}
Dagrun Running Task instance's dagrun was not in the 'running' state but in the state 'failed'.
Task Instance State Task is in the 'upstream_failed' state which is not a valid state for execution. The task must be cleared in order to be run.
Attribute: python_callable
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def cruce_con_fragmentacion(origin_wenco_file_container, origin_split_contianer, origin_split_blob,  origin_wenco_file_blob, final_container, final_blob):

    conn_origin_container_wenco = BlobClient.from_connection_string(conn_str=BLOB_CONNECT_STRING, container_name=origin_wenco_file_container, blob_name=origin_wenco_file_blob)
    download_stream_wenco = conn_origin_container_wenco.download_blob()
    df_wenco = pd.read_json(download_stream_wenco, orient='table')

    # Archivo Split
    conn_origin_container_split = BlobClient.from_connection_string(conn_str=BLOB_CONNECT_STRING, container_name=origin_split_contianer, blob_name=origin_split_blob)
    download_stream_split = conn_origin_container_split.download_blob().readall()
    data_split = json.loads(download_stream_split)
    df_split = pd.DataFrame(data_split['values'], columns=data_split['headers'])

    # Preparando los datos 
    df_split['fecha'] = df_split['TSP'].astype('datetime64[ns]')
    df_split['fecha'] = df_split['fecha'].dt.date

    df_split['P10'] = df_split['P10'].astype(float)
    df_split['P20'] = df_split['P20'].astype(float)
    df_split['P30'] = df_split['P30'].astype(float)
    df_split['P40'] = df_split['P40'].astype(float)
    df_split['P50'] = df_split['P50'].astype(float)
    df_split['P60'] = df_split['P60'].astype(float)
    df_split['P70'] = df_split['P70'].astype(float)
    df_split['P80'] = df_split['P80'].astype(float)
    df_split['P90'] = df_split['P90'].astype(float)
    
    ### Calculando las metricas
    df_split_mean = (df_split.groupby(['Pala', 'Hora', 'fecha'], as_index=False)         
        .agg(P10_mean = pd.NamedAgg(column="P10", aggfunc="mean"),
                P20_mean = pd.NamedAgg(column="P20", aggfunc="mean"),
                P30_mean = pd.NamedAgg(column="P30", aggfunc="mean"),
                P40_mean = pd.NamedAgg(column="P40", aggfunc="mean"),
                P50_mean = pd.NamedAgg(column="P50", aggfunc="mean"),
                P60_mean = pd.NamedAgg(column="P60", aggfunc="mean"),
                P70_mean = pd.NamedAgg(column="P70", aggfunc="mean"),
                P80_mean = pd.NamedAgg(column="P80", aggfunc="mean"),
                 P90_mean = pd.NamedAgg(column="P90", aggfunc="mean"),)       
        ) 

    df_wenco = df_wenco.drop_duplicates()

    df_split_mean['Pala'] = df_split_mean['Pala'].astype(int)
    df_wenco['LOADING_UNIT_IDENT'] = df_wenco['LOADING_UNIT_IDENT'].astype(int)

    df_wenco['fecha'] = df_wenco['fecha'].astype('datetime64[ns]')
    df_split_mean['fecha'] = df_split_mean['fecha'].astype('datetime64[ns]')

    df_join = df_wenco.merge(df_split_mean, how='left', left_on=['fecha', 'hora', 'LOADING_UNIT_IDENT'], right_on=['fecha', 'Hora', 'Pala'], indicator=True)
    df_join = df_join.drop(columns=['_merge'])

    # Configuracion para subir al blob
    blob = BlobClient.from_connection_string(conn_str=BLOB_CONNECT_STRING, container_name=final_container, blob_name=final_blob)
    ## Subir la data 
    json_data = df_join.to_json(index=False, orient='table')
    blob.upload_blob(json_data, overwrite=True)
Task Instance Attributes
Attribute Value
dag_id SierraGorda_ParametersStorage
duration None
end_date 2025-12-18 12:46:36.478449+00:00
execution_date 2025-12-18T00:45:00+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7783d3ac3040>
hostname
is_premature False
job_id None
key ('SierraGorda_ParametersStorage', 'fragmentation_AddFragmentation', <Pendulum [2025-12-18T00:45:00+00:00]>, 1)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/SierraGorda_ParametersStorage/fragmentation_AddFragmentation/2025-12-18T00:45:00+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2025-12-18T00%3A45%3A00%2B00%3A00&task_id=fragmentation_AddFragmentation&dag_id=SierraGorda_ParametersStorage
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=fragmentation_AddFragmentation&dag_id=SierraGorda_ParametersStorage&execution_date=2025-12-18T00%3A45%3A00%2B00%3A00&upstream=false&downstream=false
max_tries 0
metadata MetaData(bind=None)
next_try_number 1
operator None
pid None
pool default_pool
prev_attempted_tries 0
previous_execution_date_success 2025-12-17 12:45:00+00:00
previous_start_date_success 2025-12-18 00:49:05.012905+00:00
previous_ti <TaskInstance: SierraGorda_ParametersStorage.fragmentation_AddFragmentation 2025-12-17 12:45:00+00:00 [success]>
previous_ti_success <TaskInstance: SierraGorda_ParametersStorage.fragmentation_AddFragmentation 2025-12-17 12:45:00+00:00 [success]>
priority_weight 2
queue default
queued_dttm None
raw False
run_as_user None
start_date 2025-12-18 12:46:36.478440+00:00
state upstream_failed
task <Task(PythonOperator): fragmentation_AddFragmentation>
task_id fragmentation_AddFragmentation
test_mode False
try_number 1
unixname airflow
Task Attributes
Attribute Value
dag <DAG: SierraGorda_ParametersStorage>
dag_id SierraGorda_ParametersStorage
depends_on_past False
deps {<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>}
do_xcom_push True
downstream_list [<Task(PythonOperator): fragmentation_uploadToSQL>]
downstream_task_ids {'fragmentation_uploadToSQL'}
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {'origin_wenco_file_container': 'processed', 'origin_split_contianer': 'raw', 'origin_split_blob': 'SierraGorda/2026-01-03/Split.json', 'origin_wenco_file_blob': 'SierraGorda/Fragmentacion/2026-01-03/cruce_wenco_shovel_banco_tasa_excavacion.json', 'final_container': 'processed', 'final_blob': 'SierraGorda/Fragmentacion/2026-01-03/cruce_wenco_shovel_banco_tasa_excavacion_fragmentacion.json'}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner pedro
params {}
pool default_pool
priority_weight 1
priority_weight_total 2
provide_context False
queue default
resources None
retries 0
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 45,45 12,0 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-04-22T00:00:00+00:00
subdag None
task_concurrency None
task_id fragmentation_AddFragmentation
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): fragmentation_AddExcavationRate>]
upstream_task_ids {'fragmentation_AddExcavationRate'}
wait_for_downstream False
weight_rule downstream