DAG: SierraGorda_BrightBoard Functions that clean data in files related to SierraGorda

schedule: 30,30 12,0 * * *
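
The schedule expression lists the minute value twice (`30,30`) and the hours as `12,0`. As a minimal sketch of how those comma-separated fields expand, the snippet below treats each field as a set of plain integers; the helper name `expand_field` is hypothetical, and ranges or step values are deliberately not handled. Under that assumption the DAG fires twice a day, in whatever timezone the scheduler uses.

```python
def expand_field(field):
    """Expand a comma-separated cron field of plain integers into sorted unique values."""
    return sorted({int(part) for part in field.split(",")})


schedule = "30,30 12,0 * * *"
# Only the first two fields (minute, hour) matter here; the rest are wildcards.
minute_field, hour_field, *_ = schedule.split()

fire_times = [
    f"{hour:02d}:{minute:02d}"
    for hour in expand_field(hour_field)
    for minute in expand_field(minute_field)
]
print(fire_times)  # ['00:30', '12:30']
```

The duplicate `30` collapses to a single value, so the expression behaves like `30 0,12 * * *`.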


Task Instance: BrighBoard_GenerarCrudeFlandersOpitReal


Task Instance Details

Dependencies Blocking Task From Getting Scheduled
Dependency: Reason
Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 1, 'upstream_failed': 0, 'done': 1}, upstream_task_ids={'BrighBoard_GenerarArchivoFlanders'}
Dagrun Running: Task instance's dagrun was not in the 'running' state but in the state 'failed'.
Task Instance State: Task is in the 'upstream_failed' state which is not a valid state for execution. The task must be cleared in order to be run.
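
The blocking reasons above follow from the `upstream_tasks_state` counters. The following is a simplified model of how an `all_success` rule could be evaluated against those counters. It is a sketch for illustration, not Airflow's actual implementation, and the function name `evaluate_all_success` and the `"runnable"`/`"waiting"` labels are hypothetical.

```python
def evaluate_all_success(upstream_state):
    """Return the outcome of an 'all_success' trigger rule for the given counters."""
    if upstream_state["successes"] == upstream_state["total"]:
        return "runnable"
    # Any failed (or transitively failed) upstream task blocks this one.
    if upstream_state["failed"] or upstream_state["upstream_failed"]:
        return "upstream_failed"
    if upstream_state["skipped"]:
        return "skipped"
    # Upstream tasks have not all finished yet.
    return "waiting"


# Counters copied from the Trigger Rule message above.
upstream_tasks_state = {
    "total": 1, "successes": 0, "skipped": 0,
    "failed": 1, "upstream_failed": 0, "done": 1,
}
outcome = evaluate_all_success(upstream_tasks_state)
print(outcome)  # upstream_failed
```

With one upstream task and zero successes, the rule cannot be satisfied, which matches the `upstream_failed` state reported for this task instance.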
Attribute: python_callable
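# Imports the function relies on. BLOB_CONNECT_STRING, COLUMNAS_FLANDERS and
# FECHA_ACTUAL_DATETIME are module-level names defined elsewhere in the DAG file.
from datetime import timedelta
from io import BytesIO

import pandas as pd
from azure.storage.blob import BlobClient


def generar_cruce_flanders_opit(origin_flanders_container, origin_opit_container, origin_opit_blob, final_container, final_blob, rango_dias):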
    """
    Generate a final file that combines the Flanders and Opit data.

    Parameters
    ----------
    origin_flanders_container: str
        Name of the container that holds the Flanders file.
    origin_opit_container: str
        Name of the container that holds the Opit file.
    origin_opit_blob: str
        Name of the Opit file.
    final_container: str
        Name of the container where the final file is stored.
    final_blob: str
        Name of the final file.
    rango_dias: int
        Number of days to look back when fetching Flanders data.

    Returns
    -------
    None
    """

    # Read the input files
    # Opit file
    conn_origin_containerOpit = BlobClient.from_connection_string(conn_str=BLOB_CONNECT_STRING, container_name=origin_opit_container, blob_name=origin_opit_blob)
    download_streamOpit = conn_origin_containerOpit.download_blob()

    stream = BytesIO()
    download_streamOpit.readinto(stream)
    stream.seek(0)  # rewind so the Parquet reader starts at the beginning of the buffer
    df_opit = pd.read_parquet(stream)

    # Read the Flanders files, one per day in the lookback window
    df_flanders = pd.DataFrame(columns=COLUMNAS_FLANDERS)

    for i in range(0, rango_dias+1):
        try:
            fecha_anterior = FECHA_ACTUAL_DATETIME - timedelta(days=i)
            fecha_anterior_formateada = fecha_anterior.strftime('%Y-%m-%d')

            # Read that day's Flanders file
            conn_origin_containerhhd = BlobClient.from_connection_string(conn_str=BLOB_CONNECT_STRING, container_name=origin_flanders_container, blob_name=f'SierraGorda/BrightBoard/Flanders/{fecha_anterior_formateada}/flanders.json')
            download_streamhhd = conn_origin_containerhhd.download_blob()
            df_flanders_dia = pd.read_json(download_streamhhd, orient='table')

            df_flanders = pd.concat([df_flanders_dia, df_flanders], ignore_index=True)
        except Exception:
            # Skip days whose file is missing or cannot be read
            continue

    # Process the data
    df_opit['malla_id_str'] = df_opit['malla_id'].apply(str)
    df_opit['malla_name_str'] = df_opit['malla_name'].apply(str)
    df_opit['blast_date_str'] = df_opit['blast_date'].apply(str)

    df_opit['unique_malla'] = df_opit['malla_id_str'] + '|'+ df_opit['malla_name_str'] + '|' + df_opit['blast_date_str']
    df_opit['x'] = df_opit['holes_coordinates_x'].astype(float)
    df_opit['y'] = df_opit['holes_coordinates_y'].astype(float)

    # Set the coordinates used as merge keys
    df_flanders['x'] = df_flanders['DesignEasting'].astype(float)
    df_flanders['y'] = df_flanders['DesignNorthing'].astype(float)

    df_Flanders_opit = pd.merge(df_flanders, df_opit, how='left', on=['x','y'], indicator=True)
    df_Flanders_opit['DateTime'] = df_Flanders_opit['DateTime'].astype('datetime64[ns]')
    df_Flanders_opit['mes'] = df_Flanders_opit['DateTime'].dt.month

    # Generate the new files, one per date
    df_Flanders_opit['DateTimeStr'] = df_Flanders_opit['DateTime'].astype(str).str[0:10]

    fechas = df_Flanders_opit['DateTimeStr'].unique()

    for fecha in fechas: 
        df_aux = df_Flanders_opit.loc[df_Flanders_opit['DateTimeStr']==fecha]

        df_aux = df_aux[['RecordID', 'shift_index', 'Mine_HoleNumber', 'BenchBelow', 'BitNumber',
            'BitUsage', 'AvgHoleEnergy', 'AvgPenetrationRate2', 'HoleDepth',
            'TargetDepth', 'PatternHoleNumber', 'BlastName', 'DrillPattern',
            'DateTime', 'Drill_Number', 'OperatorID', 'HoleNumber',
            'DesignNorthing', 'DesignEasting', 'ActualCollarNorthing',
            'ActualCollarEasting', 'designelevation', 'actualelevation',
            'actualgroundelevation', 'Bench Below', 'Hole Difference', 'Rango',
            'Categoria', 'Nombre Categoria', 'Desviacion', 'Extra Sobreperforación',
            'Extra Reperforación', 'Turno', 'DrillTime', 'x', 'y', 'malla_id',
            'malla_name', 'uniqid', 'owner_id', 'owner_name', 'owner_email',
            'shotfire', 'design_responsible', 'site_name', 'location_name',
            'country', 'coordinates_lat', 'coordinates_lng', 'blast_date', 'Mine',
            'Blast', 'Bank', 'Fase', 'truckName', 'holes_id', 'holes_number',
            'holes_label_blast_new', 'holes_label2', 'holes_length',
            'holes_diameter', 'holes_angle', 'holes_azimuth', 'holes_stemming',
            'holes_subdrilling', 'holes_water', 'holes_redrill', 'holes_deleted',
            'holes_verified', 'holes_problems', 'holes_coordinates_x',
            'holes_coordinates_y', 'holes_zones_final_length',
            'holes_zones_truck_number', 'holes_zones_number_detonators',
            'holes_zones_number_primes', 'holes_zones_temperature', 'holes_polygon',
            'holes_operator_email', 'holes_operator_id', 'holes_operator_name',
            'holes_tipo', 'holes_water_level', 'holes_zones_density',
            'holes_label_blast_new_charge', 'charge_name', 'charge_type',
            'charge_unit', 'charge_quantity', 'charge_rule_unit',
            'charge_rule_value', 'charge_tipo', 'malla_id_str', 'malla_name_str',
            'blast_date_str', 'unique_malla', '_merge', 'mes']]

        # Configure the destination blob
        blob = BlobClient.from_connection_string(conn_str=BLOB_CONNECT_STRING, container_name=final_container, blob_name=final_blob)

        # Upload the data
        # Note: every date writes to the same final_blob with overwrite=True,
        # so that blob ends up holding only the last date processed.
        json_data = df_aux.to_json(index=False, orient='table')
        blob.upload_blob(json_data, overwrite=True)

        # Separate step: upload a per-date copy to a second storage account
        # Account key redacted; it should be supplied from a secret store rather than hard-coded.
        connection_string_rgr = 'DefaultEndpointsProtocol=https;AccountName=rgrprimerbf21;AccountKey=<REDACTED>;EndpointSuffix=core.windows.net'
        blob_rgr = BlobClient.from_connection_string(conn_str=connection_string_rgr, container_name='data', blob_name=f"SierraGorda/FlandersOpitMalla/{fecha}/flanders_opit_malla_{fecha}.json")

        # Upload the data
        blob_rgr.upload_blob(json_data, overwrite=True)    
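
The function above looks back `rango_dias` days from `FECHA_ACTUAL_DATETIME` and tries one Flanders blob per day. The sketch below reproduces only that path-building step so the lookback window is easy to inspect. The helper name `flanders_blob_names` and the reference date are hypothetical (the real `FECHA_ACTUAL_DATETIME` is defined outside the code shown), while the path template is copied from the function.

```python
from datetime import datetime, timedelta


def flanders_blob_names(reference_date, rango_dias):
    """List the Flanders blob names for reference_date and the rango_dias days before it."""
    names = []
    for i in range(0, rango_dias + 1):
        day = reference_date - timedelta(days=i)
        names.append(
            f"SierraGorda/BrightBoard/Flanders/{day.strftime('%Y-%m-%d')}/flanders.json"
        )
    return names


# Hypothetical reference date, chosen only for illustration.
names = flanders_blob_names(datetime(2025, 12, 8), 5)
for name in names:
    print(name)
```

Because the range is inclusive at both ends, `rango_dias=5` yields six candidate blobs, from the reference date back to five days earlier.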
Task Instance Attributes
Attribute Value
dag_id SierraGorda_BrightBoard
duration None
end_date 2025-12-09 07:42:04.062824+00:00
execution_date 2025-12-08T12:30:00+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7783d3ac3040>
hostname
is_premature False
job_id None
key ('SierraGorda_BrightBoard', 'BrighBoard_GenerarCrudeFlandersOpitReal', <Pendulum [2025-12-08T12:30:00+00:00]>, 1)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/SierraGorda_BrightBoard/BrighBoard_GenerarCrudeFlandersOpitReal/2025-12-08T12:30:00+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2025-12-08T12%3A30%3A00%2B00%3A00&task_id=BrighBoard_GenerarCrudeFlandersOpitReal&dag_id=SierraGorda_BrightBoard
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=BrighBoard_GenerarCrudeFlandersOpitReal&dag_id=SierraGorda_BrightBoard&execution_date=2025-12-08T12%3A30%3A00%2B00%3A00&upstream=false&downstream=false
max_tries 0
metadata MetaData(bind=None)
next_try_number 1
operator None
pid None
pool default_pool
prev_attempted_tries 0
previous_execution_date_success None
previous_start_date_success None
previous_ti None
previous_ti_success None
priority_weight 1
queue default
queued_dttm None
raw False
run_as_user None
start_date 2025-12-09 07:42:04.062811+00:00
state upstream_failed
task <Task(PythonOperator): BrighBoard_GenerarCrudeFlandersOpitReal>
task_id BrighBoard_GenerarCrudeFlandersOpitReal
test_mode False
try_number 1
unixname airflow
Task Attributes
Attribute Value
dag <DAG: SierraGorda_BrightBoard>
dag_id SierraGorda_BrightBoard
depends_on_past False
deps {<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>}
do_xcom_push True
downstream_list []
downstream_task_ids set()
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {'origin_flanders_container': 'processed', 'origin_opit_container': 'raw', 'origin_opit_blob': 'SierraGorda/Historico/opit.parquet', 'final_container': 'processed', 'final_blob': 'SierraGorda/BrightBoard/FlandersOpitMalla/2026-01-02/flanders_opit_malla_2026-01-02.json', 'rango_dias': 5}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner pedro
params {}
pool default_pool
priority_weight 1
priority_weight_total 1
provide_context False
queue default
resources None
retries 0
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 30,30 12,0 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-04-22T00:00:00+00:00
subdag None
task_concurrency None
task_id BrighBoard_GenerarCrudeFlandersOpitReal
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): BrighBoard_GenerarArchivoFlanders>]
upstream_task_ids {'BrighBoard_GenerarArchivoFlanders'}
wait_for_downstream False
weight_rule downstream
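
The `op_args` and `op_kwargs` attributes listed above are what a PythonOperator passes to its `python_callable`, as positional and keyword arguments respectively. A minimal sketch of that call, using a stand-in function with the same parameter names so that nothing touches blob storage; the stub name `generar_cruce_flanders_opit_stub` and its return value are hypothetical.

```python
def generar_cruce_flanders_opit_stub(origin_flanders_container, origin_opit_container,
                                     origin_opit_blob, final_container, final_blob,
                                     rango_dias):
    """Stand-in for the real callable: only reports where the output would be written."""
    return f"{final_container}/{final_blob} (lookback: {rango_dias} days)"


# Values copied from the Task Attributes listing.
op_args = []
op_kwargs = {
    "origin_flanders_container": "processed",
    "origin_opit_container": "raw",
    "origin_opit_blob": "SierraGorda/Historico/opit.parquet",
    "final_container": "processed",
    "final_blob": "SierraGorda/BrightBoard/FlandersOpitMalla/2026-01-02/flanders_opit_malla_2026-01-02.json",
    "rango_dias": 5,
}

# Equivalent to how the operator invokes its callable.
result = generar_cruce_flanders_opit_stub(*op_args, **op_kwargs)
print(result)
```

Every key in `op_kwargs` must match a parameter name of the callable; a missing or misspelled key would raise a `TypeError` when the task runs.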