From 21e5a82b890745327dd03b42c66c43d5191fecf5 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Dec 2024 12:09:18 +0100 Subject: [PATCH 01/12] FrozenFile --- cads_adaptors/adaptors/mars.py | 48 ++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 6d516c63..d192c091 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -1,6 +1,12 @@ +import dataclasses +import json import os +import pathlib +import tempfile from typing import Any, BinaryIO +import cacholote + from cads_adaptors.adaptors import Context, Request, cds from cads_adaptors.exceptions import MarsNoDataError, MarsRuntimeError, MarsSystemError from cads_adaptors.tools import adaptor_tools @@ -114,12 +120,50 @@ def execute_mars( return target +@dataclasses.dataclass +class CachedExecuteMars: + context: Context + config: dict[str, Any] + mapping: dict[str, Any] + cache_tmp_path: pathlib.Path + + @property + def use_cache(self): + fs, _ = cacholote.utils.get_cache_files_fs_dirname() + return "file" in fs.protocol + + def sort_requests(self, requests: list[Request]) -> list[Request]: + return sorted(requests, key=lambda request: json.dumps(request, sort_keys=True)) + + @cacholote.cacheable + def cached_retrieve(self, requests: list[Request]) -> BinaryIO: + _, target = tempfile.mkstemp(suffix=".grib", dir=self.cache_tmp_path) + result = execute_mars( + self.sort_requests(requests), + self.context, + self.config, + self.mapping, + target, + ) + return open(result, "rb") + + def retrieve(self, requests: list[Request]) -> BinaryIO: + with cacholote.config.set(use_cache=self.use_cache, return_cache_entry=False): + fp = self.cached_retrieve(requests) + return cacholote.extra_encoders.FrozenFile(fp.name) + + class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): resources = {"MARS_CLIENT": 1} def retrieve(self, request: Request) -> BinaryIO: - result = execute_mars(request, context=self.context) - return open(result, "rb") + cached_execute_mars = CachedExecuteMars( + context=self.context, + config=self.config, + mapping=self.mapping, + cache_tmp_path=self.cache_tmp_path, + ) + return cached_execute_mars.retrieve([request]) class MarsCdsAdaptor(cds.AbstractCdsAdaptor): From da19e722e6d2c586a8fb0ec1ca10d15293757fa1 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Dec 2024 12:17:13 +0100 Subject: [PATCH 02/12] cleanup --- cads_adaptors/adaptors/mars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 07cf9fe8..caaee9eb 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -132,7 +132,7 @@ class CachedExecuteMars: @property def use_cache(self): fs, _ = cacholote.utils.get_cache_files_fs_dirname() - return "file" in fs.protocol + return "local" in ensure_list(fs.protocol) def sort_requests(self, requests: list[Request]) -> list[Request]: return sorted(requests, key=lambda request: json.dumps(request, sort_keys=True)) From 50bd2ed443bb0132563295f7d4b553fa1afd01e2 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Dec 2024 12:22:35 +0100 Subject: [PATCH 03/12] fix use cache --- cads_adaptors/adaptors/mars.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index caaee9eb..be79b07f 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -150,8 +150,12 @@ def cached_retrieve(self, requests: list[Request]) -> BinaryIO: def retrieve(self, requests: list[Request]) -> BinaryIO: with cacholote.config.set(use_cache=self.use_cache, return_cache_entry=False): - fp = self.cached_retrieve(requests) - return cacholote.extra_encoders.FrozenFile(fp.name) + name = self.cached_retrieve(requests).name + return ( + cacholote.extra_encoders.FrozenFile(name, "rb") + if self.use_cache + else open(name, "rb") + ) class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): From 26ddc047ae0a4d64ca271894761aa4819e5e9f02 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Dec 2024 13:40:51 +0100 Subject: [PATCH 04/12] add typing --- cads_adaptors/adaptors/mars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index be79b07f..d7618194 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -130,7 +130,7 @@ class CachedExecuteMars: cache_tmp_path: pathlib.Path @property - def use_cache(self): + def use_cache(self) -> bool: fs, _ = cacholote.utils.get_cache_files_fs_dirname() return "local" in ensure_list(fs.protocol) From e0dc67da96138fa2772389a2d8c6b48ba5fd96e4 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Wed, 8 Jan 2025 17:42:02 +0100 Subject: [PATCH 05/12] sort requests --- cads_adaptors/adaptors/mars.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index d7618194..f2d97ca1 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -140,7 +140,7 @@ def sort_requests(self, requests: list[Request]) -> list[Request]: @cacholote.cacheable def cached_retrieve(self, requests: list[Request]) -> BinaryIO: result = execute_mars( - self.sort_requests(requests), + requests, self.context, self.config, self.mapping, @@ -149,6 +149,7 @@ def cached_retrieve(self, requests: list[Request]) -> BinaryIO: return open(result, "rb") def retrieve(self, requests: list[Request]) -> BinaryIO: + requests = self.sort_requests(requests) with cacholote.config.set(use_cache=self.use_cache, return_cache_entry=False): name = self.cached_retrieve(requests).name return ( From ccd2f338e53379044a90e17171b9aabf8c9ba75d Mon Sep 17 00:00:00 2001 From: malmans2 Date: Thu, 9 Jan 2025 09:18:41 +0100 Subject: [PATCH 06/12] better sorting --- cads_adaptors/adaptors/mars.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index f2d97ca1..3d4308a2 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -135,7 +135,8 @@ def use_cache(self) -> bool: return "local" in ensure_list(fs.protocol) def sort_requests(self, requests: list[Request]) -> list[Request]: - return sorted(requests, key=lambda request: json.dumps(request, sort_keys=True)) + requests = [dict(sorted(request.items())) for request in requests] + return sorted(requests, key=lambda request: json.dumps(request)) @cacholote.cacheable def cached_retrieve(self, requests: list[Request]) -> BinaryIO: From 888a7a7b2a3c53a42128881a04d606a7c7c2ade5 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Thu, 9 Jan 2025 15:12:04 +0100 Subject: [PATCH 07/12] refactor --- cads_adaptors/adaptors/mars.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 3d4308a2..cc9baea6 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -138,8 +138,7 @@ def sort_requests(self, requests: list[Request]) -> list[Request]: requests = [dict(sorted(request.items())) for request in requests] return sorted(requests, key=lambda request: json.dumps(request)) - @cacholote.cacheable - def cached_retrieve(self, requests: list[Request]) -> BinaryIO: + def _execute_mars(self, requests: list[Request]) -> BinaryIO: result = execute_mars( requests, self.context, @@ -149,14 +148,17 @@ def cached_retrieve(self, requests: list[Request]) -> BinaryIO: ) return open(result, "rb") - def retrieve(self, requests: list[Request]) -> BinaryIO: + def execute_mars(self, requests: list[Request]) -> str: requests = self.sort_requests(requests) with cacholote.config.set(use_cache=self.use_cache, return_cache_entry=False): - name = self.cached_retrieve(requests).name + return cacholote.cacheable(self._execute_mars)(requests).name + + def retrieve(self, requests: list[Request]) -> BinaryIO: + result = self.execute_mars(requests) return ( - cacholote.extra_encoders.FrozenFile(name, "rb") + cacholote.extra_encoders.FrozenFile(result, "rb") if self.use_cache - else open(name, "rb") + else open(result, "rb") ) From c3356ebe5dc138e05074fb80d76754b7913b8cab Mon Sep 17 00:00:00 2001 From: malmans2 Date: Fri, 10 Jan 2025 08:51:14 +0100 Subject: [PATCH 08/12] rename --- cads_adaptors/adaptors/mars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index cc9baea6..1df9edb6 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -156,7 +156,7 @@ def execute_mars(self, requests: list[Request]) -> str: def retrieve(self, requests: list[Request]) -> BinaryIO: result = self.execute_mars(requests) return ( - cacholote.extra_encoders.FrozenFile(result, "rb") + cacholote.extra_encoders.InPlaceFile(result, "rb") if self.use_cache else open(result, "rb") ) From bd0ba3338ce035a766fb3122c9847d8a9ff0bd6a Mon Sep 17 00:00:00 2001 From: malmans2 Date: Wed, 15 Jan 2025 11:59:51 +0100 Subject: [PATCH 09/12] add unit test --- cads_adaptors/adaptors/mars.py | 19 +++++++++++------ tests/test_15_mars.py | 37 +++++++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index 1df9edb6..f130ba3e 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -2,10 +2,9 @@ import json import os import pathlib +from types import ModuleType from typing import Any, BinaryIO -import cacholote - from cads_adaptors.adaptors import Context, Request, cds from cads_adaptors.exceptions import MarsNoDataError, MarsRuntimeError, MarsSystemError from cads_adaptors.tools import adaptor_tools @@ -129,9 +128,15 @@ class CachedExecuteMars: mapping: dict[str, Any] cache_tmp_path: pathlib.Path + @property + def cacholote(self) -> ModuleType: + import cacholote + + return cacholote + @property def use_cache(self) -> bool: - fs, _ = cacholote.utils.get_cache_files_fs_dirname() + fs, _ = self.cacholote.utils.get_cache_files_fs_dirname() return "local" in ensure_list(fs.protocol) def sort_requests(self, requests: list[Request]) -> list[Request]: @@ -150,13 +155,15 @@ def _execute_mars(self, requests: list[Request]) -> BinaryIO: def execute_mars(self, requests: list[Request]) -> str: requests = self.sort_requests(requests) - with cacholote.config.set(use_cache=self.use_cache, return_cache_entry=False): - return cacholote.cacheable(self._execute_mars)(requests).name + with self.cacholote.config.set( + use_cache=self.use_cache, return_cache_entry=False + ): + return self.cacholote.cacheable(self._execute_mars)(requests).name def retrieve(self, requests: list[Request]) -> BinaryIO: result = self.execute_mars(requests) return ( - cacholote.extra_encoders.InPlaceFile(result, "rb") + self.cacholote.extra_encoders.InPlaceFile(result, "rb") if self.use_cache else open(result, "rb") ) diff --git a/tests/test_15_mars.py b/tests/test_15_mars.py index 6587a48a..76092ac8 100644 --- a/tests/test_15_mars.py +++ b/tests/test_15_mars.py @@ -1,8 +1,13 @@ +import json import os +import pathlib +from typing import Any +import cacholote +import pytest import requests -from cads_adaptors.adaptors import mars +from cads_adaptors.adaptors import Context, mars TEST_GRIB_FILE = ( "https://get.ecmwf.int/repository/test-data/cfgrib/era5-levels-members.grib" @@ -65,3 +70,33 @@ def test_convert_format(tmp_path, monkeypatch): _, out_ext = os.path.splitext(converted_files[0]) assert out_ext == ".nc" assert "/test_subdir/" in converted_files[0] + + +def test_cached_execute_mars( + monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path +) -> None: + def mock_execute_mars( + request: dict[str, Any] | list[dict[str, Any]], + context: Context = Context(), + config: dict[str, Any] = dict(), + mapping: dict[str, Any] = dict(), + target_fname: str = "data.grib", + target_dir: str | pathlib.Path = "", + ) -> str: + target_path = pathlib.Path(target_dir) / target_fname + target_path.write_text(json.dumps(request)) + return str(target_path) + + monkeypatch.setattr(mars, "execute_mars", mock_execute_mars) + cached_execute_mars = mars.CachedExecuteMars(Context(), {}, {}, tmp_path) + + requests = [{"1": 1, "2": 2}, {"3": 3}, {"4": 4}] + cached_file = cached_execute_mars.execute_mars(requests) + assert cached_file.startswith(str(tmp_path / "cache_files")) + + reversed_requests = [{"4": 4}, {"3": 3}, {"2": 2, "1": 1}] + assert cached_execute_mars.execute_mars(reversed_requests) == cached_file + + result = cached_execute_mars.retrieve(requests) + assert isinstance(result, cacholote.extra_encoders.InPlaceFile) + assert result.name == cached_execute_mars.execute_mars(requests) From 3074f100385a65de8b4a740b50ae287fcbcaf4c0 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Wed, 15 Jan 2025 12:14:17 +0100 Subject: [PATCH 10/12] add in_place_open --- cads_adaptors/adaptors/mars.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py index f130ba3e..e221274b 100644 --- a/cads_adaptors/adaptors/mars.py +++ b/cads_adaptors/adaptors/mars.py @@ -139,6 +139,9 @@ def use_cache(self) -> bool: fs, _ = self.cacholote.utils.get_cache_files_fs_dirname() return "local" in ensure_list(fs.protocol) + def in_place_open(self, filename: str) -> BinaryIO: + return self.cacholote.extra_encoders.InPlaceFile(filename, "rb") + def sort_requests(self, requests: list[Request]) -> list[Request]: requests = [dict(sorted(request.items())) for request in requests] return sorted(requests, key=lambda request: json.dumps(request)) @@ -162,11 +165,7 @@ def execute_mars(self, requests: list[Request]) -> str: def retrieve(self, requests: list[Request]) -> BinaryIO: result = self.execute_mars(requests) - return ( - self.cacholote.extra_encoders.InPlaceFile(result, "rb") - if self.use_cache - else open(result, "rb") - ) + return self.in_place_open(result) if self.use_cache else open(result, "rb") class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor): From 4e32752e7afae1e81d838dfcfd3b544e760e4b01 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Wed, 15 Jan 2025 12:21:40 +0100 Subject: [PATCH 11/12] add test --- tests/test_15_mars.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_15_mars.py b/tests/test_15_mars.py index 76092ac8..00317464 100644 --- a/tests/test_15_mars.py +++ b/tests/test_15_mars.py @@ -94,6 +94,8 @@ def mock_execute_mars( cached_file = cached_execute_mars.execute_mars(requests) assert cached_file.startswith(str(tmp_path / "cache_files")) + assert cached_execute_mars.execute_mars([{"foo": "bar"}]) != cached_file + reversed_requests = [{"4": 4}, {"3": 3}, {"2": 2, "1": 1}] assert cached_execute_mars.execute_mars(reversed_requests) == cached_file From 9c0d9f61bee39d61794457c7cccbe3101425f951 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Wed, 15 Jan 2025 12:26:36 +0100 Subject: [PATCH 12/12] cleanup --- tests/test_15_mars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_15_mars.py b/tests/test_15_mars.py index 00317464..e05c8f72 100644 --- a/tests/test_15_mars.py +++ b/tests/test_15_mars.py @@ -101,4 +101,4 @@ def mock_execute_mars( result = cached_execute_mars.retrieve(requests) assert isinstance(result, cacholote.extra_encoders.InPlaceFile) - assert result.name == cached_execute_mars.execute_mars(requests) + assert result.name == cached_file