
Commit

WIP
VincentAntoine committed Dec 11, 2024
1 parent 5eaf3e9 commit d8d54bd
Showing 6 changed files with 176 additions and 570 deletions.
179 changes: 67 additions & 112 deletions datascience/src/pipeline/flows/current_segments.py
@@ -1,33 +1,31 @@
from datetime import datetime
from pathlib import Path

import geopandas as gpd
import pandas as pd
import prefect
from prefect import Flow, case, task
from prefect import Flow, Parameter, case, task
from prefect.executors import LocalDaskExecutor

from config import default_risk_factors
from src.pipeline.generic_tasks import extract, load
from src.pipeline.helpers.segments import attribute_segments_to_catches
from src.pipeline.helpers.segments import allocate_segments_to_catches
from src.pipeline.processing import df_to_dict_series
from src.pipeline.shared_tasks.control_flow import check_flow_not_running
from src.pipeline.shared_tasks.dates import get_current_year
from src.pipeline.shared_tasks.facades import extract_facade_areas
from src.pipeline.shared_tasks.infrastructure import get_table
from src.pipeline.shared_tasks.segments import extract_segments_of_year, unnest_segments
from src.pipeline.shared_tasks.vessels import add_vessel_id
from src.pipeline.shared_tasks.segments import extract_segments_of_year


@task(checkpoint=False)
def extract_catches():
def extract_current_catches(number_of_days: int) -> pd.DataFrame:
return extract(
db_name="monitorfish_remote", query_filepath="monitorfish/current_catches.sql"
db_name="monitorfish_remote",
query_filepath="monitorfish/current_catches.sql",
params={"number_of_days": number_of_days},
)


@task(checkpoint=False)
def extract_control_priorities():
def extract_control_priorities() -> pd.DataFrame:
return extract(
db_name="monitorfish_remote",
query_filepath="monitorfish/control_priorities.sql",
@@ -40,60 +38,68 @@ def extract_last_positions():
return extract(
db_name="monitorfish_remote",
query_filepath="monitorfish/last_positions.sql",
backend="geopandas",
geom_col="geometry",
crs=4326,
)


@task(checkpoint=False)
def compute_last_positions_facade(
last_positions: gpd.GeoDataFrame, facade_areas: gpd.GeoDataFrame
) -> pd.DataFrame:

last_positions_facade_1 = gpd.sjoin(last_positions, facade_areas)[["cfr", "facade"]]

unassigned_last_positions = last_positions[
~last_positions.cfr.isin(last_positions_facade_1.cfr)
].copy(deep=True)

    # Vessels that are not directly in a facade geometry are oftentimes vessels in a
    # port, which facade geometries generally do not encompass. In order to match
    # these vessels to a nearby facade, we draw a ~10km circle around them
    # and attempt a spatial join on this buffered geometry.
unassigned_last_positions["geometry"] = unassigned_last_positions.buffer(0.1)

last_positions_facade_2 = gpd.sjoin(
unassigned_last_positions, facade_areas, how="left"
)[["cfr", "facade"]]

last_positions_facade_2 = last_positions_facade_2.drop_duplicates(subset=["cfr"])

last_positions_facade = pd.concat(
[last_positions_facade_1, last_positions_facade_2]
).set_index("cfr")

return last_positions_facade


@task(checkpoint=False)
def compute_current_segments(catches, segments):

current_segments = attribute_segments_to_catches(
catches[["cfr", "gear", "fao_area", "species"]],
segments[
def compute_current_segments(current_catches, segments, control_priorities):
current_segments = allocate_segments_to_catches(
current_catches[
[
"segment",
"gear",
"catch_id",
"cfr",
"vessel_id",
"year",
"fao_area",
"gear",
"mesh",
"species",
"scip_species_type",
"weight",
"vessel_type",
"ircs",
"external_immatriculation",
"last_logbook_message_datetime_utc",
"departure_datetime_utc",
"trip_number",
"gear_onboard",
]
],
segments[
[
"year",
"segment",
"segment_name",
"gears",
"fao_areas",
"min_mesh",
"max_mesh",
"target_species",
"min_share_of_target_species",
"main_scip_species_type",
"priority",
"vessel_types",
"impact_risk_factor",
]
],
catch_id_column="catch_id",
batch_id_column="cfr",
)

# Aggregate by vessel
control_priorities = (
pd.merge(current_segments, control_priorities, on=["segment", "facade"])
.sort_values("control_priority_level", ascending=False)
.groupby("cfr")[["cfr", "segment", "control_priority_level"]]
.head(1)
.set_index("cfr")
.rename(
columns={
"segment": "segment_highest_priority",
}
)
)

# Aggregate by vessel
current_segments_impact = (
current_segments.sort_values("impact_risk_factor", ascending=False)
.groupby("cfr")[["cfr", "segment", "impact_risk_factor"]]
@@ -110,7 +116,7 @@ def compute_current_segments(catches, segments):
current_segments.groupby("cfr")["segment"].unique().rename("segments")
)

total_catch_weight = catches.groupby("cfr")["weight"].sum()
total_catch_weight = current_catches.groupby("cfr")["weight"].sum()
total_catch_weight = total_catch_weight.rename("total_weight_onboard")

current_segments = pd.merge(
@@ -129,57 +135,15 @@ def compute_current_segments(catches, segments):
how="outer",
)

return current_segments


@task(checkpoint=False)
def compute_control_priorities(
current_segments: pd.DataFrame,
last_positions_facade: pd.DataFrame,
control_priorities: pd.DataFrame,
) -> pd.DataFrame:

cfr_segment_facade = (
current_segments[["segments"]]
.join(last_positions_facade)
.explode("segments")
.rename(columns={"segments": "segment"})
.reset_index()
.dropna(subset=["segment", "facade"])
)

control_priorities = (
pd.merge(cfr_segment_facade, control_priorities, on=["segment", "facade"])
.sort_values("control_priority_level", ascending=False)
.groupby("cfr")[["cfr", "segment", "control_priority_level"]]
.head(1)
.set_index("cfr")
.rename(
columns={
"segment": "segment_highest_priority",
}
)
)

return control_priorities


@task(checkpoint=False)
def join(
catches: pd.DataFrame,
current_segments: pd.DataFrame,
control_priorities: pd.DataFrame,
) -> pd.DataFrame:

# Group catch data of each vessel in a list of dicts like
# [{"gear": "DRB", "species": "SCE", "faoZone": "27.7", "weight": 156.2}, ...]
catch_columns = ["gear", "fao_area", "species", "weight"]
species_onboard = catches[catch_columns]
species_onboard = current_catches[catch_columns]
species_onboard = species_onboard.rename(columns={"fao_area": "faoZone"})
species_onboard = df_to_dict_series(
species_onboard.dropna(subset=["species"]), result_colname="species_onboard"
)
species_onboard = catches[["cfr"]].join(species_onboard)
species_onboard = current_catches[["cfr"]].join(species_onboard)
species_onboard = species_onboard.dropna(subset=["species_onboard"])
species_onboard = species_onboard.groupby("cfr")["species_onboard"].apply(list)

@@ -195,7 +159,9 @@ def join(
"gear_onboard",
]

last_logbook_report = catches[last_logbook_report_columns].groupby("cfr").head(1)
last_logbook_report = (
current_catches[last_logbook_report_columns].groupby("cfr").head(1)
)
last_logbook_report = last_logbook_report.set_index("cfr")

# Join departure, catches and segments information into a single table with 1 line
@@ -231,31 +197,20 @@ def load_current_segments(vessels_segments): # pragma: no cover


with Flow("Current segments", executor=LocalDaskExecutor()) as flow:

flow_not_running = check_flow_not_running()
with case(flow_not_running, True):

# Extract
number_of_days = Parameter("number_of_days", 90)
current_year = get_current_year()
catches = extract_catches()
current_catches = extract_current_catches(number_of_days=number_of_days)
last_positions = extract_last_positions()
segments = extract_segments_of_year(current_year)
facade_areas = extract_facade_areas()
control_priorities = extract_control_priorities()

vessels_table = get_table("vessels")

# Transform
last_positions_facade = compute_last_positions_facade(
last_positions, facade_areas
)
segments = unnest_segments(segments)
current_segments = compute_current_segments(catches, segments)
control_priorities = compute_control_priorities(
current_segments, last_positions_facade, control_priorities
current_segments = compute_current_segments(
current_catches, segments, control_priorities
)
current_segments = join(catches, current_segments, control_priorities)
current_segments = add_vessel_id(current_segments, vessels_table)

# Load
load_current_segments(current_segments)
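
A minimal usage sketch, not part of the diff above: assuming the Prefect 1.x flow.run API already used by this codebase, the new number_of_days parameter declared in the flow (defaulting to 90) can be overridden at run time and is forwarded to monitorfish/current_catches.sql through the extract task's params argument. The value 30 below is arbitrary and purely illustrative.

# Hypothetical local run of the flow with a non-default look-back window (Prefect 1.x).
from src.pipeline.flows.current_segments import flow

state = flow.run(parameters={"number_of_days": 30})
assert state.is_successful()
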
8 changes: 2 additions & 6 deletions datascience/src/pipeline/flows/recompute_controls_segments.py
@@ -10,7 +10,7 @@
from src.db_config import create_engine
from src.pipeline.entities.missions import MissionActionType
from src.pipeline.generic_tasks import extract
from src.pipeline.helpers.segments import attribute_segments_to_catches
from src.pipeline.helpers.segments import allocate_segments_to_catches
from src.pipeline.processing import df_to_dict_series, prepare_df_for_loading
from src.pipeline.shared_tasks.control_flow import check_flow_not_running
from src.pipeline.shared_tasks.segments import extract_segments_of_year, unnest_segments
@@ -63,10 +63,9 @@ def compute_controls_segments(
controls_catches: pd.DataFrame,
segments: pd.DataFrame,
) -> pd.DataFrame:

controls_catches = controls_catches.where(controls_catches.notnull(), None)
controls_segments = (
attribute_segments_to_catches(
allocate_segments_to_catches(
controls_catches,
segments[["segment", "segment_name", "fao_area", "gear", "species"]],
)[["id", "segment", "segment_name"]]
@@ -100,7 +99,6 @@ def load_controls_segments(controls_segments: pd.DataFrame):

e = create_engine("monitorfish_remote")
with e.begin() as connection:

logger.info("Creating temporary table")
connection.execute(
text(
@@ -142,10 +140,8 @@ def load_controls_segments(controls_segments: pd.DataFrame):


with Flow("Recompute controls segments", executor=LocalDaskExecutor()) as flow:

flow_not_running = check_flow_not_running()
with case(flow_not_running, True):

# Parameters
year = Parameter("year")
control_types = Parameter("control_types")
16 changes: 8 additions & 8 deletions datascience/src/pipeline/queries/monitorfish/current_catches.sql
@@ -2,7 +2,7 @@ WITH acknowledged_messages AS (
SELECT referenced_report_id
FROM logbook_reports
WHERE
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months'
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL ':number_of_days days'
AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours'
AND operation_type ='RET'
AND value->>'returnStatus' = '000'
@@ -14,7 +14,7 @@
referenced_report_id
FROM logbook_reports
WHERE
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months'
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL ':number_of_days days'
AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours'
AND operation_type ='DEL'
),
@@ -31,7 +31,7 @@ corrected_messages AS (
referenced_report_id
FROM logbook_reports
WHERE
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months'
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL ':number_of_days days'
AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours'
AND operation_type ='COR'
AND (
@@ -51,7 +51,7 @@ ordered_deps AS (
FROM public.logbook_reports
WHERE
log_type = 'DEP'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_days days'
AND activity_datetime_utc < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours'
AND report_id NOT IN (SELECT referenced_report_id FROM corrected_messages)
AND NOT (
@@ -89,7 +89,7 @@ last_logbook_reports AS (
MAX(report_datetime_utc) AS last_logbook_message_datetime_utc
FROM public.logbook_reports
WHERE operation_type IN ('DAT', 'COR')
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_days days'
AND report_datetime_utc < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours'
AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%')
GROUP BY cfr
@@ -111,7 +111,7 @@ catches AS (
AND r.trip_number = d.trip_number
WHERE
log_type = 'FAR'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_days days'
AND report_id NOT IN (SELECT referenced_report_id FROM corrected_messages)
AND NOT (
report_id IN (SELECT referenced_report_id FROM acknowledged_deleted_messages)
@@ -166,15 +166,15 @@ SELECT
gear,
mesh,
species,
s.species_name,
s.scip_species_type,
weight,
COALESCE(
v_cfr.vessel_type,
v_ircs.vessel_type,
v_ext.vessel_type
) AS vessel_type,
last_deps.ircs,
last_deps.external_immatriculation AS external_immatriculation,
last_deps.external_immatriculation,
l.last_logbook_message_datetime_utc,
departure_datetime_utc,
trip_number,
13 changes: 7 additions & 6 deletions datascience/src/pipeline/queries/monitorfish/last_positions.sql
@@ -1,7 +1,8 @@
SELECT
SELECT DISTINCT ON (cfr)
cfr,
latitude,
longitude,
ST_SetSRId(ST_MakePoint(longitude, latitude), 4326) AS geometry
FROM last_positions
WHERE cfr IS NOT NULL
f.facade
FROM last_positions p
LEFT JOIN facade_areas_subdivided f
ON ST_Intersects(ST_SetSRID(ST_MakePoint(p.longitude, p.latitude), 4326), f.geometry)
WHERE cfr IS NOT NULL
ORDER BY cfr, last_position_datetime_utc DESC
