Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
VincentAntoine committed Dec 10, 2024
1 parent bd9f3f3 commit 7505aaa
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 51 deletions.
12 changes: 2 additions & 10 deletions datascience/src/pipeline/flows/current_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@

from config import default_risk_factors
from src.pipeline.generic_tasks import extract, load
from src.pipeline.helpers.segments import attribute_segments_to_catches
from src.pipeline.helpers.segments import allocate_segments_to_catches
from src.pipeline.processing import df_to_dict_series
from src.pipeline.shared_tasks.control_flow import check_flow_not_running
from src.pipeline.shared_tasks.dates import get_current_year
from src.pipeline.shared_tasks.facades import extract_facade_areas
from src.pipeline.shared_tasks.infrastructure import get_table
from src.pipeline.shared_tasks.segments import extract_segments_of_year, unnest_segments
from src.pipeline.shared_tasks.vessels import add_vessel_id


@task(checkpoint=False)
Expand Down Expand Up @@ -50,7 +49,6 @@ def extract_last_positions():
def compute_last_positions_facade(
last_positions: gpd.GeoDataFrame, facade_areas: gpd.GeoDataFrame
) -> pd.DataFrame:

last_positions_facade_1 = gpd.sjoin(last_positions, facade_areas)[["cfr", "facade"]]

