diff --git a/dff/pipeline/conditions.py b/dff/pipeline/conditions.py index b967b72ee..fe7e65ce5 100644 --- a/dff/pipeline/conditions.py +++ b/dff/pipeline/conditions.py @@ -38,22 +38,22 @@ def service_successful_condition(path: Optional[str] = None) -> StartConditionCh """ def check_service_state(ctx: Context, _: Pipeline): - state = ctx.framework_states[PIPELINE_STATE_KEY].get(path, ComponentExecutionState.NOT_RUN.name) + state = ctx.framework_states[PIPELINE_STATE_KEY].get(path, ComponentExecutionState.NOT_RUN) return ComponentExecutionState[state] == ComponentExecutionState.FINISHED return check_service_state -def not_condition(function: StartConditionCheckerFunction) -> StartConditionCheckerFunction: +def not_condition(func: StartConditionCheckerFunction) -> StartConditionCheckerFunction: """ Condition that returns opposite boolean value to the one returned by incoming function. Returns :py:data:`~.StartConditionCheckerFunction`. - :param function: The function to return opposite of. + :param func: The function to return opposite of. """ def not_function(ctx: Context, pipeline: Pipeline): - return not function(ctx, pipeline) + return not func(ctx, pipeline) return not_function @@ -70,7 +70,7 @@ def aggregate_condition( """ def aggregation_function(ctx: Context, pipeline: Pipeline): - return aggregator([function(ctx, pipeline) for function in functions]) + return aggregator([func(ctx, pipeline) for func in functions]) return aggregation_function diff --git a/dff/pipeline/pipeline/component.py b/dff/pipeline/pipeline/component.py index fab655c7d..2291a4d55 100644 --- a/dff/pipeline/pipeline/component.py +++ b/dff/pipeline/pipeline/component.py @@ -115,7 +115,7 @@ def _set_state(self, ctx: Context, value: ComponentExecutionState): """ if PIPELINE_STATE_KEY not in ctx.framework_states: ctx.framework_states[PIPELINE_STATE_KEY] = {} - ctx.framework_states[PIPELINE_STATE_KEY][self.path] = value.name + ctx.framework_states[PIPELINE_STATE_KEY][self.path] = value def get_state(self, ctx: Context, default: Optional[ComponentExecutionState] = None) -> ComponentExecutionState: """ @@ -127,9 +127,7 @@ def get_state(self, ctx: Context, default: Optional[ComponentExecutionState] = N (usually it's :py:attr:`~.pipeline.types.ComponentExecutionState.NOT_RUN`). :return: :py:class:`~pipeline.types.ComponentExecutionState` of this service or default if not found. """ - return ComponentExecutionState[ - ctx.framework_states[PIPELINE_STATE_KEY].get(self.path, default if default is not None else None) - ] + return ctx.framework_states[PIPELINE_STATE_KEY].get(self.path, default if default is not None else None) @property def asynchronous(self) -> bool: @@ -162,7 +160,7 @@ async def run_extra_handler(self, stage: ExtraHandlerType, ctx: Context, pipelin if extra_handler.asynchronous and isinstance(extra_handler_result, Awaitable): await extra_handler_result except asyncio.TimeoutError: - logger.warning(f"{type(self).__name__} '{self.name}' {extra_handler.stage.name} extra handler timed out!") + logger.warning(f"{type(self).__name__} '{self.name}' {extra_handler.stage} extra handler timed out!") @abc.abstractmethod async def _run(self, ctx: Context, pipeline: Optional[Pipeline] = None) -> Optional[Context]: @@ -213,15 +211,15 @@ def _get_runtime_info(self, ctx: Context) -> ServiceRuntimeInfo: :param ctx: Current dialog :py:class:`~.Context`. :return: :py:class:`~.dff.script.typing.ServiceRuntimeInfo` - dict where all not set fields are replaced with `[None]`. - """ - return { - "name": self.name if self.name is not None else "[None]", - "path": self.path if self.path is not None else "[None]", - "timeout": self.timeout, - "asynchronous": self.asynchronous, - "execution_state": copy.deepcopy(ctx.framework_states[PIPELINE_STATE_KEY]), - } + object where all not set fields are replaced with `[None]`. + """ + return ServiceRuntimeInfo( + name=self.name if self.name is not None else "[None]", + path=self.path if self.path is not None else "[None]", + timeout=self.timeout, + asynchronous=self.asynchronous, + execution_state=copy.deepcopy(ctx.framework_states[PIPELINE_STATE_KEY]), + ) @property def info_dict(self) -> dict: diff --git a/dff/pipeline/service/extra.py b/dff/pipeline/service/extra.py index 49f152163..dc1242fba 100644 --- a/dff/pipeline/service/extra.py +++ b/dff/pipeline/service/extra.py @@ -69,7 +69,7 @@ def __init__( self.functions = functions self.timeout = timeout self.requested_async_flag = asynchronous - self.calculated_async_flag = all([asyncio.iscoroutinefunction(function) for function in self.functions]) + self.calculated_async_flag = all([asyncio.iscoroutinefunction(func) for func in self.functions]) self.stage = stage else: raise Exception(f"Unknown type for {type(self).__name__} {functions}") @@ -93,24 +93,20 @@ def asynchronous(self) -> bool: return self.calculated_async_flag if self.requested_async_flag is None else self.requested_async_flag async def _run_function( - self, function: ExtraHandlerFunction, ctx: Context, pipeline: Pipeline, component_info: ServiceRuntimeInfo + self, func: ExtraHandlerFunction, ctx: Context, pipeline: Pipeline, component_info: ServiceRuntimeInfo ): - handler_params = len(inspect.signature(function).parameters) + handler_params = len(inspect.signature(func).parameters) if handler_params == 1: - await wrap_sync_function_in_async(function, ctx) + await wrap_sync_function_in_async(func, ctx) elif handler_params == 2: - await wrap_sync_function_in_async(function, ctx, pipeline) + await wrap_sync_function_in_async(func, ctx, pipeline) elif handler_params == 3: - extra_handler_runtime_info: ExtraHandlerRuntimeInfo = { - "function": function, - "stage": self.stage, - "component": component_info, - } - await wrap_sync_function_in_async(function, ctx, pipeline, extra_handler_runtime_info) + extra_handler_runtime_info = ExtraHandlerRuntimeInfo(func=func, stage=self.stage, component=component_info) + await wrap_sync_function_in_async(func, ctx, pipeline, extra_handler_runtime_info) else: raise Exception( - f"Too many parameters required for component {component_info['name']} {self.stage.name}" - f" wrapper handler '{function.__name__}': {handler_params}!" + f"Too many parameters required for component {component_info.name} {self.stage}" + f" wrapper handler '{func.__name__}': {handler_params}!" ) async def _run(self, ctx: Context, pipeline: Pipeline, component_info: ServiceRuntimeInfo): @@ -126,18 +122,16 @@ async def _run(self, ctx: Context, pipeline: Pipeline, component_info: ServiceRu """ if self.asynchronous: - futures = [self._run_function(function, ctx, pipeline, component_info) for function in self.functions] - for function, future in zip(self.functions, asyncio.as_completed(futures)): + futures = [self._run_function(func, ctx, pipeline, component_info) for func in self.functions] + for func, future in zip(self.functions, asyncio.as_completed(futures)): try: await future except asyncio.TimeoutError: - logger.warning( - f"Component {component_info['name']} {self.stage.name} wrapper '{function.__name__}' timed out!" - ) + logger.warning(f"Component {component_info.name} {self.stage} wrapper '{func.__name__}' timed out!") else: - for function in self.functions: - await self._run_function(function, ctx, pipeline, component_info) + for func in self.functions: + await self._run_function(func, ctx, pipeline, component_info) async def __call__(self, ctx: Context, pipeline: Pipeline, component_info: ServiceRuntimeInfo): """ @@ -167,7 +161,7 @@ def info_dict(self) -> dict: "type": type(self).__name__, "timeout": self.timeout, "asynchronous": self.asynchronous, - "functions": [function.__name__ for function in self.functions], + "functions": [func.__name__ for func in self.functions], } diff --git a/dff/pipeline/service/group.py b/dff/pipeline/service/group.py index 22b8b0453..07518c300 100644 --- a/dff/pipeline/service/group.py +++ b/dff/pipeline/service/group.py @@ -244,7 +244,7 @@ def _create_components(services: ServiceGroupBuilder) -> List[Union[Service, "Se :type services: :py:data:`~.ServiceGroupBuilder` :return: List of services and service groups. """ - handled_services = [] + handled_services: List[Union[Service, "ServiceGroup"]] = [] for service in services: if isinstance(service, List) or isinstance(service, ServiceGroup): handled_services.append(ServiceGroup(service)) diff --git a/dff/pipeline/service/service.py b/dff/pipeline/service/service.py index 9ab39c49e..406ea3a81 100644 --- a/dff/pipeline/service/service.py +++ b/dff/pipeline/service/service.py @@ -180,6 +180,7 @@ async def _run(self, ctx: Context, pipeline: Optional[Pipeline] = None) -> Optio if isinstance(self.handler, str) and self.handler == "ACTOR": return ctx + return None @property def info_dict(self) -> dict: diff --git a/dff/pipeline/service/utils.py b/dff/pipeline/service/utils.py index 744564e0a..e3162418c 100644 --- a/dff/pipeline/service/utils.py +++ b/dff/pipeline/service/utils.py @@ -8,20 +8,20 @@ from typing import Callable, Any, Optional, Tuple, Mapping -async def wrap_sync_function_in_async(function: Callable, *args, **kwargs) -> Any: +async def wrap_sync_function_in_async(func: Callable, *args, **kwargs) -> Any: """ Utility function, that wraps both functions and coroutines in coroutines. - Invokes `function` if it is just a callable and awaits, if this is a coroutine. + Invokes `func` if it is just a callable and awaits, if this is a coroutine. - :param function: Callable to wrap. + :param func: Callable to wrap. :param \\*args: Function args. :param \\**kwargs: Function kwargs. :return: What function returns. """ - if asyncio.iscoroutinefunction(function): - return await function(*args, **kwargs) + if asyncio.iscoroutinefunction(func): + return await func(*args, **kwargs) else: - return function(*args, **kwargs) + return func(*args, **kwargs) def _get_attrs_with_updates( diff --git a/dff/pipeline/types.py b/dff/pipeline/types.py index 98f449d25..0a1e679fe 100644 --- a/dff/pipeline/types.py +++ b/dff/pipeline/types.py @@ -6,26 +6,27 @@ data structures, and other types that are defined for type hinting. """ from abc import ABC -from enum import unique, Enum, auto -from typing import Callable, Union, Awaitable, Dict, List, Optional, NewType, Iterable +from enum import unique, Enum +from typing import Callable, Union, Awaitable, Dict, List, Optional, NewType, Iterable, Any from dff.context_storages import DBContextStorage from dff.script import Context, ActorStage, NodeLabel2Type, Script from typing_extensions import NotRequired, TypedDict, TypeAlias +from pydantic import BaseModel -_ForwardPipeline = NewType("Pipeline", None) -_ForwardPipelineComponent = NewType("PipelineComponent", None) +_ForwardPipeline = NewType("Pipeline", Any) +_ForwardPipelineComponent = NewType("PipelineComponent", Any) _ForwardService = NewType("Service", _ForwardPipelineComponent) -_ForwardServiceBuilder = NewType("ServiceBuilder", None) +_ForwardServiceBuilder = NewType("ServiceBuilder", Any) _ForwardServiceGroup = NewType("ServiceGroup", _ForwardPipelineComponent) -_ForwardComponentExtraHandler = NewType("_ComponentExtraHandler", None) +_ForwardComponentExtraHandler = NewType("_ComponentExtraHandler", Any) _ForwardProvider = NewType("ABCProvider", ABC) -_ForwardExtraHandlerFunction = NewType("ExtraHandlerFunction", None) +_ForwardExtraHandlerRuntimeInfo = NewType("ExtraHandlerRuntimeInfo", Any) @unique -class ComponentExecutionState(Enum): +class ComponentExecutionState(str, Enum): """ Enum, representing pipeline component execution state. These states are stored in `ctx.framework_keys[PIPELINE_STATE_KEY]`, @@ -38,14 +39,14 @@ class ComponentExecutionState(Enum): - FAILED: component execution failed. """ - NOT_RUN = auto() - RUNNING = auto() - FINISHED = auto() - FAILED = auto() + NOT_RUN = "NOT_RUN" + RUNNING = "RUNNING" + FINISHED = "FINISHED" + FAILED = "FAILED" @unique -class GlobalExtraHandlerType(Enum): +class GlobalExtraHandlerType(str, Enum): """ Enum, representing types of global wrappers, that can be set applied for a pipeline. The following types are supported: @@ -56,25 +57,26 @@ class GlobalExtraHandlerType(Enum): - AFTER_ALL: function called after each pipeline call. """ - BEFORE_ALL = auto() - BEFORE = auto() - AFTER = auto() - AFTER_ALL = auto() + BEFORE_ALL = "BEFORE_ALL" + BEFORE = "BEFORE" + AFTER = "AFTER" + AFTER_ALL = "AFTER_ALL" @unique -class ExtraHandlerType(Enum): +class ExtraHandlerType(str, Enum): """ - Enum, representing wrapper type, pre- or postprocessing. + Enum, representing wrapper execution stage: before or after the wrapped function. The following types are supported: - - PREPROCESSING: wrapper function called before component, - - POSTPROCESSING: wrapper function called after component. + - UNDEFINED: wrapper function with undetermined execution stage, + - BEFORE: wrapper function called before component, + - AFTER: wrapper function called after component. """ - UNDEFINED = auto() - BEFORE = auto() - AFTER = auto() + UNDEFINED = "UNDEFINED" + BEFORE = "BEFORE" + AFTER = "AFTER" PIPELINE_STATE_KEY = "PIPELINE" @@ -107,47 +109,43 @@ class ExtraHandlerType(Enum): """ -ServiceRuntimeInfo: TypeAlias = TypedDict( - "ServiceRuntimeInfo", - { - "name": str, - "path": str, - "timeout": Optional[float], - "asynchronous": bool, - "execution_state": Dict[str, ComponentExecutionState], - }, -) +class ServiceRuntimeInfo(BaseModel): + name: str + path: str + timeout: Optional[float] + asynchronous: bool + execution_state: Dict[str, ComponentExecutionState] + + """ -Type of dictionary, that is passed to components in runtime. +Type of object, that is passed to components in runtime. Contains current component info (`name`, `path`, `timeout`, `asynchronous`). Also contains `execution_state` - a dictionary, -containing other pipeline components execution stats mapped to their paths. +containing execution states of other components mapped to their paths. """ -ExtraHandlerRuntimeInfo: TypeAlias = TypedDict( - "ExtraHandlerRuntimeInfo", - { - "function": _ForwardExtraHandlerFunction, - "stage": ExtraHandlerType, - "component": ServiceRuntimeInfo, - }, -) +ExtraHandlerFunction: TypeAlias = Union[ + Callable[[Context], Any], + Callable[[Context, _ForwardPipeline], Any], + Callable[[Context, _ForwardPipeline, _ForwardExtraHandlerRuntimeInfo], Any], +] """ -Type of dictionary, that is passed to wrappers in runtime. -Contains current wrapper info (`name`, `stage`). -Also contains `component` - runtime info dictionary of the component this wrapper is attached to. +A function type for creating wrappers (before and after functions). +Can accept current dialog context, pipeline, and current wrapper info. """ -ExtraHandlerFunction: TypeAlias = Union[ - Callable[[Context], None], - Callable[[Context, _ForwardPipeline], None], - Callable[[Context, _ForwardPipeline, ExtraHandlerRuntimeInfo], None], -] +class ExtraHandlerRuntimeInfo(BaseModel): + func: ExtraHandlerFunction + stage: ExtraHandlerType + component: ServiceRuntimeInfo + + """ -A function type for creating wrappers (before and after functions). -Can accept current dialog context, pipeline, and current wrapper info dictionary. +Type of object, that is passed to wrappers in runtime. +Contains current wrapper info (`name`, `stage`). +Also contains `component` - runtime info of the component this wrapper is attached to. """ @@ -161,7 +159,7 @@ class ExtraHandlerType(Enum): ] """ A function type for creating service handlers. -Can accept current dialog context, pipeline, and current service info dictionary. +Can accept current dialog context, pipeline, and current service info. Can be both synchronous and asynchronous. """ diff --git a/dff/script/__init__.py b/dff/script/__init__.py index cd75b90bd..6c88b5461 100644 --- a/dff/script/__init__.py +++ b/dff/script/__init__.py @@ -18,7 +18,6 @@ normalize_transitions, normalize_response, normalize_processing, - normalize_keywords, normalize_script, ) from .core.script import Node, Script diff --git a/dff/script/core/keywords.py b/dff/script/core/keywords.py index 471d5834b..076577ae4 100644 --- a/dff/script/core/keywords.py +++ b/dff/script/core/keywords.py @@ -5,10 +5,10 @@ They are used to determine all nodes in the script and to assign python objects and python functions for nodes. """ -from enum import Enum, auto +from enum import Enum -class Keywords(Enum): +class Keywords(str, Enum): """ Keywords used to define the dialog script (:py:class:`~dff.script.Script`). The data type `dict` is used to describe the scenario. @@ -80,13 +80,14 @@ class Keywords(Enum): """ - GLOBAL = auto() - LOCAL = auto() - TRANSITIONS = auto() - RESPONSE = auto() - MISC = auto() - PRE_RESPONSE_PROCESSING = auto() - PRE_TRANSITIONS_PROCESSING = auto() + GLOBAL = "global" + LOCAL = "local" + TRANSITIONS = "transitions" + RESPONSE = "response" + MISC = "misc" + PRE_RESPONSE_PROCESSING = "pre_response_processing" + PRE_TRANSITIONS_PROCESSING = "pre_transitions_processing" + PROCESSING = "pre_transitions_processing" # Redefine shortcuts diff --git a/dff/script/core/normalization.py b/dff/script/core/normalization.py index 78eb69f79..9aeeaa79c 100644 --- a/dff/script/core/normalization.py +++ b/dff/script/core/normalization.py @@ -9,7 +9,7 @@ from typing import Union, Callable, Any, Dict, Optional, ForwardRef -from .keywords import GLOBAL, Keywords +from .keywords import Keywords from .context import Context from .types import NodeLabel3Type, NodeLabelType, ConditionType, LabelType from .message import Message @@ -21,7 +21,6 @@ Pipeline = ForwardRef("Pipeline") -@validate_arguments def normalize_label(label: NodeLabelType, default_flow_label: LabelType = "") -> Union[Callable, NodeLabel3Type]: """ The function that is used for normalization of @@ -43,6 +42,8 @@ def get_label_handler(ctx: Context, pipeline: Pipeline, *args, **kwargs) -> Node node = pipeline.script.get(flow_label, {}).get(node_label) if not node: raise Exception(f"Unknown transitions {new_label} for pipeline.script={pipeline.script}") + if node_label in [Keywords.LOCAL, Keywords.GLOBAL]: + raise Exception(f"Invalid transition: can't transition to {flow_label}:{node_label}") except Exception as exc: new_label = None logger.error(f"Exception {exc} of function {label}", exc_info=exc) @@ -61,7 +62,6 @@ def get_label_handler(ctx: Context, pipeline: Pipeline, *args, **kwargs) -> Node return (flow_label, label[1], label[2]) -@validate_arguments def normalize_condition(condition: ConditionType) -> Callable: """ The function that is used to normalize `condition` @@ -146,44 +146,6 @@ def processing_handler(ctx: Context, pipeline: Pipeline, *args, **kwargs) -> Con return processing_handler -@validate_arguments -def map_deprecated_key(key: str) -> str: - """ - This function is used to map deprecated keyword to new one. - - :param key: A keyword of a node. - :return: A mapped keyword of a node. - """ - if key == "processing": - logger.warning( - "Use the new key 'PRE_RESPONSE_PROCESSING instead of the deprecated key 'PROCESSING'," - " which will be removed in future versions." - ) - return "pre_response_processing" - return key - - -@validate_arguments -def normalize_keywords( - script: Dict[LabelType, Dict[LabelType, Dict[Keywords, Any]]] -) -> Dict[LabelType, Dict[LabelType, Dict[str, Any]]]: - """ - This function is used to normalize keywords in the script. - - :param script: :py:class:`.Script`, containing all transitions between states based in the keywords. - :return: :py:class:`.Script` with the normalized keywords. - """ - - script = { - flow_label: { - node_label: {map_deprecated_key(key.name.lower()): val for key, val in node.items()} - for node_label, node in flow.items() - } - for flow_label, flow in script.items() - } - return script - - @validate_arguments def normalize_script(script: Dict[LabelType, Any]) -> Dict[LabelType, Dict[LabelType, Dict[str, Any]]]: """ @@ -196,6 +158,6 @@ def normalize_script(script: Dict[LabelType, Any]) -> Dict[LabelType, Dict[Label :return: Normalized :py:class:`.Script`. """ if isinstance(script, dict): - if GLOBAL in script and all([isinstance(item, Keywords) for item in script[GLOBAL].keys()]): - script[GLOBAL] = {GLOBAL: script[GLOBAL]} - return normalize_keywords(script) + if Keywords.GLOBAL in script and all([isinstance(item, Keywords) for item in script[Keywords.GLOBAL].keys()]): + script[Keywords.GLOBAL] = {Keywords.GLOBAL: script[Keywords.GLOBAL]} + return script diff --git a/tests/script/core/test_normalization.py b/tests/script/core/test_normalization.py index 0f27077f3..e2df4b526 100644 --- a/tests/script/core/test_normalization.py +++ b/tests/script/core/test_normalization.py @@ -18,7 +18,6 @@ from dff.script import ( normalize_condition, - normalize_keywords, normalize_label, normalize_script, normalize_processing, @@ -113,16 +112,10 @@ def false_processing_func(ctx: Context, pipeline: Pipeline, *args, **kwargs) -> def test_normalize_keywords(): - # TODO: Add full check for functions - subtest_normalize_keywords(PRE_RESPONSE_PROCESSING) - - -def subtest_normalize_keywords(pre_response_processing): - # TODO: Add full check for functions node_template = { TRANSITIONS: {"node": std_func}, RESPONSE: Message(text="text"), - pre_response_processing: {1: std_func}, + PRE_RESPONSE_PROCESSING: {1: std_func}, PRE_TRANSITIONS_PROCESSING: {1: std_func}, MISC: {"key": "val"}, } @@ -134,7 +127,6 @@ def subtest_normalize_keywords(pre_response_processing): MISC.name.lower(): {"key": "val"}, } script = {"flow": {"node": node_template.copy()}} - script = normalize_keywords(script) assert isinstance(script, dict) assert script["flow"]["node"] == node_template_gold diff --git a/tutorials/pipeline/3_pipeline_dict_with_services_full.py b/tutorials/pipeline/3_pipeline_dict_with_services_full.py index b92f4bc22..e87b5db9f 100644 --- a/tutorials/pipeline/3_pipeline_dict_with_services_full.py +++ b/tutorials/pipeline/3_pipeline_dict_with_services_full.py @@ -93,7 +93,7 @@ def prepreprocess(ctx: Context): def preprocess(ctx: Context, _, info: ServiceRuntimeInfo): logger.info( f"another preprocession web-based annotator Service" - f"(defined as a callable), named '{info['name']}'" + f"(defined as a callable), named '{info.name}'" ) with urllib.request.urlopen("https://example.com/") as webpage: web_content = webpage.read().decode(webpage.headers.get_content_charset()) diff --git a/tutorials/pipeline/4_groups_and_conditions_basic.py b/tutorials/pipeline/4_groups_and_conditions_basic.py index 171aaa63b..4ef8cb6ae 100644 --- a/tutorials/pipeline/4_groups_and_conditions_basic.py +++ b/tutorials/pipeline/4_groups_and_conditions_basic.py @@ -72,16 +72,16 @@ # %% def always_running_service(_, __, info: ServiceRuntimeInfo): - logger.info(f"Service '{info['name']}' is running...") + logger.info(f"Service '{info.name}' is running...") def never_running_service(_, __, info: ServiceRuntimeInfo): - raise Exception(f"Oh no! The '{info['name']}' service is running!") + raise Exception(f"Oh no! The '{info.name}' service is running!") def runtime_info_printing_service(_, __, info: ServiceRuntimeInfo): logger.info( - f"Service '{info['name']}' runtime execution info:" + f"Service '{info.name}' runtime execution info:" f"{json.dumps(info, indent=4, default=str)}" ) diff --git a/tutorials/pipeline/4_groups_and_conditions_full.py b/tutorials/pipeline/4_groups_and_conditions_full.py index a6953aa8e..12d69bed1 100644 --- a/tutorials/pipeline/4_groups_and_conditions_full.py +++ b/tutorials/pipeline/4_groups_and_conditions_full.py @@ -7,7 +7,6 @@ # %% -import json import logging from dff.pipeline import ( @@ -142,17 +141,16 @@ # %% def simple_service(_, __, info: ServiceRuntimeInfo): - logger.info(f"Service '{info['name']}' is running...") + logger.info(f"Service '{info.name}' is running...") def never_running_service(_, __, info: ServiceRuntimeInfo): - raise Exception(f"Oh no! The '{info['name']}' service is running!") + raise Exception(f"Oh no! The '{info.name}' service is running!") def runtime_info_printing_service(_, __, info: ServiceRuntimeInfo): logger.info( - f"Service '{info['name']}' runtime execution info:" - f"{json.dumps(info, indent=4, default=str)}" + f"Service '{info.name}' runtime execution info:" f"{info.json(indent=4, default=str)}" ) diff --git a/tutorials/pipeline/5_asynchronous_groups_and_services_full.py b/tutorials/pipeline/5_asynchronous_groups_and_services_full.py index 44b5d96f7..970abecb1 100644 --- a/tutorials/pipeline/5_asynchronous_groups_and_services_full.py +++ b/tutorials/pipeline/5_asynchronous_groups_and_services_full.py @@ -78,7 +78,7 @@ # %% async def simple_asynchronous_service(_, __, info: ServiceRuntimeInfo): - logger.info(f"Service '{info['name']}' is running") + logger.info(f"Service '{info.name}' is running") async def time_consuming_service(_): @@ -101,7 +101,7 @@ async def web_querying_service(ctx: Context, _, info: ServiceRuntimeInfo): f":photo_number_{photo_number}": json.loads(web_content)["title"] } ) - logger.info(f"Service '{info['name']}' has completed HTTPS request") + logger.info(f"Service '{info.name}' has completed HTTPS request") return web_querying_service diff --git a/tutorials/pipeline/7_extra_handlers_basic.py b/tutorials/pipeline/7_extra_handlers_basic.py index 410632ce7..e9417979d 100644 --- a/tutorials/pipeline/7_extra_handlers_basic.py +++ b/tutorials/pipeline/7_extra_handlers_basic.py @@ -47,13 +47,11 @@ # %% def collect_timestamp_before(ctx: Context, _, info: ExtraHandlerRuntimeInfo): - ctx.misc.update({f"{info['component']['name']}": datetime.now()}) + ctx.misc.update({f"{info.component.name}": datetime.now()}) def collect_timestamp_after(ctx: Context, _, info: ExtraHandlerRuntimeInfo): - ctx.misc.update( - {f"{info['component']['name']}": datetime.now() - ctx.misc[f"{info['component']['name']}"]} - ) + ctx.misc.update({f"{info.component.name}": datetime.now() - ctx.misc[f"{info.component.name}"]}) async def heavy_service(_): diff --git a/tutorials/pipeline/7_extra_handlers_full.py b/tutorials/pipeline/7_extra_handlers_full.py index e7962efc4..341a9cf86 100644 --- a/tutorials/pipeline/7_extra_handlers_full.py +++ b/tutorials/pipeline/7_extra_handlers_full.py @@ -87,7 +87,7 @@ def get_extra_handler_misc_field( info: ExtraHandlerRuntimeInfo, postfix: str ) -> str: # This method calculates `misc` field name dedicated to extra handler # based on its and its service name - return f"{info['component']['name']}-{postfix}" + return f"{info.component.name}-{postfix}" def time_measure_before_handler(ctx, _, info): @@ -145,7 +145,7 @@ def heavy_service(ctx: Context): after_handler=[json_converter_after_handler], ) def logging_service(ctx: Context, _, info: ServiceRuntimeInfo): - str_misc = ctx.misc[f"{info['name']}-str"] + str_misc = ctx.misc[f"{info.name}-str"] assert isinstance(str_misc, str) print(f"Stringified misc: {str_misc}") diff --git a/tutorials/pipeline/8_extra_handlers_and_extensions.py b/tutorials/pipeline/8_extra_handlers_and_extensions.py index 1bb85be9f..f08902b2f 100644 --- a/tutorials/pipeline/8_extra_handlers_and_extensions.py +++ b/tutorials/pipeline/8_extra_handlers_and_extensions.py @@ -80,33 +80,33 @@ def before_all(_, __, info: ExtraHandlerRuntimeInfo): global start_times, pipeline_info now = datetime.now() pipeline_info = {"start_time": now} - start_times = {info["component"]["path"]: now} + start_times = {info.component.path: now} def before(_, __, info: ExtraHandlerRuntimeInfo): - start_times.update({info["component"]["path"]: datetime.now()}) + start_times.update({info.component.path: datetime.now()}) def after(_, __, info: ExtraHandlerRuntimeInfo): - start_time = start_times[info["component"]["path"]] + start_time = start_times[info.component.path] pipeline_info.update( { - f"{info['component']['path']}_duration": datetime.now() - start_time, - f"{info['component']['path']}_success": info["component"]["execution_state"].get( - info["component"]["path"], ComponentExecutionState.NOT_RUN.name + f"{info.component.path}_duration": datetime.now() - start_time, + f"{info.component.path}_success": info.component.execution_state.get( + info.component.path, ComponentExecutionState.NOT_RUN ), } ) def after_all(_, __, info: ExtraHandlerRuntimeInfo): - pipeline_info.update({"total_time": datetime.now() - start_times[info["component"]["path"]]}) + pipeline_info.update({"total_time": datetime.now() - start_times[info.component.path]}) logger.info(f"Pipeline stats: {json.dumps(pipeline_info, indent=4, default=str)}") async def long_service(_, __, info: ServiceRuntimeInfo): timeout = random.randint(0, 5) / 100 - logger.info(f"Service {info['name']} is going to sleep for {timeout} seconds.") + logger.info(f"Service {info.name} is going to sleep for {timeout} seconds.") await asyncio.sleep(timeout)