Skip to content

Commit

Permalink
Split shared cache into backend and frontend (#443)
Browse files Browse the repository at this point in the history
* Split shared cache into backend and frontend

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jan-janssen and pre-commit-ci[bot] authored Oct 27, 2024
1 parent a74c8f3 commit 0751a0d
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 65 deletions.
2 changes: 1 addition & 1 deletion executorlib/backend/cache_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import cloudpickle

from executorlib.shared.cache import backend_load_file, backend_write_file
from executorlib.cache.backend import backend_load_file, backend_write_file


def main() -> None:
Expand Down
4 changes: 2 additions & 2 deletions executorlib/backend/cache_serial.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sys

from executorlib.shared.cache import execute_task_in_file
from executorlib.cache.backend import backend_execute_task_in_file

if __name__ == "__main__":
execute_task_in_file(file_name=sys.argv[1])
backend_execute_task_in_file(file_name=sys.argv[1])
64 changes: 64 additions & 0 deletions executorlib/cache/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
from typing import Any

from executorlib.shared.cache import FutureItem
from executorlib.shared.hdf import dump, load


def backend_load_file(file_name: str) -> dict:
    """
    Load a task dictionary from an HDF5 file and replace every FutureItem
    placeholder in the positional and keyword arguments with its result.
    Args:
        file_name (str): The name of the HDF5 file.
    Returns:
        dict: The loaded data from the file.
    """

    def _resolve(value):
        # A FutureItem stands in for a result produced by another task;
        # everything else is already a concrete value.
        return value.result() if isinstance(value, FutureItem) else value

    apply_dict = load(file_name=file_name)
    apply_dict["args"] = [_resolve(item) for item in apply_dict["args"]]
    apply_dict["kwargs"] = {
        key: _resolve(value) for key, value in apply_dict["kwargs"].items()
    }
    return apply_dict


def backend_write_file(file_name: str, output: Any) -> None:
    """
    Write the output of a task to an HDF5 file.

    The input file is renamed to an intermediate ".h5ready" file first, the
    output is dumped into it, and the file is then renamed to ".h5out" —
    presumably so other processes only ever see complete result files
    (NOTE(review): confirm readers poll for the ".h5out" suffix).
    Args:
        file_name (str): The name of the HDF5 file.
        output (Any): The output to be written.
    Returns:
        None
    """
    base_name = os.path.splitext(file_name)[0]
    ready_name = base_name + ".h5ready"
    final_name = base_name + ".h5out"
    os.rename(file_name, ready_name)
    dump(file_name=ready_name, data_dict={"output": output})
    os.rename(ready_name, final_name)


def backend_execute_task_in_file(file_name: str) -> None:
    """
    Execute the task stored in a given HDF5 file and write its result back.
    Args:
        file_name (str): The file name of the HDF5 file as an absolute path.
    Returns:
        None
    """
    # Load the serialized callable and its (already resolved) arguments,
    # run it, and persist the result next to the input file.
    task_dict = backend_load_file(file_name=file_name)
    output = task_dict["fn"](*task_dict["args"], **task_dict["kwargs"])
    backend_write_file(file_name=file_name, output=output)
63 changes: 2 additions & 61 deletions executorlib/shared/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import subprocess
import sys
from concurrent.futures import Future
from typing import Any, Tuple
from typing import Tuple

from executorlib.shared.command import get_command_path
from executorlib.shared.hdf import dump, get_output, load
from executorlib.shared.hdf import dump, get_output
from executorlib.shared.serialize import serialize_funct_h5


Expand Down Expand Up @@ -47,47 +47,6 @@ def done(self) -> bool:
return get_output(file_name=self._file_name)[0]


def backend_load_file(file_name: str) -> dict:
    """
    Load a task dictionary from an HDF5 file, resolving FutureItem
    placeholders in both positional and keyword arguments to their results.
    Args:
        file_name (str): The name of the HDF5 file.
    Returns:
        dict: The loaded data from the file.
    """

    def _materialize(entry):
        # Only FutureItem placeholders need resolving; pass everything
        # else through untouched.
        if isinstance(entry, FutureItem):
            return entry.result()
        return entry

    apply_dict = load(file_name=file_name)
    apply_dict["args"] = [_materialize(entry) for entry in apply_dict["args"]]
    apply_dict["kwargs"] = {
        name: _materialize(entry) for name, entry in apply_dict["kwargs"].items()
    }
    return apply_dict


def backend_write_file(file_name: str, output: Any) -> None:
    """
    Persist a task's output to an HDF5 file.

    The source file is moved to a ".h5ready" name, the output is dumped
    into it, and it is finally moved to a ".h5out" name — presumably so
    consumers never observe a partially written result (TODO confirm).
    Args:
        file_name (str): The name of the HDF5 file.
        output (Any): The output to be written.
    Returns:
        None
    """
    stem = os.path.splitext(file_name)[0]
    staging_file = stem + ".h5ready"
    os.rename(file_name, staging_file)
    dump(file_name=staging_file, data_dict={"output": output})
    os.rename(staging_file, stem + ".h5out")


def execute_in_subprocess(
command: list, task_dependent_lst: list = []
) -> subprocess.Popen:
Expand Down Expand Up @@ -180,24 +139,6 @@ def execute_tasks_h5(
}


def execute_task_in_file(file_name: str) -> None:
    """
    Execute the task stored in a given HDF5 file and store the result.
    Args:
        file_name (str): The file name of the HDF5 file as an absolute path.
    Returns:
        None
    """
    # Read the callable plus its resolved arguments, invoke it, then
    # hand the result to the writer which renames the file when done.
    task_dict = backend_load_file(file_name=file_name)
    backend_write_file(
        file_name=file_name,
        output=task_dict["fn"](*task_dict["args"], **task_dict["kwargs"]),
    )


def _get_execute_command(file_name: str, cores: int = 1) -> list:
"""
Get command to call backend as a list of two strings
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cache_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

try:
from executorlib.shared.cache import (
FutureItem,
execute_task_in_file,
_check_task_output,
)
from executorlib.cache.backend import FutureItem
from executorlib.shared.hdf import dump
from executorlib.shared.serialize import serialize_funct_h5

Expand Down

0 comments on commit 0751a0d

Please sign in to comment.