diff --git a/datascience/src/pipeline/flows/current_segments.py b/datascience/src/pipeline/flows/current_segments.py index afb5be64f6..537fe8d346 100644 --- a/datascience/src/pipeline/flows/current_segments.py +++ b/datascience/src/pipeline/flows/current_segments.py @@ -9,14 +9,13 @@ from config import default_risk_factors from src.pipeline.generic_tasks import extract, load -from src.pipeline.helpers.segments import attribute_segments_to_catches +from src.pipeline.helpers.segments import allocate_segments_to_catches from src.pipeline.processing import df_to_dict_series from src.pipeline.shared_tasks.control_flow import check_flow_not_running from src.pipeline.shared_tasks.dates import get_current_year from src.pipeline.shared_tasks.facades import extract_facade_areas from src.pipeline.shared_tasks.infrastructure import get_table from src.pipeline.shared_tasks.segments import extract_segments_of_year, unnest_segments -from src.pipeline.shared_tasks.vessels import add_vessel_id @task(checkpoint=False) @@ -50,7 +49,6 @@ def extract_last_positions(): def compute_last_positions_facade( last_positions: gpd.GeoDataFrame, facade_areas: gpd.GeoDataFrame ) -> pd.DataFrame: - last_positions_facade_1 = gpd.sjoin(last_positions, facade_areas)[["cfr", "facade"]] unassigned_last_positions = last_positions[ @@ -78,8 +76,7 @@ def compute_last_positions_facade( @task(checkpoint=False) def compute_current_segments(catches, segments): - - current_segments = attribute_segments_to_catches( + current_segments = allocate_segments_to_catches( catches[["cfr", "gear", "fao_area", "species"]], segments[ [ @@ -138,7 +135,6 @@ def compute_control_priorities( last_positions_facade: pd.DataFrame, control_priorities: pd.DataFrame, ) -> pd.DataFrame: - cfr_segment_facade = ( current_segments[["segments"]] .join(last_positions_facade) @@ -170,7 +166,6 @@ def join( current_segments: pd.DataFrame, control_priorities: pd.DataFrame, ) -> pd.DataFrame: - # Group catch data of each vessel in a list of dicts like # [{"gear": "DRB", "species": "SCE", "faoZone": "27.7", "weight": 156.2}, ...] catch_columns = ["gear", "fao_area", "species", "weight"] @@ -231,10 +226,8 @@ def load_current_segments(vessels_segments): # pragma: no cover with Flow("Current segments", executor=LocalDaskExecutor()) as flow: - flow_not_running = check_flow_not_running() with case(flow_not_running, True): - # Extract current_year = get_current_year() catches = extract_catches() @@ -255,7 +248,6 @@ def load_current_segments(vessels_segments): # pragma: no cover current_segments, last_positions_facade, control_priorities ) current_segments = join(catches, current_segments, control_priorities) - current_segments = add_vessel_id(current_segments, vessels_table) # Load load_current_segments(current_segments) diff --git a/datascience/src/pipeline/flows/recompute_controls_segments.py b/datascience/src/pipeline/flows/recompute_controls_segments.py index b1aa841fc7..78596cb6c7 100644 --- a/datascience/src/pipeline/flows/recompute_controls_segments.py +++ b/datascience/src/pipeline/flows/recompute_controls_segments.py @@ -10,7 +10,7 @@ from src.db_config import create_engine from src.pipeline.entities.missions import MissionActionType from src.pipeline.generic_tasks import extract -from src.pipeline.helpers.segments import attribute_segments_to_catches +from src.pipeline.helpers.segments import allocate_segments_to_catches from src.pipeline.processing import df_to_dict_series, prepare_df_for_loading from src.pipeline.shared_tasks.control_flow import check_flow_not_running from src.pipeline.shared_tasks.segments import extract_segments_of_year, unnest_segments @@ -63,10 +63,9 @@ def compute_controls_segments( controls_catches: pd.DataFrame, segments: pd.DataFrame, ) -> pd.DataFrame: - controls_catches = controls_catches.where(controls_catches.notnull(), None) controls_segments = ( - attribute_segments_to_catches( + allocate_segments_to_catches( controls_catches, segments[["segment", "segment_name", "fao_area", "gear", "species"]], )[["id", "segment", "segment_name"]] @@ -100,7 +99,6 @@ def load_controls_segments(controls_segments: pd.DataFrame): e = create_engine("monitorfish_remote") with e.begin() as connection: - logger.info("Creating temporary table") connection.execute( text( @@ -142,10 +140,8 @@ def load_controls_segments(controls_segments: pd.DataFrame): with Flow("Recompute controls segments", executor=LocalDaskExecutor()) as flow: - flow_not_running = check_flow_not_running() with case(flow_not_running, True): - # Parameters year = Parameter("year") control_types = Parameter("control_types") diff --git a/datascience/src/pipeline/queries/monitorfish/current_catches.sql b/datascience/src/pipeline/queries/monitorfish/current_catches.sql index d54bad92ee..8e65118b20 100644 --- a/datascience/src/pipeline/queries/monitorfish/current_catches.sql +++ b/datascience/src/pipeline/queries/monitorfish/current_catches.sql @@ -1,33 +1,82 @@ -WITH deleted_or_corrected_messages AS ( +WITH acknowledged_messages AS ( SELECT referenced_report_id - FROM public.logbook_reports - WHERE operation_type IN ('DEL', 'COR') - AND operation_datetime_utc > CURRENT_TIMESTAMP - INTERVAL '6 months' - -- exclude VisioCapture (which is not real time but has several months of delay) from current_catches - AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%') + FROM logbook_reports + WHERE + operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months' + AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours' + AND operation_type ='RET' + AND value->>'returnStatus' = '000' +), + +deleted_messages AS ( + SELECT + operation_number, + referenced_report_id + FROM logbook_reports + WHERE + operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months' + AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours' + AND operation_type ='DEL' +), + +acknowledged_deleted_messages AS ( + SELECT referenced_report_id + FROM deleted_messages + WHERE + operation_number IN (SELECT referenced_report_id FROM acknowledged_messages) +), + +corrected_messages AS ( + SELECT + referenced_report_id + FROM logbook_reports + WHERE + operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months' + AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours' + AND operation_type ='COR' + AND ( + flag_state NOT IN ('FRA', 'GUF', 'VEN') -- Flag states for which we receive RET + OR report_id IN (SELECT referenced_report_id FROM acknowledged_messages) + ) ), ordered_deps AS ( SELECT cfr, ircs, - external_identification, + external_identification AS external_immatriculation, trip_number, - (value->>'departureDatetimeUtc')::timestamptz AS departure_datetime_utc, - ROW_NUMBER() OVER(PARTITION BY cfr ORDER BY (value->>'departureDatetimeUtc')::timestamptz DESC) as rk + activity_datetime_utc AS departure_datetime_utc, + ROW_NUMBER() OVER (PARTITION BY cfr ORDER BY activity_datetime_utc DESC) as rk FROM public.logbook_reports - WHERE log_type = 'DEP' - AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '6 months' - AND (value->>'departureDatetimeUtc')::timestamptz < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours' - AND report_id NOT IN (SELECT referenced_report_id FROM deleted_or_corrected_messages) - AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%') + WHERE + log_type = 'DEP' + AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months' + AND activity_datetime_utc < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours' + AND report_id NOT IN (SELECT referenced_report_id FROM corrected_messages) + AND NOT ( + report_id IN (SELECT referenced_report_id FROM acknowledged_deleted_messages) + OR ( + report_id IN (SELECT referenced_report_id FROM deleted_messages) + AND flag_state NOT IN ('FRA', 'GUF', 'VEN') + ) + ) + AND ( + flag_state NOT IN ('FRA', 'GUF', 'VEN') -- Flag states for which we receive RET + OR report_id IN (SELECT referenced_report_id FROM acknowledged_messages) + ) + -- Exclude data that is not real-time electronic logbook data + AND ( + software IS NULL -- Non french vessels + OR software NOT LIKE '%VISIOCaptures%' + ) ), last_deps AS ( SELECT cfr, ircs, - external_identification, + external_immatriculation, departure_datetime_utc, trip_number FROM ordered_deps @@ -40,7 +89,7 @@ last_logbook_reports AS ( MAX(report_datetime_utc) AS last_logbook_message_datetime_utc FROM public.logbook_reports WHERE operation_type IN ('DAT', 'COR') - AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '6 months' + AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months' AND report_datetime_utc < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours' AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%') GROUP BY cfr @@ -60,11 +109,22 @@ catches AS ( JOIN last_deps d ON r.cfr = d.cfr AND r.trip_number = d.trip_number - WHERE log_type = 'FAR' - AND operation_type IN ('DAT', 'COR') - AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '6 months' - AND operation_number NOT IN (SELECT referenced_report_id FROM deleted_or_corrected_messages) - AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%') + WHERE + log_type = 'FAR' + AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months' + AND report_id NOT IN (SELECT referenced_report_id FROM corrected_messages) + AND NOT ( + report_id IN (SELECT referenced_report_id FROM acknowledged_deleted_messages) + OR ( + report_id IN (SELECT referenced_report_id FROM deleted_messages) + AND r.flag_state NOT IN ('FRA', 'GUF', 'VEN') + ) + ) + AND ( + flag_state NOT IN ('FRA', 'GUF', 'VEN') -- Flag states for which we receive RET + OR report_id IN (SELECT referenced_report_id FROM acknowledged_messages) + ) + AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%') ), summed_catches AS ( @@ -73,9 +133,10 @@ summed_catches AS ( species, gear, fao_area, + mesh, SUM(weight) as weight FROM catches - GROUP BY cfr, species, gear, fao_area + GROUP BY cfr, species, gear, fao_area, mesh ), gear_onboard AS ( @@ -93,21 +154,40 @@ gear_onboard AS ( ) SELECT - COALESCE(last_logbook_reports.cfr, last_deps.cfr) AS cfr, + ROW_NUMBER() OVER (ORDER BY COALESCE(l.cfr, last_deps.cfr), species) AS catch_id, + COALESCE(l.cfr, last_deps.cfr) AS cfr, + COALESCE( + v_cfr.id, + v_ircs.id, + v_ext.id + ) AS vessel_id, + EXTRACT(YEAR from CURRENT_TIMESTAMP AT TIME ZONE 'UTC') AS year, + fao_area, + gear, + mesh, + species, + s.species_name, + weight, + COALESCE( + v_cfr.vessel_type, + v_ircs.vessel_type, + v_ext.vessel_type + ) AS vessel_type, last_deps.ircs, - last_deps.external_identification AS external_immatriculation, - last_logbook_reports.last_logbook_message_datetime_utc, + last_deps.external_immatriculation AS external_immatriculation, + l.last_logbook_message_datetime_utc, departure_datetime_utc, trip_number, - go.gear_onboard, - species, - gear, - fao_area, - weight -FROM last_logbook_reports + go.gear_onboard +FROM last_logbook_reports l FULL OUTER JOIN last_deps -ON last_logbook_reports.cfr = last_deps.cfr +ON l.cfr = last_deps.cfr LEFT JOIN summed_catches -ON last_logbook_reports.cfr = summed_catches.cfr +ON l.cfr = summed_catches.cfr LEFT JOIN gear_onboard go -ON last_logbook_reports.cfr = go.cfr +ON l.cfr = go.cfr +LEFT JOIN species s +ON s.species_code = summed_catches.species +LEFT JOIN vessels v_cfr ON v_cfr.cfr = COALESCE(l.cfr, last_deps.cfr) +LEFT JOIN vessels v_ircs ON v_ircs.ircs = last_deps.ircs AND v_ircs.cfr IS NULL +LEFT JOIN vessels v_ext ON v_ext.external_immatriculation = last_deps.external_immatriculation AND v_ext.cfr IS NULL \ No newline at end of file diff --git a/datascience/tests/test_pipeline/test_flows/test_current_segments.py b/datascience/tests/test_pipeline/test_flows/test_current_segments.py index 26bd845201..e7ea619b4a 100644 --- a/datascience/tests/test_pipeline/test_flows/test_current_segments.py +++ b/datascience/tests/test_pipeline/test_flows/test_current_segments.py @@ -100,7 +100,7 @@ def current_segments() -> pd.DataFrame: def test_extract_catches(reset_test_data): catches = extract_catches.run() - assert len(catches) == 6 + assert len(catches) == 5 assert set(catches.cfr) == { "ABC000542519", "ABC000306959",