unassigned_last_positions = last_positions[
Expand Down Expand Up @@ -78,8 +76,7 @@ def compute_last_positions_facade(

@task(checkpoint=False)
def compute_current_segments(catches, segments):

current_segments = attribute_segments_to_catches(
current_segments = allocate_segments_to_catches(
catches[["cfr", "gear", "fao_area", "species"]],
segments[
[
Expand Down Expand Up @@ -138,7 +135,6 @@ def compute_control_priorities(
last_positions_facade: pd.DataFrame,
control_priorities: pd.DataFrame,
) -> pd.DataFrame:

cfr_segment_facade = (
current_segments[["segments"]]
.join(last_positions_facade)
Expand Down Expand Up @@ -170,7 +166,6 @@ def join(
current_segments: pd.DataFrame,
control_priorities: pd.DataFrame,
) -> pd.DataFrame:

# Group catch data of each vessel in a list of dicts like
# [{"gear": "DRB", "species": "SCE", "faoZone": "27.7", "weight": 156.2}, ...]
catch_columns = ["gear", "fao_area", "species", "weight"]
Expand Down Expand Up @@ -231,10 +226,8 @@ def load_current_segments(vessels_segments): # pragma: no cover


with Flow("Current segments", executor=LocalDaskExecutor()) as flow:

flow_not_running = check_flow_not_running()
with case(flow_not_running, True):

# Extract
current_year = get_current_year()
catches = extract_catches()
Expand All @@ -255,7 +248,6 @@ def load_current_segments(vessels_segments): # pragma: no cover
current_segments, last_positions_facade, control_priorities
)
current_segments = join(catches, current_segments, control_priorities)
current_segments = add_vessel_id(current_segments, vessels_table)

# Load
load_current_segments(current_segments)
Expand Down
8 changes: 2 additions & 6 deletions datascience/src/pipeline/flows/recompute_controls_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from src.db_config import create_engine
from src.pipeline.entities.missions import MissionActionType
from src.pipeline.generic_tasks import extract
from src.pipeline.helpers.segments import attribute_segments_to_catches
from src.pipeline.helpers.segments import allocate_segments_to_catches
from src.pipeline.processing import df_to_dict_series, prepare_df_for_loading
from src.pipeline.shared_tasks.control_flow import check_flow_not_running
from src.pipeline.shared_tasks.segments import extract_segments_of_year, unnest_segments
Expand Down Expand Up @@ -63,10 +63,9 @@ def compute_controls_segments(
controls_catches: pd.DataFrame,
segments: pd.DataFrame,
) -> pd.DataFrame:

controls_catches = controls_catches.where(controls_catches.notnull(), None)
controls_segments = (
attribute_segments_to_catches(
allocate_segments_to_catches(
controls_catches,
segments[["segment", "segment_name", "fao_area", "gear", "species"]],
)[["id", "segment", "segment_name"]]
Expand Down Expand Up @@ -100,7 +99,6 @@ def load_controls_segments(controls_segments: pd.DataFrame):

e = create_engine("monitorfish_remote")
with e.begin() as connection:

logger.info("Creating temporary table")
connection.execute(
text(
Expand Down Expand Up @@ -142,10 +140,8 @@ def load_controls_segments(controls_segments: pd.DataFrame):


with Flow("Recompute controls segments", executor=LocalDaskExecutor()) as flow:

flow_not_running = check_flow_not_running()
with case(flow_not_running, True):

# Parameters
year = Parameter("year")
control_types = Parameter("control_types")
Expand Down
148 changes: 114 additions & 34 deletions datascience/src/pipeline/queries/monitorfish/current_catches.sql
Original file line number Diff line number Diff line change
@@ -1,33 +1,82 @@
WITH deleted_or_corrected_messages AS (
WITH acknowledged_messages AS (
SELECT referenced_report_id
FROM public.logbook_reports
WHERE operation_type IN ('DEL', 'COR')
AND operation_datetime_utc > CURRENT_TIMESTAMP - INTERVAL '6 months'
-- exclude VisioCapture (which is not real time but has several months of delay) from current_catches
AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%')
FROM logbook_reports
WHERE
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months'
AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours'
AND operation_type ='RET'
AND value->>'returnStatus' = '000'
),

-- DEL operations whose operation_datetime_utc falls in the window
-- [now - 3 months, now + 6 hours).
-- Both the DEL's own operation_number and the referenced_report_id it carries are
-- kept, so later steps can match on either one.
-- NOTE(review): presumably referenced_report_id identifies the report being
-- deleted — confirm against the logbook_reports schema.
deleted_messages AS (
SELECT
operation_number,
referenced_report_id
FROM logbook_reports
WHERE
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months'
AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours'
AND operation_type ='DEL'
),

-- Subset of deleted_messages restricted to DEL operations whose own
-- operation_number appears among the referenced_report_id values of
-- acknowledged_messages (RET operations with returnStatus '000').
-- Outputs the referenced_report_id carried by each such DEL.
acknowledged_deleted_messages AS (
SELECT referenced_report_id
FROM deleted_messages
WHERE
-- Match on the DEL's operation_number, not on the id of the report it targets.
operation_number IN (SELECT referenced_report_id FROM acknowledged_messages)
),

-- referenced_report_id of COR operations whose operation_datetime_utc falls in the
-- window [now - 3 months, now + 6 hours).
-- A COR is kept only if its flag_state is outside ('FRA', 'GUF', 'VEN') or its
-- report_id appears among the referenced_report_id values of acknowledged_messages.
-- NOTE(review): this CTE is consumed through `report_id NOT IN (SELECT ...)` filters.
-- In SQL, a single NULL in the subquery result makes NOT IN reject every row —
-- confirm referenced_report_id can never be NULL for COR operations.
corrected_messages AS (
SELECT
referenced_report_id
FROM logbook_reports
WHERE
operation_datetime_utc >= CURRENT_TIMESTAMP - INTERVAL '3 months'
AND operation_datetime_utc < CURRENT_TIMESTAMP + INTERVAL '6 hours'
AND operation_type ='COR'
AND (
flag_state NOT IN ('FRA', 'GUF', 'VEN') -- Flag states for which we receive RET
OR report_id IN (SELECT referenced_report_id FROM acknowledged_messages)
)
),

ordered_deps AS (
SELECT
cfr,
ircs,
external_identification,
external_identification AS external_immatriculation,
trip_number,
(value->>'departureDatetimeUtc')::timestamptz AS departure_datetime_utc,
ROW_NUMBER() OVER(PARTITION BY cfr ORDER BY (value->>'departureDatetimeUtc')::timestamptz DESC) as rk
activity_datetime_utc AS departure_datetime_utc,
ROW_NUMBER() OVER (PARTITION BY cfr ORDER BY activity_datetime_utc DESC) as rk
FROM public.logbook_reports
WHERE log_type = 'DEP'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '6 months'
AND (value->>'departureDatetimeUtc')::timestamptz < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours'
AND report_id NOT IN (SELECT referenced_report_id FROM deleted_or_corrected_messages)
AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%')
WHERE
log_type = 'DEP'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months'
AND activity_datetime_utc < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours'
AND report_id NOT IN (SELECT referenced_report_id FROM corrected_messages)
AND NOT (
report_id IN (SELECT referenced_report_id FROM acknowledged_deleted_messages)
OR (
report_id IN (SELECT referenced_report_id FROM deleted_messages)
AND flag_state NOT IN ('FRA', 'GUF', 'VEN')
)
)
AND (
flag_state NOT IN ('FRA', 'GUF', 'VEN') -- Flag states for which we receive RET
OR report_id IN (SELECT referenced_report_id FROM acknowledged_messages)
)
-- Exclude data that is not real-time electronic logbook data
AND (
software IS NULL -- Non french vessels
OR software NOT LIKE '%VISIOCaptures%'
)
),

last_deps AS (
SELECT
cfr,
ircs,
external_identification,
external_immatriculation,
departure_datetime_utc,
trip_number
FROM ordered_deps
Expand All @@ -40,7 +89,7 @@ last_logbook_reports AS (
MAX(report_datetime_utc) AS last_logbook_message_datetime_utc
FROM public.logbook_reports
WHERE operation_type IN ('DAT', 'COR')
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '6 months'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months'
AND report_datetime_utc < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '24 hours'
AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%')
GROUP BY cfr
Expand All @@ -60,11 +109,22 @@ catches AS (
JOIN last_deps d
ON r.cfr = d.cfr
AND r.trip_number = d.trip_number
WHERE log_type = 'FAR'
AND operation_type IN ('DAT', 'COR')
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '6 months'
AND operation_number NOT IN (SELECT referenced_report_id FROM deleted_or_corrected_messages)
AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%')
WHERE
log_type = 'FAR'
AND operation_datetime_utc > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '3 months'
AND report_id NOT IN (SELECT referenced_report_id FROM corrected_messages)
AND NOT (
report_id IN (SELECT referenced_report_id FROM acknowledged_deleted_messages)
OR (
report_id IN (SELECT referenced_report_id FROM deleted_messages)
AND r.flag_state NOT IN ('FRA', 'GUF', 'VEN')
)
)
AND (
flag_state NOT IN ('FRA', 'GUF', 'VEN') -- Flag states for which we receive RET
OR report_id IN (SELECT referenced_report_id FROM acknowledged_messages)
)
AND (software IS NULL OR software NOT LIKE '%VISIOCaptures%')
),

summed_catches AS (
Expand All @@ -73,9 +133,10 @@ summed_catches AS (
species,
gear,
fao_area,
mesh,
SUM(weight) as weight
FROM catches
GROUP BY cfr, species, gear, fao_area
GROUP BY cfr, species, gear, fao_area, mesh
),

gear_onboard AS (
Expand All @@ -93,21 +154,40 @@ gear_onboard AS (
)

SELECT
COALESCE(last_logbook_reports.cfr, last_deps.cfr) AS cfr,
ROW_NUMBER() OVER (ORDER BY COALESCE(l.cfr, last_deps.cfr), species) AS catch_id,
COALESCE(l.cfr, last_deps.cfr) AS cfr,
COALESCE(
v_cfr.id,
v_ircs.id,
v_ext.id
) AS vessel_id,
EXTRACT(YEAR from CURRENT_TIMESTAMP AT TIME ZONE 'UTC') AS year,
fao_area,
gear,
mesh,
species,
s.species_name,
weight,
COALESCE(
v_cfr.vessel_type,
v_ircs.vessel_type,
v_ext.vessel_type
) AS vessel_type,
last_deps.ircs,
last_deps.external_identification AS external_immatriculation,
last_logbook_reports.last_logbook_message_datetime_utc,
last_deps.external_immatriculation AS external_immatriculation,
l.last_logbook_message_datetime_utc,
departure_datetime_utc,
trip_number,
go.gear_onboard,
species,
gear,
fao_area,
weight
FROM last_logbook_reports
go.gear_onboard
FROM last_logbook_reports l
FULL OUTER JOIN last_deps
ON last_logbook_reports.cfr = last_deps.cfr
ON l.cfr = last_deps.cfr
LEFT JOIN summed_catches
ON last_logbook_reports.cfr = summed_catches.cfr
ON l.cfr = summed_catches.cfr
LEFT JOIN gear_onboard go
ON last_logbook_reports.cfr = go.cfr
ON l.cfr = go.cfr
LEFT JOIN species s
ON s.species_code = summed_catches.species
LEFT JOIN vessels v_cfr ON v_cfr.cfr = COALESCE(l.cfr, last_deps.cfr)
LEFT JOIN vessels v_ircs ON v_ircs.ircs = last_deps.ircs AND v_ircs.cfr IS NULL
LEFT JOIN vessels v_ext ON v_ext.external_immatriculation = last_deps.external_immatriculation AND v_ext.cfr IS NULL
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def current_segments() -> pd.DataFrame:

def test_extract_catches(reset_test_data):
catches = extract_catches.run()
assert len(catches) == 6
assert len(catches) == 5
assert set(catches.cfr) == {
"ABC000542519",
"ABC000306959",
Expand Down

0 comments on commit 7505aaa

Please sign in to comment.