From b045446fbfc5b8bb3b29dc0451fe73ea565d393c Mon Sep 17 00:00:00 2001
From: Vincent
Date: Fri, 20 Dec 2024 14:47:31 +0100
Subject: [PATCH] Invalidate ports caches in ports flow

---
 datascience/config.py                      |  1 +
 datascience/src/pipeline/flows/ports.py    |  9 ++++++++-
 .../test_pipeline/test_flows/test_ports.py | 18 +++++++++++++++++-
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/datascience/config.py b/datascience/config.py
index add704158c..259370fd15 100644
--- a/datascience/config.py
+++ b/datascience/config.py
@@ -187,6 +187,7 @@
 REPORTING_ARCHIVING_ENDPOINT_TEMPLATE = (
     API_ENDPOINT + "reportings/{reporting_id}/archive"
 )
+PORTS_CACHE_INVALIDATION_ENDPOINT = API_ENDPOINT + "ports/invalidate"
 
 # Backend api key
 BACKEND_API_KEY = os.environ.get("MONITORFISH_BACKEND_API_KEY")
diff --git a/datascience/src/pipeline/flows/ports.py b/datascience/src/pipeline/flows/ports.py
index 058a4b398a..bc1962048e 100644
--- a/datascience/src/pipeline/flows/ports.py
+++ b/datascience/src/pipeline/flows/ports.py
@@ -14,6 +14,7 @@
 
 from config import (
     IS_INTEGRATION,
+    PORTS_CACHE_INVALIDATION_ENDPOINT,
     PORTS_CSV_RESOURCE_ID,
     PORTS_CSV_RESOURCE_TITLE,
     PORTS_DATASET_ID,
@@ -863,6 +864,11 @@ def transform_ports_open_data(ports: pd.DataFrame) -> pd.DataFrame:
     return ports_open_data
 
 
+@task(checkpoint=False)
+def invalidate_cache():
+    requests.put(PORTS_CACHE_INVALIDATION_ENDPOINT)
+
+
 @task(checkpoint=False)
 def load_ports(ports):
     load(
@@ -900,7 +906,8 @@ def load_ports(ports):
         ports_open_data = transform_ports_open_data(ports)
 
         # Load
-        load_ports(ports)
+        loaded_ports = load_ports(ports)
+        invalidate_cache(upstream_tasks=[loaded_ports])
 
         ports_open_data_csv_file = get_csv_file_object(ports_open_data)
         update_resource(
diff --git a/datascience/tests/test_pipeline/test_flows/test_ports.py b/datascience/tests/test_pipeline/test_flows/test_ports.py
index 9f9e137353..7959bb936d 100644
--- a/datascience/tests/test_pipeline/test_flows/test_ports.py
+++ b/datascience/tests/test_pipeline/test_flows/test_ports.py
@@ -1,10 +1,11 @@
 from io import BytesIO
+from unittest.mock import MagicMock, patch
 
 import pandas as pd
 import pytest
 from prefect import task
 
-from src.pipeline.flows.ports import flow
+from src.pipeline.flows.ports import flow, invalidate_cache
 from src.read_query import read_query
 from tests.mocks import mock_check_flow_not_running, mock_update_resource
 
@@ -57,9 +58,18 @@ def mock_extract_local_ports() -> pd.DataFrame:
     return local_ports_data
 
 
+@task(checkpoint=False)
+def mock_invalidate_cache() -> MagicMock:
+    with patch("src.pipeline.flows.ports.requests") as mock_requests:
+        invalidate_cache.run()
+
+    return mock_requests
+
+
 flow.replace(flow.get_tasks("check_flow_not_running")[0], mock_check_flow_not_running)
 flow.replace(flow.get_tasks("extract_local_ports")[0], mock_extract_local_ports)
 flow.replace(flow.get_tasks("update_resource")[0], mock_update_resource)
+flow.replace(flow.get_tasks("invalidate_cache")[0], mock_invalidate_cache)
 
 
 def test_flow(reset_test_data, expected_ports_open_data, expected_loaded_ports):
@@ -86,3 +96,9 @@
         expected_ports_open_data.convert_dtypes(),
         check_like=True,
     )
+    mock_invalidate_cache_result = state.result[
+        flow.get_tasks("mock_invalidate_cache")[0]
+    ].result
+    mock_invalidate_cache_result.put.assert_called_once_with(
+        "https://monitor.fish/api/v1/ports/invalidate"
+    )