From ca478dc1c82faf2f5a07f1d73e556ee2d438261c Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Wed, 18 Dec 2024 16:54:33 -0500 Subject: [PATCH 1/6] Improving obj serializer --- src/flowcept/commons/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/flowcept/commons/utils.py b/src/flowcept/commons/utils.py index d37ffb62..b00761aa 100644 --- a/src/flowcept/commons/utils.py +++ b/src/flowcept/commons/utils.py @@ -165,6 +165,8 @@ def replace_non_serializable(obj): return [replace_non_serializable(item) for item in obj] else: return obj + elif hasattr(obj, "__dict__"): + return obj.__dict__ else: # Replace non-serializable values with id() return f"{obj.__class__.__name__}_instance_id_{id(obj)}" From 6aec6ec7bf9e9c6ce2bc6c5c9670b1afd9ad0e64 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 20 Dec 2024 10:08:28 -0500 Subject: [PATCH 2/6] Adding argparse to default args_handler --- Makefile | 2 +- src/flowcept/commons/utils.py | 2 - src/flowcept/flowcept_api/db_api.py | 8 ++++ src/flowcept/instrumentation/flowcept_task.py | 24 +++++++----- .../flowcept_task_decorator_test.py | 38 +++++++++++++++++-- 5 files changed, 57 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index eb1e9470..a0a48dda 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ checks: ruff format --check src reformat: - ruff check src --fix + ruff check src --fix --unsafe-fixes ruff format src # Remove cache directories and Sphinx build output diff --git a/src/flowcept/commons/utils.py b/src/flowcept/commons/utils.py index b00761aa..d37ffb62 100644 --- a/src/flowcept/commons/utils.py +++ b/src/flowcept/commons/utils.py @@ -165,8 +165,6 @@ def replace_non_serializable(obj): return [replace_non_serializable(item) for item in obj] else: return obj - elif hasattr(obj, "__dict__"): - return obj.__dict__ else: # Replace non-serializable values with id() return f"{obj.__class__.__name__}_instance_id_{id(obj)}" diff --git a/src/flowcept/flowcept_api/db_api.py 
b/src/flowcept/flowcept_api/db_api.py index 132d2a6d..47bd38ff 100644 --- a/src/flowcept/flowcept_api/db_api.py +++ b/src/flowcept/flowcept_api/db_api.py @@ -62,6 +62,14 @@ def workflow_query(self, filter) -> List[Dict]: return None return results + def get_tasks_from_current_workflow(self): + """ + Get the tasks of the current workflow in the Flowcept instance. + """ + from flowcept.flowcept_api.flowcept_controller import Flowcept + + return self.task_query(filter={"workflow_id": Flowcept.current_workflow_id}) + def task_query( self, filter: Dict, diff --git a/src/flowcept/instrumentation/flowcept_task.py b/src/flowcept/instrumentation/flowcept_task.py index d1e06629..1e262b81 100644 --- a/src/flowcept/instrumentation/flowcept_task.py +++ b/src/flowcept/instrumentation/flowcept_task.py @@ -3,6 +3,7 @@ import threading from time import time from functools import wraps +import argparse from flowcept.commons.flowcept_dataclasses.task_object import ( TaskObject, ) @@ -21,18 +22,17 @@ # TODO: :code-reorg: consider moving it to utils and reusing it in dask interceptor -def default_args_handler(task_message: TaskObject, *args, **kwargs): +def default_args_handler(*args, **kwargs): """Get default arguments.""" args_handled = {} if args is not None and len(args): + if isinstance(args[0], argparse.Namespace): + args_handled.update(args[0].__dict__) + args = args[1:] for i in range(len(args)): args_handled[f"arg_{i}"] = args[i] if kwargs is not None and len(kwargs): - task_message.workflow_id = kwargs.pop("workflow_id", None) - task_message.campaign_id = kwargs.pop("campaign_id", None) args_handled.update(kwargs) - task_message.workflow_id = task_message.workflow_id or Flowcept.current_workflow_id - task_message.campaign_id = task_message.campaign_id or Flowcept.campaign_id if REPLACE_NON_JSON_SERIALIZABLE: args_handled = replace_non_serializable(args_handled) return args_handled @@ -123,7 +123,10 @@ def wrapper(*args, **kwargs): task_obj = TaskObject() task_obj.activity_id 
= func.__name__ - task_obj.used = args_handler(task_obj, *args, **kwargs) + handled_args = args_handler(*args, **kwargs) + task_obj.workflow_id = handled_args.pop("workflow_id", Flowcept.current_workflow_id) + task_obj.campaign_id = handled_args.pop("campaign_id", Flowcept.campaign_id) + task_obj.used = handled_args task_obj.started_at = time() task_obj.task_id = str(task_obj.started_at) _thread_local._flowcept_current_context_task_id = task_obj.task_id @@ -139,10 +142,11 @@ def wrapper(*args, **kwargs): task_obj.ended_at = time() task_obj.telemetry_at_end = interceptor.telemetry_capture.capture() try: - if isinstance(result, dict): - task_obj.generated = args_handler(task_obj, **result) - else: - task_obj.generated = args_handler(task_obj, result) + if result is not None: + if isinstance(result, dict): + task_obj.generated = args_handler(task_obj, **result) + else: + task_obj.generated = args_handler(task_obj, result) except Exception as e: logger.exception(e) diff --git a/tests/decorator_tests/flowcept_task_decorator_test.py b/tests/decorator_tests/flowcept_task_decorator_test.py index ad594b30..06051af3 100644 --- a/tests/decorator_tests/flowcept_task_decorator_test.py +++ b/tests/decorator_tests/flowcept_task_decorator_test.py @@ -2,7 +2,7 @@ import psutil import uuid import random - +from unittest.mock import patch import pandas as pd from time import time, sleep @@ -76,6 +76,19 @@ def lightweight_decorated_static_function3(x): return 3 +def parse_args(): + import argparse + parser = argparse.ArgumentParser() + parser.add_argument("--a", type=int, required=True, help="An integer argument") + parser.add_argument("--b", type=str, required=True, help="A string argument") + return parser.parse_known_args() + + +@flowcept_task +def process_arguments_task(known_args, unknown_args): + print(known_args, unknown_args) + + def compute_statistics(array): import numpy as np @@ -213,6 +226,23 @@ def test_decorated_function(self): max_trials=60, ) + @patch("sys.argv", 
["script_name", "--a", "123", "--b", "abc", "--unknown_arg", "unk", "['a']"]) + def test_argparse(self): + known_args, unknown_args = parse_args() + self.assertEqual(known_args.a, 123) + self.assertEqual(known_args.b, "abc") + + with Flowcept(): + print(Flowcept.current_workflow_id) + process_arguments_task(known_args, unknown_args) + + task = Flowcept.db.get_tasks_from_current_workflow()[0] + assert task["status"] == Status.FINISHED.value + assert task["used"]["a"] == 123 + assert task["used"]["b"] == "abc" + assert task["used"]["arg_0"] == ['--unknown_arg', 'unk', "['a']"] + + def test_online_offline(self): flowcept.configs.DB_FLUSH_MODE = "offline" # flowcept.instrumentation.decorators.instrumentation_interceptor = ( @@ -291,7 +321,8 @@ def test_flowcept_loop_types(self): loop = FlowceptLoop(items=items, loop_name="our_loop") for _ in loop: loop.end_iter({"a": 1}) - docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id, "activity_id": "our_loop_iteration"}) + docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id, + "activity_id": "our_loop_iteration"}) assert len(docs) == len(items) assert all(d["generated"]["a"] == 1 for d in docs) @@ -363,7 +394,6 @@ def test_flowcept_loop_types(self): docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}) assert len(docs) == 0 - def test_flowcept_loop_generator(self): number_of_epochs = 1 epochs = range(0, number_of_epochs) @@ -376,7 +406,7 @@ def test_flowcept_loop_generator(self): loop.end_iter({"loss": loss}) docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id}) - assert len(docs) == number_of_epochs+1 # 1 (parent_task) + #epochs (sub_tasks) + assert len(docs) == number_of_epochs + 1 # 1 (parent_task) + #epochs (sub_tasks) iteration_tasks = [] whole_loop_task = None From 91b9c6564bd43db62552c0cd58e85f6a828e3bde Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 20 Dec 2024 15:14:03 -0500 Subject: [PATCH 3/6] Adding more Task 
Capture context mgmt --- .github/workflows/run-tests.yml | 2 +- pyproject.toml | 2 +- src/flowcept/__init__.py | 21 ++- .../commons/daos/docdb_dao/lmdb_dao.py | 4 +- .../flowcept_dataclasses/workflow_object.py | 13 +- src/flowcept/configs.py | 52 +++++--- .../flowcept_api/flowcept_controller.py | 26 ++-- src/flowcept/instrumentation/flowcept_task.py | 9 ++ src/flowcept/instrumentation/task_capture.py | 125 ++++++++++++++++++ .../flowcept_task_decorator_test.py | 38 +++++- 10 files changed, 244 insertions(+), 48 deletions(-) create mode 100644 src/flowcept/instrumentation/task_capture.py diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 84a8f7f9..379ca372 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -1,4 +1,4 @@ -name: (With Mongo) Unit, integration, and notebook tests +name: Main Tests on: push: schedule: diff --git a/pyproject.toml b/pyproject.toml index 4f3c6961..40ab058d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ line-length = 100 [tool.ruff.lint] extend-select = ["E501", "D"] -ignore = ["D200", "D212", "D105", "D401"] +ignore = ["D200", "D212", "D105", "D401", "D205", "D100"] [tool.ruff.lint.pydocstyle] convention = "numpy" diff --git a/src/flowcept/__init__.py b/src/flowcept/__init__.py index 8d43f7ba..93107d2c 100644 --- a/src/flowcept/__init__.py +++ b/src/flowcept/__init__.py @@ -1,6 +1,5 @@ """Flowcept package.""" -from flowcept.configs import SETTINGS_PATH from flowcept.version import __version__ from flowcept.commons.flowcept_dataclasses.workflow_object import ( @@ -19,6 +18,11 @@ def __getattr__(name): return flowcept_task + elif name == "FlowceptTask": + from flowcept.instrumentation.task_capture import FlowceptTask + + return FlowceptTask + elif name == "flowcept_torch": from flowcept.instrumentation.flowcept_torch import flowcept_torch @@ -34,6 +38,11 @@ def __getattr__(name): return telemetry_flowcept_task + elif name == "lightweight_flowcept_task": 
+ from flowcept.instrumentation.flowcept_task import lightweight_flowcept_task + + return lightweight_flowcept_task + if name == "MLFlowInterceptor": from flowcept.flowceptor.adapters.mlflow.mlflow_interceptor import ( MLFlowInterceptor, @@ -58,12 +67,10 @@ def __getattr__(name): ) return TensorboardInterceptor - elif name == "ZambezeInterceptor": - from flowcept.flowceptor.adapters.zambeze.zambeze_interceptor import ( - ZambezeInterceptor, - ) + elif name == "SETTINGS_PATH": + from configs import SETTINGS_PATH - return ZambezeInterceptor + return SETTINGS_PATH elif name == "TaskQueryAPI": from flowcept.flowcept_api.task_query_api import TaskQueryAPI @@ -80,7 +87,9 @@ def __getattr__(name): "TaskQueryAPI", "flowcept_task", "FlowceptLoop", + "FlowceptTask", "telemetry_flowcept_task", + "lightweight_flowcept_task", "Flowcept", "flowcept_torch", "WorkflowObject", diff --git a/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py b/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py index 2acd6123..dede1e46 100644 --- a/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py +++ b/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py @@ -12,7 +12,7 @@ from flowcept import WorkflowObject from flowcept.commons.daos.docdb_dao.docdb_dao_base import DocumentDBDAO -from flowcept.configs import PERF_LOG, DATABASES +from flowcept.configs import PERF_LOG, LMDB_SETTINGS from flowcept.flowceptor.consumers.consumer_utils import curate_dict_task_messages @@ -36,7 +36,7 @@ def __init__(self): def _open(self): """Open LMDB environment and databases.""" - _path = DATABASES.get("lmdb").get("path", "lmdb") + _path = LMDB_SETTINGS.get("path", "flowcept_lmdb") self._env = lmdb.open(_path, map_size=10**12, max_dbs=2) self._tasks_db = self._env.open_db(b"tasks") self._workflows_db = self._env.open_db(b"workflows") diff --git a/src/flowcept/commons/flowcept_dataclasses/workflow_object.py b/src/flowcept/commons/flowcept_dataclasses/workflow_object.py index fe36d816..34a9c06d 100644 --- 
a/src/flowcept/commons/flowcept_dataclasses/workflow_object.py +++ b/src/flowcept/commons/flowcept_dataclasses/workflow_object.py @@ -2,7 +2,7 @@ from typing import Dict, AnyStr, List import msgpack -from omegaconf import OmegaConf +from omegaconf import OmegaConf, DictConfig from flowcept.version import __version__ from flowcept.commons.utils import get_utc_now @@ -70,7 +70,9 @@ def to_dict(self): def enrich(self, adapter_key=None): """Enrich it.""" self.utc_timestamp = get_utc_now() - self.flowcept_settings = OmegaConf.to_container(settings) + self.flowcept_settings = ( + OmegaConf.to_container(settings) if isinstance(settings, DictConfig) else settings + ) if adapter_key is not None: # TODO :base-interceptor-refactor: :code-reorg: :usability: @@ -90,7 +92,12 @@ def enrich(self, adapter_key=None): self.sys_name = SYS_NAME if self.extra_metadata is None and EXTRA_METADATA is not None: - self.extra_metadata = OmegaConf.to_container(EXTRA_METADATA) + _extra_metadata = ( + OmegaConf.to_container(EXTRA_METADATA) + if isinstance(EXTRA_METADATA, DictConfig) + else EXTRA_METADATA + ) + self.extra_metadata = _extra_metadata if self.flowcept_version is None: self.flowcept_version = __version__ diff --git a/src/flowcept/configs.py b/src/flowcept/configs.py index dba19ed2..b3970173 100644 --- a/src/flowcept/configs.py +++ b/src/flowcept/configs.py @@ -3,26 +3,46 @@ import os import socket import getpass - -from omegaconf import OmegaConf import random + +PROJECT_NAME = "flowcept" +USE_DEFAULT = os.getenv("FLOWCEPT_USE_DEFAULT", "False").lower() == "true" ######################## # Project Settings # ######################## -PROJECT_NAME = os.getenv("PROJECT_NAME", "flowcept") -_SETTINGS_DIR = os.path.expanduser(f"~/.{PROJECT_NAME}") -SETTINGS_PATH = os.getenv("FLOWCEPT_SETTINGS_PATH", f"{_SETTINGS_DIR}/settings.yaml") +if USE_DEFAULT: + settings = { + "log": {}, + "project": {}, + "telemetry_capture": {}, + "instrumentation": {}, + "experiment": {}, + "mq": {}, + "kv_db": 
{}, + "web_server": {}, + "sys_metadata": {}, + "extra_metadata": {}, + "analytics": {}, + "buffer": {}, + "databases": {}, + "adapters": {}, + } +else: + from omegaconf import OmegaConf + + _SETTINGS_DIR = os.path.expanduser(f"~/.{PROJECT_NAME}") + SETTINGS_PATH = os.getenv("FLOWCEPT_SETTINGS_PATH", f"{_SETTINGS_DIR}/settings.yaml") -if not os.path.exists(SETTINGS_PATH): - SETTINGS_PATH = None - from importlib import resources + if not os.path.exists(SETTINGS_PATH): + SETTINGS_PATH = None + from importlib import resources - with resources.files("resources").joinpath("sample_settings.yaml").open("r") as f: - settings = OmegaConf.load(f) -else: - settings = OmegaConf.load(SETTINGS_PATH) + with resources.files("resources").joinpath("sample_settings.yaml").open("r") as f: + settings = OmegaConf.load(f) + else: + settings = OmegaConf.load(SETTINGS_PATH) ######################## # Log Settings # @@ -96,13 +116,13 @@ ###################### # LMDB Settings # ###################### -_lmdb_settings = DATABASES.get("lmdb", None) +LMDB_SETTINGS = DATABASES.get("lmdb", {}) LMDB_ENABLED = False -if _lmdb_settings: +if LMDB_SETTINGS: if "LMDB_ENABLED" in os.environ: LMDB_ENABLED = os.environ.get("LMDB_ENABLED").lower() == "true" else: - LMDB_ENABLED = _lmdb_settings.get("enabled", False) + LMDB_ENABLED = LMDB_SETTINGS.get("enabled", False) if not LMDB_ENABLED and not MONGO_ENABLED: # At least one of these variables need to be enabled. 
@@ -261,7 +281,7 @@ #################### INSTRUMENTATION = settings.get("instrumentation", {}) -INSTRUMENTATION_ENABLED = INSTRUMENTATION.get("enabled", False) +INSTRUMENTATION_ENABLED = True # INSTRUMENTATION.get("enabled", False) #################### # Enabled ADAPTERS # diff --git a/src/flowcept/flowcept_api/flowcept_controller.py b/src/flowcept/flowcept_api/flowcept_controller.py index 5fd05486..6d0d17ad 100644 --- a/src/flowcept/flowcept_api/flowcept_controller.py +++ b/src/flowcept/flowcept_api/flowcept_controller.py @@ -41,6 +41,7 @@ def __init__( workflow_name: str = None, workflow_args: str = None, start_persistence=True, + save_workflow=True, ): """Flowcept controller. @@ -58,6 +59,7 @@ def __init__( start_persistence - Whether you want to persist the messages in one of the DBs defined in the `databases` settings. + save_workflow - Whether you want to send a workflow object message. """ self.logger = FlowceptLogger() self._enable_persistence = start_persistence @@ -82,6 +84,7 @@ def __init__( interceptors = [interceptors] self._interceptors: List[BaseInterceptor] = interceptors + self._save_workflow = save_workflow self.current_workflow_id = workflow_id self.campaign_id = campaign_id self.workflow_name = workflow_name @@ -105,18 +108,17 @@ def start(self): self.logger.debug(f"...Flowceptor {key} started ok!") if interceptor.kind == "instrumentation": - wf_obj = WorkflowObject() - wf_obj.workflow_id = self.current_workflow_id or str(uuid4()) - wf_obj.campaign_id = self.campaign_id or str(uuid4()) - - Flowcept.current_workflow_id = wf_obj.workflow_id - Flowcept.campaign_id = wf_obj.campaign_id - - if self.workflow_name: - wf_obj.name = self.workflow_name - if self.workflow_args: - wf_obj.used = self.workflow_args - interceptor.send_workflow_message(wf_obj) + Flowcept.current_workflow_id = self.current_workflow_id or str(uuid4()) + Flowcept.campaign_id = self.campaign_id or str(uuid4()) + if self._save_workflow: + wf_obj = WorkflowObject() + 
wf_obj.workflow_id = Flowcept.current_workflow_id + wf_obj.campaign_id = Flowcept.campaign_id + if self.workflow_name: + wf_obj.name = self.workflow_name + if self.workflow_args: + wf_obj.used = self.workflow_args + interceptor.send_workflow_message(wf_obj) else: Flowcept.current_workflow_id = None diff --git a/src/flowcept/instrumentation/flowcept_task.py b/src/flowcept/instrumentation/flowcept_task.py index 1e262b81..fec469fb 100644 --- a/src/flowcept/instrumentation/flowcept_task.py +++ b/src/flowcept/instrumentation/flowcept_task.py @@ -107,6 +107,15 @@ def wrapper(*args, **kwargs): return decorator(func) +# def flowcept_task_switch(mode=None): +# if mode is None: +# return flowcept_task +# elif mode == "disable": +# return lambda _: _ +# else: +# raise NotImplementedError + + def flowcept_task(func=None, **decorator_kwargs): """Get flowcept task.""" if INSTRUMENTATION_ENABLED: diff --git a/src/flowcept/instrumentation/task_capture.py b/src/flowcept/instrumentation/task_capture.py new file mode 100644 index 00000000..9d2b2609 --- /dev/null +++ b/src/flowcept/instrumentation/task_capture.py @@ -0,0 +1,125 @@ +from time import time +from typing import Dict + +from flowcept.commons.flowcept_dataclasses.task_object import ( + TaskObject, +) +from flowcept.commons.vocabulary import Status +from flowcept.configs import INSTRUMENTATION_ENABLED +from flowcept.flowcept_api.flowcept_controller import Flowcept +from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor + + +class FlowceptTask(object): + """ + A context manager for capturing provenance and task telemetry data within the Flowcept + framework. + + This class encapsulates the lifecycle of a task, recording its start and end times, telemetry, + and metadata. It integrates with the Flowcept API and Instrumentation Interceptor to + log task-specific details. + + Parameters + ---------- + task_id : str, optional + Unique identifier for the task.
If not provided, it defaults to the current timestamp. + workflow_id : str, optional + ID of the workflow to which this task belongs. Defaults to the current workflow ID from + Flowcept. + campaign_id : str, optional + ID of the campaign to which this task belongs. Defaults to the current campaign ID from + Flowcept. + used : Dict, optional + Metadata about the resources or data used during the task execution. + custom_metadata : Dict, optional + User-defined metadata associated with the task. + + Methods + ------- + __enter__() + Sets up the task context. + __exit__(exc_type, exc_val, exc_tb) + Ends the task context, ensuring telemetry and metadata are recorded. + end(generated=None, ended_at=None, stdout=None, stderr=None, status=Status.FINISHED) + Finalizes the task, capturing telemetry, status, and other details. + + Notes + ----- + If instrumentation is disabled (`INSTRUMENTATION_ENABLED` is False), the methods in this class + are no-ops, and no data is captured. + """ + + if INSTRUMENTATION_ENABLED: + _interceptor = InstrumentationInterceptor.get_instance() + + def __init__( + self, + task_id: str = None, + workflow_id: str = None, + campaign_id: str = None, + used: Dict = None, + custom_metadata: Dict = None, + ): + if not INSTRUMENTATION_ENABLED: + self._ended = True + return + self._task = TaskObject() + self._task.telemetry_at_start = FlowceptTask._interceptor.telemetry_capture.capture() + self._task.started_at = time() + self._task.task_id = task_id or str(self._task.started_at) + self._task.workflow_id = workflow_id or Flowcept.current_workflow_id + self._task.campaign_id = campaign_id or Flowcept.campaign_id + self._task.used = used + self._task.custom_metadata = custom_metadata + self._ended = False + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if not self._ended: + self.end() + + def end( + self, + generated: Dict = None, + ended_at: float = None, + stdout: str = None, + stderr: str = None, + status: 
Status = Status.FINISHED, + ): + """ + Finalizes the task by capturing its end state, telemetry, and status. + + This method records the task's ending telemetry data, status, and any outputs or errors. + It also sends the task data to the instrumentation interceptor for logging or further + processing. + + Parameters + ---------- + generated : Dict, optional + Metadata or data generated during the task's execution. Defaults to None. + ended_at : float, optional + Timestamp indicating when the task ended. If not provided, defaults to the current time. + stdout : str, optional + Standard output captured during the task's execution. Defaults to None. + stderr : str, optional + Standard error captured during the task's execution. Defaults to None. + status : Status, optional + Status of the task at the time of completion. Defaults to `Status.FINISHED`. + + Notes + ----- + If instrumentation is disabled (`INSTRUMENTATION_ENABLED` is False), this method performs + no actions. + """ + if not INSTRUMENTATION_ENABLED: + return + self._task.telemetry_at_end = FlowceptTask._interceptor.telemetry_capture.capture() + self._task.ended_at = ended_at or time() + self._task.status = status + self._task.stderr = stderr + self._task.stdout = stdout + self._task.generated = generated + FlowceptTask._interceptor.intercept(self._task.to_dict()) + self._ended = True diff --git a/tests/decorator_tests/flowcept_task_decorator_test.py b/tests/decorator_tests/flowcept_task_decorator_test.py index 06051af3..2010f958 100644 --- a/tests/decorator_tests/flowcept_task_decorator_test.py +++ b/tests/decorator_tests/flowcept_task_decorator_test.py @@ -6,18 +6,19 @@ import pandas as pd from time import time, sleep -from flowcept import Flowcept -import flowcept import unittest +import flowcept from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.utils import assert_by_querying_tasks_until from flowcept.commons.vocabulary import Status -from 
flowcept.instrumentation.flowcept_loop import FlowceptLoop -from flowcept.instrumentation.flowcept_task import ( - flowcept_task, - lightweight_flowcept_task, -) +from flowcept import FlowceptTask, FlowceptLoop, Flowcept, lightweight_flowcept_task, flowcept_task +# from flowcept.instrumentation.flowcept_loop import FlowceptLoop +# from flowcept.instrumentation.flowcept_task import ( +# flowcept_task, +# lightweight_flowcept_task, +# ) +# from flowcept.instrumentation.task_capture import FlowceptTask def calc_time_to_sleep() -> float: @@ -429,3 +430,26 @@ def test_flowcept_loop_generator(self): assert t["used"]["epoch"] == i assert t["status"] == Status.FINISHED.value assert t["parent_task_id"] == whole_loop_task["task_id"] + + def test_task_capture(self): + + with Flowcept(): + used_args = {"a": 1} + with FlowceptTask(used=used_args) as t: + t.end(generated={"b": 2}) + + task = Flowcept.db.get_tasks_from_current_workflow()[0] + assert task["used"]["a"] == 1 + assert task["generated"]["b"] == 2 + assert task["status"] == Status.FINISHED.value + + with Flowcept(): + used_args = {"a": 1} + with FlowceptTask(used=used_args): + pass + + task = Flowcept.db.get_tasks_from_current_workflow()[0] + assert task["used"]["a"] == 1 + assert task["status"] == Status.FINISHED.value + assert "generated" not in task + From d365b4c583327cce2613961cc7fc24731186420b Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 20 Dec 2024 16:02:38 -0500 Subject: [PATCH 4/6] GH actions matrix strategy for python version --- .../workflows/create-release-n-publish.yml | 3 + ...py11-all-dbs.yml => run-tests-all-dbs.yml} | 9 +- .github/workflows/run-tests-in-container.yml | 2 +- .github/workflows/run-tests-py11-simple.yml | 56 ------------ .github/workflows/run-tests-py11.yml | 90 ------------------- .github/workflows/run-tests-simple.yml | 7 +- .github/workflows/run-tests.yml | 7 +- 7 files changed, 20 insertions(+), 154 deletions(-) rename .github/workflows/{run-tests-py11-all-dbs.yml => 
run-tests-all-dbs.yml} (88%) delete mode 100644 .github/workflows/run-tests-py11-simple.yml delete mode 100644 .github/workflows/run-tests-py11.yml diff --git a/.github/workflows/create-release-n-publish.yml b/.github/workflows/create-release-n-publish.yml index da182417..486848c3 100644 --- a/.github/workflows/create-release-n-publish.yml +++ b/.github/workflows/create-release-n-publish.yml @@ -8,6 +8,9 @@ jobs: build: name: Create Release and Publish runs-on: ubuntu-latest + env: + MONGO_ENABLED: false + LMDB_ENABLED: true timeout-minutes: 60 steps: - name: Checkout code diff --git a/.github/workflows/run-tests-py11-all-dbs.yml b/.github/workflows/run-tests-all-dbs.yml similarity index 88% rename from .github/workflows/run-tests-py11-all-dbs.yml rename to .github/workflows/run-tests-all-dbs.yml index 7eaa0c79..3e46900f 100644 --- a/.github/workflows/run-tests-py11-all-dbs.yml +++ b/.github/workflows/run-tests-all-dbs.yml @@ -1,4 +1,4 @@ -name: (With and Without Mongo) Tests on py11 +name: With and Without Mongo on: [pull_request] jobs: @@ -6,6 +6,9 @@ jobs: build: runs-on: ubuntu-latest timeout-minutes: 40 + strategy: + matrix: + python-version: [ "3.10", "3.11", "3.12", "3.13" ] env: MONGO_ENABLED: true LMDB_ENABLED: true @@ -17,10 +20,10 @@ jobs: with: fetch-depth: 1 - - name: Set up Python 3.11 + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: "3.11" + python-version: ${{ matrix.python-version }} cache: "pip" - name: Show OS Info diff --git a/.github/workflows/run-tests-in-container.yml b/.github/workflows/run-tests-in-container.yml index 890753e2..043ae614 100644 --- a/.github/workflows/run-tests-in-container.yml +++ b/.github/workflows/run-tests-in-container.yml @@ -1,4 +1,4 @@ -name: (With and Without Mongo) Tests inside a Container Tests inside a Container +name: (With and Without Mongo) Inside a Container on: [pull_request] jobs: diff --git a/.github/workflows/run-tests-py11-simple.yml 
b/.github/workflows/run-tests-py11-simple.yml deleted file mode 100644 index 64cb009b..00000000 --- a/.github/workflows/run-tests-py11-simple.yml +++ /dev/null @@ -1,56 +0,0 @@ -name: (Without Mongo) Tests on py11 -on: [pull_request] - -jobs: - - build: - runs-on: ubuntu-latest - timeout-minutes: 40 - env: - MONGO_ENABLED: false - if: "!contains(github.event.head_commit.message, 'CI Bot')" - - steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 1 - - - name: Set up Python 3.11 - uses: actions/setup-python@v5 - with: - python-version: "3.11" - cache: "pip" - - - name: Show OS Info - run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"' - - - name: Start docker compose with redis - run: make services - - - name: Upgrade pip - run: python -m pip install --upgrade pip - - - name: Show Python version - run: python --version && pip --version - - - name: Test examples - run: bash .github/workflows/run_examples.sh examples false # without mongo - - - name: Install all dependencies - run: | - python -m pip install --upgrade pip - python -m pip install .[all] - python -m pip install .[ml_dev] - - - name: List installed packages - run: pip list - - - name: Test with pytest and redis - run: | - make tests - - - name: Test notebooks with pytest and redis - run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore="notebooks/dask_from_CLI.ipynb" --ignore="notebooks/analytics.ipynb" - - - name: Stop services - run: make services-stop diff --git a/.github/workflows/run-tests-py11.yml b/.github/workflows/run-tests-py11.yml deleted file mode 100644 index 8f0a832b..00000000 --- a/.github/workflows/run-tests-py11.yml +++ /dev/null @@ -1,90 +0,0 @@ -name: (With Mongo) Tests on py11 -on: [pull_request] - -jobs: - - build: - runs-on: ubuntu-latest - 
timeout-minutes: 40 - env: - MONGO_ENABLED: true - LMDB_ENABLED: false - if: "!contains(github.event.head_commit.message, 'CI Bot')" - - steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 1 - - - name: Set up Python 3.11 - uses: actions/setup-python@v5 - with: - python-version: "3.11" - cache: "pip" - - - name: Show OS Info - run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"' - - - name: Start docker compose with redis - run: make services-mongo - - - name: Upgrade pip - run: python -m pip install --upgrade pip - - - name: Show Python version - run: python --version && pip --version - - - name: Test examples - run: bash .github/workflows/run_examples.sh examples true # with mongo - - - name: Install all dependencies - run: | - python -m pip install .[all] - python -m pip install .[ml_dev] - - - name: List installed packages - run: pip list - - - name: Test with pytest and redis - run: | - make tests - - - name: Test notebooks with pytest and redis - run: make tests-notebooks - - - name: Stop services - run: make services-stop-mongo - - - name: Clean up - run: | - make clean - find /home/runner/runners/ -type f -name "*.log" -exec sh -c 'echo {}; >"{}"' \; || true - - - name: Start docker compose with kafka - run: docker compose -f deployment/compose-kafka.yml up -d - - - name: Wait for one minute - run: sleep 60 - - - name: Check liveness - run: | - export MQ_TYPE=kafka - export MQ_PORT=9092 - python -c 'from flowcept.configs import MQ_TYPE, MQ_PORT; print(f"MQ_TYPE={MQ_TYPE}"); print(f"MQ_PORT={MQ_PORT}")' - python -c 'from flowcept import Flowcept; assert Flowcept.services_alive()' - - - name: Run tests with kafka - run: | - export MQ_TYPE=kafka - export MQ_PORT=9092 - # Ignoring heavy tests. 
They are executed with Kafka in another GH Action. - pytest - - - name: Stop services - run: docker compose -f deployment/compose-kafka.yml down - - - name: Clean up - run: | - make clean - find /home/runner/runners/ -type f -name "*.log" -exec sh -c 'echo {}; >"{}"' \; || true - docker image prune -a -f diff --git a/.github/workflows/run-tests-simple.yml b/.github/workflows/run-tests-simple.yml index 1a77ea98..5adffa6c 100644 --- a/.github/workflows/run-tests-simple.yml +++ b/.github/workflows/run-tests-simple.yml @@ -8,6 +8,9 @@ jobs: build: runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.10", "3.11", "3.12", "3.13" ] env: MONGO_ENABLED: false LMDB_ENABLED: true @@ -19,10 +22,10 @@ jobs: with: fetch-depth: 1 - - name: Set up Python 3.10 + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: "3.10" + python-version: ${{ matrix.python-version }} cache: "pip" - name: Show OS Info diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 379ca372..b1350966 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -8,6 +8,9 @@ jobs: build: runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.10", "3.11", "3.12", "3.13" ] env: MONGO_ENABLED: true LMDB_ENABLED: false @@ -19,10 +22,10 @@ jobs: with: fetch-depth: 1 - - name: Set up Python 3.10 + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: "3.10" + python-version: ${{ matrix.python-version }} cache: "pip" - name: Show OS Info From 0e24308eb6f578e493f99e7f29a7c0a162eaeef0 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 20 Dec 2024 16:08:57 -0500 Subject: [PATCH 5/6] Removing py313b --- .github/workflows/run-tests-simple.yml | 2 +- .github/workflows/run-tests.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-tests-simple.yml b/.github/workflows/run-tests-simple.yml index 
5adffa6c..3d6135a0 100644 --- a/.github/workflows/run-tests-simple.yml +++ b/.github/workflows/run-tests-simple.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ "3.10", "3.11", "3.12", "3.13" ] + python-version: [ "3.10", "3.11", "3.12" ] env: MONGO_ENABLED: false LMDB_ENABLED: true diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index b1350966..15a250c4 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ "3.10", "3.11", "3.12", "3.13" ] + python-version: [ "3.10", "3.11", "3.12" ] env: MONGO_ENABLED: true LMDB_ENABLED: false From f36fbf777282e49e4f2cb855cdb503ceff0f313c Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Fri, 20 Dec 2024 16:28:22 -0500 Subject: [PATCH 6/6] Updating GH actions --- .github/workflows/run-tests-all-dbs.yml | 4 ++-- .github/workflows/run-tests-kafka.yml | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/workflows/run-tests-all-dbs.yml b/.github/workflows/run-tests-all-dbs.yml index 3e46900f..a57b664d 100644 --- a/.github/workflows/run-tests-all-dbs.yml +++ b/.github/workflows/run-tests-all-dbs.yml @@ -1,4 +1,4 @@ -name: With and Without Mongo +name: With and Without Mongo at the same time on: [pull_request] jobs: @@ -8,7 +8,7 @@ jobs: timeout-minutes: 40 strategy: matrix: - python-version: [ "3.10", "3.11", "3.12", "3.13" ] + python-version: [ "3.10", "3.11", "3.12" ] env: MONGO_ENABLED: true LMDB_ENABLED: true diff --git a/.github/workflows/run-tests-kafka.yml b/.github/workflows/run-tests-kafka.yml index 1ec04bee..8751cf1d 100644 --- a/.github/workflows/run-tests-kafka.yml +++ b/.github/workflows/run-tests-kafka.yml @@ -10,6 +10,9 @@ jobs: build: runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.10", "3.11", "3.12" ] env: MONGO_ENABLED: true LMDB_ENABLED: false @@ -21,10 +24,10 @@ jobs: with: fetch-depth: 
1 - - name: Set up Python 3.10 + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: "3.10" + python-version: ${{ matrix.python-version }} cache: "pip" - name: Run docker compose