diff --git a/datascience/src/pipeline/flows/current_segments.py b/datascience/src/pipeline/flows/current_segments.py index afb5be64f6..1b40833870 100644 --- a/datascience/src/pipeline/flows/current_segments.py +++ b/datascience/src/pipeline/flows/current_segments.py @@ -1,33 +1,31 @@ from datetime import datetime from pathlib import Path -import geopandas as gpd import pandas as pd import prefect -from prefect import Flow, case, task +from prefect import Flow, Parameter, case, task from prefect.executors import LocalDaskExecutor from config import default_risk_factors from src.pipeline.generic_tasks import extract, load -from src.pipeline.helpers.segments import attribute_segments_to_catches +from src.pipeline.helpers.segments import allocate_segments_to_catches from src.pipeline.processing import df_to_dict_series from src.pipeline.shared_tasks.control_flow import check_flow_not_running from src.pipeline.shared_tasks.dates import get_current_year -from src.pipeline.shared_tasks.facades import extract_facade_areas -from src.pipeline.shared_tasks.infrastructure import get_table -from src.pipeline.shared_tasks.segments import extract_segments_of_year, unnest_segments -from src.pipeline.shared_tasks.vessels import add_vessel_id +from src.pipeline.shared_tasks.segments import extract_segments_of_year @task(checkpoint=False) -def extract_catches(): +def extract_current_catches(number_of_days: int) -> pd.DataFrame: return extract( - db_name="monitorfish_remote", query_filepath="monitorfish/current_catches.sql" + db_name="monitorfish_remote", + query_filepath="monitorfish/current_catches.sql", + params={"number_of_days": number_of_days}, ) @task(checkpoint=False) -def extract_control_priorities(): +def extract_control_priorities() -> pd.DataFrame: return extract( db_name="monitorfish_remote", query_filepath="monitorfish/control_priorities.sql", @@ -40,60 +38,68 @@ def extract_last_positions(): return extract( db_name="monitorfish_remote", query_filepath="monitorfish/last_positions.sql", - backend="geopandas", - geom_col="geometry", - crs=4326, ) @task(checkpoint=False) -def compute_last_positions_facade( - last_positions: gpd.GeoDataFrame, facade_areas: gpd.GeoDataFrame -) -> pd.DataFrame: - - last_positions_facade_1 = gpd.sjoin(last_positions, facade_areas)[["cfr", "facade"]] - - unassigned_last_positions = last_positions[ - ~last_positions.cfr.isin(last_positions_facade_1.cfr) - ].copy(deep=True) - - # Vessels that are not directly in a facade geometry are oftentimes vessels in a - # port, which facade geometries genereally do not encompass. In order to match - # these vessels to a nearby facade, we drawed a ~10km circle around them - # and attempt a spatial join on this buffered geometry. - unassigned_last_positions["geometry"] = unassigned_last_positions.buffer(0.1) - - last_positions_facade_2 = gpd.sjoin( - unassigned_last_positions, facade_areas, how="left" - )[["cfr", "facade"]] - - last_positions_facade_2 = last_positions_facade_2.drop_duplicates(subset=["cfr"]) - - last_positions_facade = pd.concat( - [last_positions_facade_1, last_positions_facade_2] - ).set_index("cfr") - - return last_positions_facade - - -@task(checkpoint=False) -def compute_current_segments(catches, segments): - - current_segments = attribute_segments_to_catches( - catches[["cfr", "gear", "fao_area", "species"]], - segments[ +def compute_current_segments(current_catches, segments, control_priorities): + current_segments = allocate_segments_to_catches( + current_catches[ [ - "segment", - "gear", + "catch_id", + "cfr", + "vessel_id", + "year", "fao_area", + "gear", + "mesh", "species", + "scip_species_type", + "weight", + "vessel_type", + "ircs", + "external_immatriculation", + "last_logbook_message_datetime_utc", + "departure_datetime_utc", + "trip_number", + "gear_onboard", + ] + ], + segments[ + [ + "year", + "segment", + "segment_name", + "gears", + "fao_areas", + "min_mesh", + "max_mesh", + "target_species", + "min_share_of_target_species", + "main_scip_species_type", + "priority", + "vessel_types", "impact_risk_factor", ] ], + catch_id_column="catch_id", + batch_id_column="cfr", ) - # Aggregate by vessel + control_priorities = ( + pd.merge(current_segments, control_priorities, on=["segment", "facade"]) + .sort_values("control_priority_level", ascending=False) + .groupby("cfr")[["cfr", "segment", "control_priority_level"]] + .head(1) + .set_index("cfr") + .rename( + columns={ + "segment": "segment_highest_priority", + } + ) + ) + # Aggregate by vessel current_segments_impact = ( current_segments.sort_values("impact_risk_factor", ascending=False) .groupby("cfr")[["cfr", "segment", "impact_risk_factor"]] @@ -110,7 +116,7 @@ def compute_current_segments(catches, segments): current_segments.groupby("cfr")["segment"].unique().rename("segments") ) - total_catch_weight = catches.groupby("cfr")["weight"].sum() + total_catch_weight = current_catches.groupby("cfr")["weight"].sum() total_catch_weight = total_catch_weight.rename("total_weight_onboard") current_segments = pd.merge( @@ -129,57 +135,15 @@ def compute_current_segments(catches, segments): how="outer", ) - return current_segments - - -@task(checkpoint=False) -def compute_control_priorities( - current_segments: pd.DataFrame, - last_positions_facade: pd.DataFrame, - control_priorities: pd.DataFrame, -) -> pd.DataFrame: - - cfr_segment_facade = ( - current_segments[["segments"]] - .join(last_positions_facade) - .explode("segments") - .rename(columns={"segments": "segment"}) - .reset_index() - .dropna(subset=["segment", "facade"]) - ) - - control_priorities = ( - pd.merge(cfr_segment_facade, control_priorities, on=["segment", "facade"]) - .sort_values("control_priority_level", ascending=False) - .groupby("cfr")[["cfr", "segment", "control_priority_level"]] - .head(1) - .set_index("cfr") - .rename( - columns={ - "segment": "segment_highest_priority", - } - ) - ) - - return control_priorities - - -@task(checkpoint=False) -def join( - catches: pd.DataFrame, - current_segments: pd.DataFrame, - control_priorities: pd.DataFrame, -) -> pd.DataFrame: - # Group catch data of each vessel in a list of dicts like # [{"gear": "DRB", "species": "SCE", "faoZone": "27.7", "weight": 156.2}, ...] catch_columns = ["gear", "fao_area", "species", "weight"] - species_onboard = catches[catch_columns] + species_onboard = current_catches[catch_columns] species_onboard = species_onboard.rename(columns={"fao_area": "faoZone"}) species_onboard = df_to_dict_series( species_onboard.dropna(subset=["species"]), result_colname="species_onboard" ) - species_onboard = catches[["cfr"]].join(species_onboard) + species_onboard = current_catches[["cfr"]].join(species_onboard) species_onboard = species_onboard.dropna(subset=["species_onboard"]) species_onboard = species_onboard.groupby("cfr")["species_onboard"].apply(list) @@ -195,7 +159,9 @@ def join( "gear_onboard", ] - last_logbook_report = catches[last_logbook_report_columns].groupby("cfr").head(1) + last_logbook_report = ( + current_catches[last_logbook_report_columns].groupby("cfr").head(1) + ) last_logbook_report = last_logbook_report.set_index("cfr") # Join departure, catches and segments information into a single table with 1 line @@ -231,31 +197,20 @@ def load_current_segments(vessels_segments): # pragma: no cover with Flow("Current segments", executor=LocalDaskExecutor()) as flow: - flow_not_running = check_flow_not_running() with case(flow_not_running, True): - # Extract + number_of_days = Parameter("number_of_days", 90) current_year = get_current_year() - catches = extract_catches() + current_catches = extract_current_catches(number_of_days=number_of_days) last_positions = extract_last_positions() segments = extract_segments_of_year(current_year) - facade_areas = extract_facade_areas() control_priorities = extract_control_priorities() - vessels_table = get_table("vessels") - # Transform - last_positions_facade = compute_last_positions_facade( - last_positions, facade_areas - ) - segments = unnest_segments(segments) - current_segments = compute_current_segments(catches, segments) - control_priorities = compute_control_priorities( - current_segments, last_positions_facade, control_priorities + current_segments = compute_current_segments( + current_catches, segments, control_priorities ) - current_segments = join(catches, current_segments, control_priorities) - current_segments = add_vessel_id(current_segments, vessels_table) # Load load_current_segments(current_segments) diff --git a/datascience/src/pipeline/flows/recompute_controls_segments.py b/datascience/src/pipeline/flows/recompute_controls_segments.py index b1aa841fc7..78596cb6c7 100644 --- a/datascience/src/pipeline/flows/recompute_controls_segments.py +++ b/datascience/src/pipeline/flows/recompute_controls_segments.py @@ -10,7 +10,7 @@ from src.db_config import create_engine from src.pipeline.entities.missions import MissionActionType from src.pipeline.generic_tasks import extract -from src.pipeline.helpers.segments import attribute_segments_to_catches +from src.pipeline.helpers.segments import allocate_segments_to_catches from src.pipeline.processing import df_to_dict_series, prepare_df_for_loading from src.pipeline.shared_tasks.control_flow import check_flow_not_running from src.pipeline.shared_tasks.segments import extract_segments_of_year, unnest_segments @@ -63,10 +63,9 @@ def compute_controls_segments( controls_catches: pd.DataFrame, segments: pd.DataFrame, ) -> pd.DataFrame: - controls_catches = controls_catches.where(controls_catches.notnull(), None) controls_segments = ( - attribute_segments_to_catches( + allocate_segments_to_catches( controls_catches, segments[["segment", "segment_name", "fao_area", "gear", "species"]], )[["id", "segment", "segment_name"]] @@ -100,7 +99,6 @@ def load_controls_segments(controls_segments: pd.DataFrame): e = create_engine("monitorfish_remote") with e.begin() as connection: - logger.info("Creating temporary table") connection.execute( text( @@ -142,10 +140,8 @@ def load_controls_segments(controls_segments: pd.DataFrame): with Flow("Recompute controls segments", executor=LocalDaskExecutor()) as flow: - flow_not_running = check_flow_not_running() with case(flow_not_running, True): - # Parameters year = Parameter("year") control_types = Parameter("control_types") diff --git a/datascience/src/pipeline/queries/monitorfish/current_catches.sql b/datascience/src/pipeline/queries/monitorfish/current_catches.sql index 8e65118b20..ec8980b619 100644 --- a/datascience/src/pipeline/queries/monitorfish/current_catches.sql +++ b/datascience/src/pipeline/queries/monitorfish/current_catches.sql @@ -2,7 +2,7 @@ WITH acknowledged_messages AS ( SELECT referenced_report_id FROM logbook_reports WHERE - operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months' + operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL ':number_of_days days' AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours' AND operation_type ='RET' AND value->>'returnStatus' = '000' @@ -14,7 +14,7 @@ deleted_messages AS ( referenced_report_id FROM logbook_reports WHERE - operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months' + operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL ':number_of_days days' AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours' AND operation_type ='DEL' ), @@ -31,7 +31,7 @@ corrected_messages AS ( referenced_report_id FROM logbook_reports WHERE - operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months' + operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL ':number_of_days days' AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours' AND operation_type ='COR' AND ( @@ -51,7 +51,7 @@ ordered_deps AS ( FROM public.logbook_reports WHERE log_type = 'DEP' - AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months' + AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_days days' AND activity_datetime_utc < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours' AND report_id NOT IN (SELECT referenced_report_id FROM corrected_messages) AND NOT ( @@ -89,7 +89,7 @@ last_logbook_reports AS ( MAX(report_datetime_utc) AS last_logbook_message_datetime_utc FROM public.logbook_reports WHERE operation_type IN ('DAT', 'COR') - AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months' + AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_days days' AND report_datetime_utc < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours' AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%') GROUP BY cfr @@ -111,7 +111,7 @@ catches AS ( AND r.trip_number = d.trip_number WHERE log_type = 'FAR' - AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months' + AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_days days' AND report_id NOT IN (SELECT referenced_report_id FROM corrected_messages) AND NOT ( report_id IN (SELECT referenced_report_id FROM acknowledged_deleted_messages) @@ -166,7 +166,7 @@ SELECT gear, mesh, species, - s.species_name, + s.scip_species_type, weight, COALESCE( v_cfr.vessel_type, @@ -174,7 +174,7 @@ SELECT v_ext.vessel_type ) AS vessel_type, last_deps.ircs, - last_deps.external_immatriculation AS external_immatriculation, + last_deps.external_immatriculation, l.last_logbook_message_datetime_utc, departure_datetime_utc, trip_number, diff --git a/datascience/src/pipeline/queries/monitorfish/last_positions.sql b/datascience/src/pipeline/queries/monitorfish/last_positions.sql index d02d2081d2..4db41ec6d6 100644 --- a/datascience/src/pipeline/queries/monitorfish/last_positions.sql +++ b/datascience/src/pipeline/queries/monitorfish/last_positions.sql @@ -1,7 +1,8 @@ -SELECT +SELECT DISTINCT ON (cfr) cfr, - latitude, - longitude, - ST_SetSRId(ST_MakePoint(longitude, latitude), 4326) AS geometry -FROM last_positions -WHERE cfr IS NOT NULL \ No newline at end of file + f.facade +FROM last_positions p +LEFT JOIN facade_areas_subdivided f +ON ST_Intersects(ST_SetSRId(ST_MakePoint(p.longitude, p.latitude), 4326), f.geometry) +WHERE cfr IS NOT NULL +ORDER BY cfr, last_position_datetime_utc DESC \ No newline at end of file diff --git a/datascience/tests/test_data/csv/current_catches.csv b/datascience/tests/test_data/csv/current_catches.csv new file mode 100644 index 0000000000..0cd94bd445 --- /dev/null +++ b/datascience/tests/test_data/csv/current_catches.csv @@ -0,0 +1,32 @@ +"catch_id","cfr","vessel_id","year","fao_area","gear","mesh","species","scip_species_type","weight","vessel_type","ircs","external_immatriculation","last_logbook_message_datetime_utc","departure_datetime_utc","trip_number","gear_onboard","segment","impact_risk_factor" +1,"vessel1_L_T8-9",1,2050,"27.7.a","OTB",80,"BSS",,200,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]",, +2,"vessel1_L_T8-9",1,2050,"27.8.a","OTB",80,"BSS",,200,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","T8-9","2.2" +3,"vessel1_L_T8-9",1,2050,"27.7.a","LLS",,"BSS",,200,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +4,"vessel1_L_T8-9",1,2050,"27.8.a","LLS",,"BSS",,200,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +5,"vessel1_L_T8-9",1,2050,"27.7.a","OTB",80,"HKE","DEMERSAL",100,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]",, +6,"vessel1_L_T8-9",1,2050,"27.8.a","OTB",80,"HKE","DEMERSAL",100,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","T8-9","2.2" +7,"vessel1_L_T8-9",1,2050,"27.7.a","LLS",,"HKE","DEMERSAL",100,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +8,"vessel1_L_T8-9",1,2050,"27.8.a","LLS",,"HKE","DEMERSAL",100,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +9,"vessel1_L_T8-9",1,2050,"27.7.a","OTB",80,"NEP",,250,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]",, +10,"vessel1_L_T8-9",1,2050,"27.8.a","OTB",80,"NEP",,250,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","T8-9","2.2" +11,"vessel1_L_T8-9",1,2050,"27.7.a","LLS",,"NEP",,250,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +12,"vessel1_L_T8-9",1,2050,"27.8.a","LLS",,"NEP",,250,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +13,"vessel1_L_T8-9",1,2050,"27.7.a","OTB",80,"SOL","DEMERSAL",100,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]",, +14,"vessel1_L_T8-9",1,2050,"27.8.a","OTB",80,"SOL","DEMERSAL",100,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","T8-9","2.2" +15,"vessel1_L_T8-9",1,2050,"27.7.a","LLS",,"SOL","DEMERSAL",100,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +16,"vessel1_L_T8-9",1,2050,"27.8.a","LLS",,"SOL","DEMERSAL",100,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +17,"vessel1_L_T8-9",1,2050,"27.7.a","OTB",80,"SWO","TUNA",80,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]",, +18,"vessel1_L_T8-9",1,2050,"27.8.a","OTB",80,"SWO","TUNA",80,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","T8-9","2.2" +19,"vessel1_L_T8-9",1,2050,"27.7.a","LLS",,"SWO","TUNA",80,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +20,"vessel1_L_T8-9",1,2050,"27.8.a","LLS",,"SWO","TUNA",80,"Navire polyvalent","IRCS1",,"2020-12-05 12:58:00","2020-12-05 12:58:00","TRIP_ABC","[{""gear"": ""OTB"", ""mesh"": 80, ""dimensions"": ""20;2""},{""gear"": ""LLS"", ""mesh"": None, ""dimensions"": ""7.0;8.0""}]","L","1.9" +21,"noVesselId_T8-PEL_T8-9",,2050,"27.8","OTM",80,"PIL","PELAGIC",5000,,"IRCS_","EXT_","2021-11-05 10:41:30",,,"None","T8-PEL","2.3" +22,"noVesselId_T8-PEL_T8-9",,2050,"27.9","OTM",80,"PIL","PELAGIC",5000,,"IRCS_","EXT_","2021-11-05 10:41:30",,,"None",, +25,"vessel2_FT",2,2050,"27.7.d","OTB",90,"ABC",,200000,"Chalutier pêche arrière congélateur",,,"2026-05-05 10:41:30","2026-05-05 10:41:30","TRIP_DEF","None","FT","3.3" +26,"vessel2_FT",2,2050,"27.7.e","OTB",90,"DEF",,22000,"Chalutier pêche arrière congélateur",,,"2026-05-05 10:41:30","2026-05-05 10:41:30","TRIP_DEF","None","FT","3.3" +27,"vessel2_FT",2,2050,"28.8.a","OTB",90,"GHI",,15000,"Chalutier pêche arrière congélateur",,,"2026-05-05 10:41:30","2026-05-05 10:41:30","TRIP_DEF","None",, +28,"vessel3_T8-9",3,2050,"27.7.d","OTB",90,"ABC",,200000,"Navire polyvalent","IRCS3","EXT3","2020-02-01 08:21:20","2020-02-01 08:21:20",,"None",, +29,"vessel3_T8-9",3,2050,"27.7.e","OTB",90,"DEF",,22000,"Navire polyvalent","IRCS3","EXT3","2020-02-01 08:21:20","2020-02-01 08:21:20",,"None",, +30,"vessel3_T8-9",3,2050,"27.8.a","OTB",90,"GHI",,15000,"Navire polyvalent","IRCS3","EXT3","2020-02-01 08:21:20","2020-02-01 08:21:20",,"None","T8-9","2.2" +31,"vessel4_L-HKE",4,2050,"27.8.a","LLS",,"HKE","DEMERSAL",120,"Navire polyvalent","IRCS4",,"2020-03-01 08:21:20","2020-03-01 08:21:20","TRIP_GHI","None","L HKE","2.2" +32,"vessel4_L-HKE",4,2050,"27.8.a","LLS",,"COD","DEMERSAL",50,"Navire polyvalent","IRCS4",,"2020-03-01 08:21:20","2020-03-01 08:21:20","TRIP_GHI","None","L HKE","2.2" +33,"vessel5_nocatch",5,2050,,,,,,,"Navire polyvalent","IRCS5",,"2020-03-01 08:21:20","2020-03-01 08:21:20","TRIP_JKL","None",, diff --git a/datascience/tests/test_pipeline/test_flows/test_current_segments.py b/datascience/tests/test_pipeline/test_flows/test_current_segments.py index 26bd845201..8ec6e13903 100644 --- a/datascience/tests/test_pipeline/test_flows/test_current_segments.py +++ b/datascience/tests/test_pipeline/test_flows/test_current_segments.py @@ -1,21 +1,15 @@ -import datetime +from ast import literal_eval -import geopandas as gpd -import numpy as np import pandas as pd import pytest -from dateutil import relativedelta -from shapely.geometry import Polygon +from config import TEST_DATA_LOCATION from src.pipeline.flows.current_segments import ( - compute_control_priorities, compute_current_segments, - compute_last_positions_facade, - extract_catches, extract_control_priorities, + extract_current_catches, extract_last_positions, flow, - join, ) from src.read_query import read_query from tests.mocks import mock_check_flow_not_running @@ -24,82 +18,57 @@ @pytest.fixture -def current_segments() -> pd.DataFrame: - now = datetime.datetime.utcnow() +def expected_last_positions() -> pd.DataFrame: return pd.DataFrame( { - "cfr": [ - "ABC000000000", - "ABC000306959", - "ABC000542519", - "INVA_PNO_VES", - "___TARGET___", - ], - "last_logbook_message_datetime_utc": [ - now - relativedelta.relativedelta(months=1, minutes=27), - now - datetime.timedelta(days=1, hours=6), - now - datetime.timedelta(weeks=1, days=3), - now - relativedelta.relativedelta(months=1, minutes=14), - now - relativedelta.relativedelta(months=1, minutes=34), - ], - "departure_datetime_utc": [ - pd.NaT, - now - datetime.timedelta(days=2), - now - datetime.timedelta(weeks=1, days=5), - pd.NaT, - pd.NaT, - ], - "trip_number": [None, "20210001", "20210002", None, None], - "gear_onboard": [ - None, - [{"gear": "OTM", "mesh": 80, "dimensions": None}], - [{"gear": "OTB", "mesh": 80, "dimensions": None}], - None, - None, - ], - "species_onboard": [ - None, - [ - { - "gear": "OTM", - "weight": 713.0, - "faoZone": "27.8.a", - "species": "HKE", - } - ], - [ - { - "gear": "OTB", - "weight": 157.0, - "faoZone": "27.8.c", - "species": "SOL", - }, - { - "gear": "OTB", - "weight": 2426.0, - "faoZone": "27.8.c", - "species": "HKE", - }, - ], - None, - None, - ], - "segments": [[], ["SWW04"], ["SWW01/02/03"], [], []], - "total_weight_onboard": [0.0, 713.0, 2583.0, 0.0, 0.0], - "probable_segments": [None, None, None, None, None], - "impact_risk_factor": [1.0, 2.1, 3.0, 1.0, 1.0], - "control_priority_level": [1.0, 1.0, 1.0, 1.0, 1.0], - "segment_highest_impact": [None, "SWW04", "SWW01/02/03", None, None], - "segment_highest_priority": [None, None, None, None, None], - "vessel_id": [None, 1.0, 2.0, None, 7.0], - "external_immatriculation": [None, "RV348407", "RO237719", None, None], - "ircs": [None, "LLUK", "FQ7058", None, None], + "cfr": ["ABC000055481", "ABC000542519"], + "facade": [None, "NAMO"], } ) -def test_extract_catches(reset_test_data): - catches = extract_catches.run() +@pytest.fixture +def current_catches() -> pd.DataFrame: + df = pd.read_csv( + TEST_DATA_LOCATION / "csv/current_catches.csv", + converters={"gear_onboard": literal_eval}, + parse_dates=[ + "last_logbook_message_datetime_utc", + "departure_datetime_utc", + ], + ) + return df + + +@pytest.fixture +def segments_of_year() -> pd.DataFrame: + df = pd.read_csv( + TEST_DATA_LOCATION / "csv/segments.csv", + converters={ + "gears": literal_eval, + "fao_areas": literal_eval, + "target_species": literal_eval, + "vessel_types": literal_eval, + }, + ) + + df = df[df.year == 2050].reset_index(drop=True) + return df + + +@pytest.fixture +def control_priorities() -> pd.DataFrame: + return pd.DataFrame( + { + "facade": [], + "segment": [], + "control_priority_level": [], + } + ) + + +def test_extract_current_catches(reset_test_data, current_catches): + catches = extract_current_catches.run(number_of_days=90) assert len(catches) == 6 assert set(catches.cfr) == { "ABC000542519", @@ -115,6 +84,7 @@ def test_extract_catches(reset_test_data): assert catches.loc[ (catches.cfr == "ABC000542519") & (catches.species == "HKE"), "weight" ].to_list() == [2426.0] + assert list(catches) == list(current_catches) def test_extract_control_priorities(reset_test_data): @@ -126,350 +96,22 @@ def test_extract_control_priorities(reset_test_data): pd.testing.assert_frame_equal(control_priorities, expected_control_priorities) -def test_extract_last_positions(reset_test_data): +def test_extract_last_positions(reset_test_data, expected_last_positions): last_positions = extract_last_positions.run() - assert last_positions.crs.to_string() == "EPSG:4326" - last_positions["wkt"] = last_positions["geometry"].map(str) - last_positions = last_positions.drop(columns=["geometry"]) - expected_last_positions = pd.DataFrame( - columns=["cfr", "latitude", "longitude", "wkt"], - data=[ - ["ABC000055481", 53.435, 5.553, "POINT (5.553 53.435)"], - ["ABC000542519", 43.324, 5.359, "POINT (5.359 43.324)"], - ], - ) pd.testing.assert_frame_equal(last_positions, expected_last_positions) -def test_compute_last_positions_facade(): - last_positions = pd.DataFrame( - { - "cfr": ["A", "B", "C", "D"], - "latitude": [45, 45, 45.1, 45], - "longitude": [-5, -5.1, -5, -8], - } +def test_compute_current_segments( + current_catches, segments_of_year, control_priorities +): + res = compute_current_segments.run( + current_catches, segments_of_year, control_priorities ) + print(res) + breakpoint() - last_positions = gpd.GeoDataFrame( - last_positions, - geometry=gpd.points_from_xy(last_positions.longitude, last_positions.latitude), - crs=4326, - ) - - facade_areas = gpd.GeoDataFrame( - { - "facade": ["Facade 1", "Facade 1", "Facade 2"], - "geometry": [ - Polygon( - [ - (-5.05, 45.05), - (0, 45.05), - (0, 0), - (-5.05, 0), - (-5.05, 45.05), - ] - ), - Polygon( - [ - (-5.15, 45.05), - (-7, 45.05), - (-7, 0), - (-5.15, 0), - (-5.15, 45.05), - ] - ), - Polygon( - [ - (-5.05, 47), - (0, 47), - (0, 45.15), - (-5.05, 45.15), - (-5.05, 47), - ] - ), - ], - }, - crs=4326, - ) - - last_positions_facade = compute_last_positions_facade.run( - last_positions, facade_areas - ) - - # In the (rare) case where a ship is just outside the boundary of two nearby - # facades, as is the case for ship C here, it must be attributed to one if the - # facades, but without any guarantee on which one will be picked. - - expected_last_positions_facade_1 = pd.DataFrame( - { - "cfr": ["A", "B", "C", "D"], - "facade": ["Facade 1", "Facade 1", "Facade 2", None], - } - ).set_index("cfr") - - expected_last_positions_facade_2 = pd.DataFrame( - { - "cfr": ["A", "B", "C", "D"], - "facade": ["Facade 1", "Facade 1", "Facade 1", None], - } - ).set_index("cfr") - - try: - pd.testing.assert_frame_equal( - last_positions_facade.convert_dtypes(), - expected_last_positions_facade_1.convert_dtypes(), - ) - except AssertionError: - pd.testing.assert_frame_equal( - last_positions_facade.convert_dtypes(), - expected_last_positions_facade_2.convert_dtypes(), - ) - - -def test_compute_current_segments(): - segments_definitions = pd.DataFrame( - data=[ - ["A", "DRB", "27.7", "SCE", 1.1], - ["A", None, "37", None, 1.1], - ["B", "OTM", "27.7.b.4", "HKE", 1], - ["B", "DRB", "27.7", "SCE", 1], - ["C", "OTM", None, "BFT", 1], - ["D", "OTB", "27.4", None, 1], - ["E", "PTB", None, None, 3], - ["F", None, None, "TUR", 1], - ], - columns=[ - "segment", - "gear", - "fao_area", - "species", - "impact_risk_factor", - ], - ) - - vessels_catches = pd.DataFrame( - data=[ - ["vessel_1", "DRB", "27.7.b", "SCE", 123.56], - ["vessel_2", "PTB", "37.5", "TUR", 1231.4], - ["vessel_3", "OTM", "27.7.b", "HKE", 1203.4], - ["vessel_4", "OTM", "27.7.b.4", "HKE", 13.4], - ["vessel_4", "OTB", "27.4.b.1", "HKE", 1234], - ], - columns=["cfr", "gear", "fao_area", "species", "weight"], - ) - res = compute_current_segments.run(vessels_catches, segments_definitions) - - expected_res = pd.DataFrame( - data=[ - [["A", "B"], 123.56, "A", 1.1], - [["E", "F", "A"], 1231.4, "E", 3.0], - [float("nan"), 1203.4, None, None], - [["B", "D"], 1247.4, "B", 1.0], - ], - columns=[ - "segments", - "total_weight_onboard", - "segment_highest_impact", - "impact_risk_factor", - ], - index=pd.Index( - data=[ - "vessel_1", - "vessel_2", - "vessel_3", - "vessel_4", - ], - name="cfr", - ), - ) - - expected_res["segments"] = expected_res.segments.map(sorted, na_action="ignore") - res["segments"] = res.segments.map(sorted, na_action="ignore") - - pd.testing.assert_frame_equal(res.convert_dtypes(), expected_res.convert_dtypes()) - - -def test_compute_control_priorities(): - current_segments = pd.DataFrame( - { - "cfr": ["Vessel 1", "Vessel 2", "Vessel 3"], - "segments": [ - ["Segment 1", "Segment 2"], - ["Segment 1", "Segment 3"], - None, - ], - } - ).set_index("cfr") - - last_positions_facade = pd.DataFrame( - { - "cfr": ["Vessel 1", "Vessel 2", "Vessel 4"], - "facade": [None, "Facade 2", "Facade 4"], - } - ).set_index("cfr") - - control_priorities = pd.DataFrame( - { - "segment": ["Segment 1", "Segment 3", "Segment 3"], - "facade": ["Facade 2", "Facade 1", "Facade 2"], - "control_priority_level": [1, 4, 3], - } - ) - - control_priorities = compute_control_priorities.run( - current_segments, - last_positions_facade, - control_priorities, - ) - - expected_control_priorities = pd.DataFrame( - { - "cfr": ["Vessel 2"], - "segment_highest_priority": ["Segment 3"], - "control_priority_level": [3], - } - ).set_index("cfr") - - pd.testing.assert_frame_equal(control_priorities, expected_control_priorities) - - -def test_join(): - catches = pd.DataFrame( - columns=pd.Index( - [ - "cfr", - "ircs", - "external_immatriculation", - "last_logbook_message_datetime_utc", - "departure_datetime_utc", - "trip_number", - "gear_onboard", - "species", - "gear", - "fao_area", - "weight", - ] - ), - data=[ - [ - "Vessel_A", - "AA", - "AAA", - datetime.datetime(2021, 2, 3, 13, 58, 21), - datetime.datetime(2021, 2, 3, 13, 56, 21), - 20210003.0, - [{"gear": "OTB", "mesh": 70.0, "dimensions": 45.0}], - "BLI", - "OTB", - "27.8.b", - 13.46, - ], - [ - "Vessel_A", - "AA", - "AAA", - datetime.datetime(2021, 2, 3, 13, 58, 21), - datetime.datetime(2021, 2, 3, 13, 56, 21), - 20210003.0, - [{"gear": "OTB", "mesh": 70.0, "dimensions": 45.0}], - "HKE", - "OTB", - "27.8.c", - 235.6, - ], - [ - "Vessel_B", - None, - "BBB", - datetime.datetime(2020, 12, 3, 15, 58, 21), - datetime.datetime(2020, 12, 3, 15, 56, 21), - 20200053.0, - [{"gear": "OTM", "mesh": 70.0, "dimensions": 45.0}], - None, - None, - None, - np.nan, - ], - ], - ) - - current_segments = pd.DataFrame( - columns=pd.Index( - [ - "cfr", - "segments", - "total_weight_onboard", - "impact_risk_factor", - ] - ), - data=[ - ["Vessel_A", np.array(["Segment 1", "Segment 2"]), 249.06, 2], - ], - ).set_index("cfr") - - control_priorities = pd.DataFrame( - columns=pd.Index( - [ - "cfr", - "segment_highest_priority", - "control_priority_level", - ] - ), - data=[ - ["Vessel_B", "Segment 2", 2], - ], - ).set_index("cfr") - - res = join.run(catches, current_segments, control_priorities) - - expected_res = pd.DataFrame( - { - "cfr": ["Vessel_A", "Vessel_B"], - "ircs": ["AA", None], - "external_immatriculation": ["AAA", "BBB"], - "last_logbook_message_datetime_utc": [ - datetime.datetime(2021, 2, 3, 13, 58, 21), - datetime.datetime(2020, 12, 3, 15, 58, 21), - ], - "departure_datetime_utc": [ - datetime.datetime(2021, 2, 3, 13, 56, 21), - datetime.datetime(2020, 12, 3, 15, 56, 21), - ], - "trip_number": [20210003.0, 20200053.0], - "gear_onboard": [ - [{"gear": "OTB", "mesh": 70.0, "dimensions": 45.0}], - [{"gear": "OTM", "mesh": 70.0, "dimensions": 45.0}], - ], - "species_onboard": [ - [ - { - "gear": "OTB", - "faoZone": "27.8.b", - "species": "BLI", - "weight": 13.46, - }, - { - "gear": "OTB", - "faoZone": "27.8.c", - "species": "HKE", - "weight": 235.6, - }, - ], - float("nan"), - ], - "segments": [["Segment 1", "Segment 2"], float("nan")], - "total_weight_onboard": [249.06, 0.0], - "impact_risk_factor": [2.0, 1.0], - "segment_highest_priority": [float("nan"), "Segment 2"], - "control_priority_level": [1.0, 2.0], - } - ) - - pd.testing.assert_frame_equal(res, expected_res) - - -def test_test_current_segments_flow(reset_test_data, current_segments): +def test_current_segments_flow(reset_test_data): flow.schedule = None state = flow.run() assert state.is_successful() @@ -477,25 +119,5 @@ def test_test_current_segments_flow(reset_test_data, current_segments): computed_current_segments = read_query( "SELECT * FROM current_segments ORDER BY cfr", db="monitorfish_remote" ) - datetime_columns = [ - "last_logbook_message_datetime_utc", - "departure_datetime_utc", - ] - pd.testing.assert_frame_equal( - current_segments.drop(columns=datetime_columns), - computed_current_segments.drop(columns=datetime_columns), - ) - - assert ( - ( - ( - current_segments[datetime_columns] - - computed_current_segments[datetime_columns] - ) - .abs() - .fillna(datetime.timedelta(seconds=0)) - < datetime.timedelta(seconds=10) - ) - .all() - .all() - ) + print(computed_current_segments) + breakpoint()