Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/runtime info class #146

Merged
merged 4 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dff/pipeline/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ def service_successful_condition(path: Optional[str] = None) -> StartConditionCh
"""

def check_service_state(ctx: Context, _: Pipeline):
state = ctx.framework_states[PIPELINE_STATE_KEY].get(path, ComponentExecutionState.NOT_RUN.name)
state = ctx.framework_states[PIPELINE_STATE_KEY].get(path, ComponentExecutionState.NOT_RUN)
return ComponentExecutionState[state] == ComponentExecutionState.FINISHED

return check_service_state


def not_condition(function: StartConditionCheckerFunction) -> StartConditionCheckerFunction:
def not_condition(func: StartConditionCheckerFunction) -> StartConditionCheckerFunction:
"""
Condition that returns opposite boolean value to the one returned by incoming function.
Returns :py:data:`~.StartConditionCheckerFunction`.

:param function: The function to return opposite of.
:param func: The function to return opposite of.
"""

def not_function(ctx: Context, pipeline: Pipeline):
return not function(ctx, pipeline)
return not func(ctx, pipeline)

return not_function

Expand All @@ -70,7 +70,7 @@ def aggregate_condition(
"""

def aggregation_function(ctx: Context, pipeline: Pipeline):
return aggregator([function(ctx, pipeline) for function in functions])
return aggregator([func(ctx, pipeline) for func in functions])

return aggregation_function

Expand Down
26 changes: 12 additions & 14 deletions dff/pipeline/pipeline/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def _set_state(self, ctx: Context, value: ComponentExecutionState):
"""
if PIPELINE_STATE_KEY not in ctx.framework_states:
ctx.framework_states[PIPELINE_STATE_KEY] = {}
ctx.framework_states[PIPELINE_STATE_KEY][self.path] = value.name
ctx.framework_states[PIPELINE_STATE_KEY][self.path] = value

def get_state(self, ctx: Context, default: Optional[ComponentExecutionState] = None) -> ComponentExecutionState:
"""
Expand All @@ -127,9 +127,7 @@ def get_state(self, ctx: Context, default: Optional[ComponentExecutionState] = N
(usually it's :py:attr:`~.pipeline.types.ComponentExecutionState.NOT_RUN`).
:return: :py:class:`~pipeline.types.ComponentExecutionState` of this service or default if not found.
"""
return ComponentExecutionState[
ctx.framework_states[PIPELINE_STATE_KEY].get(self.path, default if default is not None else None)
]
return ctx.framework_states[PIPELINE_STATE_KEY].get(self.path, default if default is not None else None)

@property
def asynchronous(self) -> bool:
Expand Down Expand Up @@ -162,7 +160,7 @@ async def run_extra_handler(self, stage: ExtraHandlerType, ctx: Context, pipelin
if extra_handler.asynchronous and isinstance(extra_handler_result, Awaitable):
await extra_handler_result
except asyncio.TimeoutError:
logger.warning(f"{type(self).__name__} '{self.name}' {extra_handler.stage.name} extra handler timed out!")
logger.warning(f"{type(self).__name__} '{self.name}' {extra_handler.stage} extra handler timed out!")

@abc.abstractmethod
async def _run(self, ctx: Context, pipeline: Optional[Pipeline] = None) -> Optional[Context]:
Expand Down Expand Up @@ -213,15 +211,15 @@ def _get_runtime_info(self, ctx: Context) -> ServiceRuntimeInfo:

:param ctx: Current dialog :py:class:`~.Context`.
:return: :py:class:`~.dff.script.typing.ServiceRuntimeInfo`
dict where all not set fields are replaced with `[None]`.
"""
return {
"name": self.name if self.name is not None else "[None]",
"path": self.path if self.path is not None else "[None]",
"timeout": self.timeout,
"asynchronous": self.asynchronous,
"execution_state": copy.deepcopy(ctx.framework_states[PIPELINE_STATE_KEY]),
}
object where all not set fields are replaced with `[None]`.
"""
return ServiceRuntimeInfo(
name=self.name if self.name is not None else "[None]",
path=self.path if self.path is not None else "[None]",
timeout=self.timeout,
asynchronous=self.asynchronous,
execution_state=copy.deepcopy(ctx.framework_states[PIPELINE_STATE_KEY]),
)

@property
def info_dict(self) -> dict:
Expand Down
36 changes: 15 additions & 21 deletions dff/pipeline/service/extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(
self.functions = functions
self.timeout = timeout
self.requested_async_flag = asynchronous
self.calculated_async_flag = all([asyncio.iscoroutinefunction(function) for function in self.functions])
self.calculated_async_flag = all([asyncio.iscoroutinefunction(func) for func in self.functions])
self.stage = stage
else:
raise Exception(f"Unknown type for {type(self).__name__} {functions}")
Expand All @@ -93,24 +93,20 @@ def asynchronous(self) -> bool:
return self.calculated_async_flag if self.requested_async_flag is None else self.requested_async_flag

async def _run_function(
self, function: ExtraHandlerFunction, ctx: Context, pipeline: Pipeline, component_info: ServiceRuntimeInfo
self, func: ExtraHandlerFunction, ctx: Context, pipeline: Pipeline, component_info: ServiceRuntimeInfo
):
handler_params = len(inspect.signature(function).parameters)
handler_params = len(inspect.signature(func).parameters)
if handler_params == 1:
await wrap_sync_function_in_async(function, ctx)
await wrap_sync_function_in_async(func, ctx)
elif handler_params == 2:
await wrap_sync_function_in_async(function, ctx, pipeline)
await wrap_sync_function_in_async(func, ctx, pipeline)
elif handler_params == 3:
extra_handler_runtime_info: ExtraHandlerRuntimeInfo = {
"function": function,
"stage": self.stage,
"component": component_info,
}
await wrap_sync_function_in_async(function, ctx, pipeline, extra_handler_runtime_info)
extra_handler_runtime_info = ExtraHandlerRuntimeInfo(func=func, stage=self.stage, component=component_info)
await wrap_sync_function_in_async(func, ctx, pipeline, extra_handler_runtime_info)
else:
raise Exception(
f"Too many parameters required for component {component_info['name']} {self.stage.name}"
f" wrapper handler '{function.__name__}': {handler_params}!"
f"Too many parameters required for component {component_info.name} {self.stage}"
f" wrapper handler '{func.__name__}': {handler_params}!"
)

async def _run(self, ctx: Context, pipeline: Pipeline, component_info: ServiceRuntimeInfo):
Expand All @@ -126,18 +122,16 @@ async def _run(self, ctx: Context, pipeline: Pipeline, component_info: ServiceRu
"""

if self.asynchronous:
futures = [self._run_function(function, ctx, pipeline, component_info) for function in self.functions]
for function, future in zip(self.functions, asyncio.as_completed(futures)):
futures = [self._run_function(func, ctx, pipeline, component_info) for func in self.functions]
for func, future in zip(self.functions, asyncio.as_completed(futures)):
try:
await future
except asyncio.TimeoutError:
logger.warning(
f"Component {component_info['name']} {self.stage.name} wrapper '{function.__name__}' timed out!"
)
logger.warning(f"Component {component_info.name} {self.stage} wrapper '{func.__name__}' timed out!")

else:
for function in self.functions:
await self._run_function(function, ctx, pipeline, component_info)
for func in self.functions:
await self._run_function(func, ctx, pipeline, component_info)

async def __call__(self, ctx: Context, pipeline: Pipeline, component_info: ServiceRuntimeInfo):
"""
Expand Down Expand Up @@ -167,7 +161,7 @@ def info_dict(self) -> dict:
"type": type(self).__name__,
"timeout": self.timeout,
"asynchronous": self.asynchronous,
"functions": [function.__name__ for function in self.functions],
"functions": [func.__name__ for func in self.functions],
}


Expand Down
2 changes: 1 addition & 1 deletion dff/pipeline/service/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def _create_components(services: ServiceGroupBuilder) -> List[Union[Service, "Se
:type services: :py:data:`~.ServiceGroupBuilder`
:return: List of services and service groups.
"""
handled_services = []
handled_services: List[Union[Service, "ServiceGroup"]] = []
for service in services:
if isinstance(service, List) or isinstance(service, ServiceGroup):
handled_services.append(ServiceGroup(service))
Expand Down
1 change: 1 addition & 0 deletions dff/pipeline/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ async def _run(self, ctx: Context, pipeline: Optional[Pipeline] = None) -> Optio

if isinstance(self.handler, str) and self.handler == "ACTOR":
return ctx
return None

@property
def info_dict(self) -> dict:
Expand Down
12 changes: 6 additions & 6 deletions dff/pipeline/service/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@
from typing import Callable, Any, Optional, Tuple, Mapping


async def wrap_sync_function_in_async(function: Callable, *args, **kwargs) -> Any:
async def wrap_sync_function_in_async(func: Callable, *args, **kwargs) -> Any:
"""
Utility function, that wraps both functions and coroutines in coroutines.
Invokes `function` if it is just a callable and awaits, if this is a coroutine.
Invokes `func` if it is just a callable and awaits, if this is a coroutine.

:param function: Callable to wrap.
:param func: Callable to wrap.
:param \\*args: Function args.
:param \\**kwargs: Function kwargs.
:return: What function returns.
"""
if asyncio.iscoroutinefunction(function):
return await function(*args, **kwargs)
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return function(*args, **kwargs)
return func(*args, **kwargs)


def _get_attrs_with_updates(
Expand Down
108 changes: 53 additions & 55 deletions dff/pipeline/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,27 @@
data structures, and other types that are defined for type hinting.
"""
from abc import ABC
from enum import unique, Enum, auto
from typing import Callable, Union, Awaitable, Dict, List, Optional, NewType, Iterable
from enum import unique, Enum
from typing import Callable, Union, Awaitable, Dict, List, Optional, NewType, Iterable, Any

from dff.context_storages import DBContextStorage
from dff.script import Context, ActorStage, NodeLabel2Type, Script
from typing_extensions import NotRequired, TypedDict, TypeAlias
from pydantic import BaseModel


_ForwardPipeline = NewType("Pipeline", None)
_ForwardPipelineComponent = NewType("PipelineComponent", None)
_ForwardPipeline = NewType("Pipeline", Any)
_ForwardPipelineComponent = NewType("PipelineComponent", Any)
_ForwardService = NewType("Service", _ForwardPipelineComponent)
_ForwardServiceBuilder = NewType("ServiceBuilder", None)
_ForwardServiceBuilder = NewType("ServiceBuilder", Any)
_ForwardServiceGroup = NewType("ServiceGroup", _ForwardPipelineComponent)
_ForwardComponentExtraHandler = NewType("_ComponentExtraHandler", None)
_ForwardComponentExtraHandler = NewType("_ComponentExtraHandler", Any)
_ForwardProvider = NewType("ABCProvider", ABC)
_ForwardExtraHandlerFunction = NewType("ExtraHandlerFunction", None)
_ForwardExtraHandlerRuntimeInfo = NewType("ExtraHandlerRuntimeInfo", Any)


@unique
class ComponentExecutionState(Enum):
class ComponentExecutionState(str, Enum):
"""
Enum, representing pipeline component execution state.
These states are stored in `ctx.framework_keys[PIPELINE_STATE_KEY]`,
Expand All @@ -38,14 +39,14 @@ class ComponentExecutionState(Enum):
- FAILED: component execution failed.
"""

NOT_RUN = auto()
RUNNING = auto()
FINISHED = auto()
FAILED = auto()
NOT_RUN = "NOT_RUN"
RUNNING = "RUNNING"
FINISHED = "FINISHED"
FAILED = "FAILED"


@unique
class GlobalExtraHandlerType(Enum):
class GlobalExtraHandlerType(str, Enum):
"""
Enum, representing types of global wrappers, that can be set applied for a pipeline.
The following types are supported:
Expand All @@ -56,25 +57,26 @@ class GlobalExtraHandlerType(Enum):
- AFTER_ALL: function called after each pipeline call.
"""

BEFORE_ALL = auto()
BEFORE = auto()
AFTER = auto()
AFTER_ALL = auto()
BEFORE_ALL = "BEFORE_ALL"
BEFORE = "BEFORE"
AFTER = "AFTER"
AFTER_ALL = "AFTER_ALL"


@unique
class ExtraHandlerType(Enum):
class ExtraHandlerType(str, Enum):
"""
Enum, representing wrapper type, pre- or postprocessing.
Enum, representing wrapper execution stage: before or after the wrapped function.
The following types are supported:

- PREPROCESSING: wrapper function called before component,
- POSTPROCESSING: wrapper function called after component.
- UNDEFINED: wrapper function with undetermined execution stage,
- BEFORE: wrapper function called before component,
- AFTER: wrapper function called after component.
"""

UNDEFINED = auto()
BEFORE = auto()
AFTER = auto()
UNDEFINED = "UNDEFINED"
BEFORE = "BEFORE"
AFTER = "AFTER"


PIPELINE_STATE_KEY = "PIPELINE"
Expand Down Expand Up @@ -107,47 +109,43 @@ class ExtraHandlerType(Enum):
"""


ServiceRuntimeInfo: TypeAlias = TypedDict(
"ServiceRuntimeInfo",
{
"name": str,
"path": str,
"timeout": Optional[float],
"asynchronous": bool,
"execution_state": Dict[str, ComponentExecutionState],
},
)
class ServiceRuntimeInfo(BaseModel):
name: str
path: str
timeout: Optional[float]
asynchronous: bool
execution_state: Dict[str, ComponentExecutionState]


"""
Type of dictionary, that is passed to components in runtime.
Type of object, that is passed to components in runtime.
Contains current component info (`name`, `path`, `timeout`, `asynchronous`).
Also contains `execution_state` - a dictionary,
containing other pipeline components execution stats mapped to their paths.
containing execution states of other components mapped to their paths.
"""


ExtraHandlerRuntimeInfo: TypeAlias = TypedDict(
"ExtraHandlerRuntimeInfo",
{
"function": _ForwardExtraHandlerFunction,
"stage": ExtraHandlerType,
"component": ServiceRuntimeInfo,
},
)
ExtraHandlerFunction: TypeAlias = Union[
Callable[[Context], Any],
Callable[[Context, _ForwardPipeline], Any],
Callable[[Context, _ForwardPipeline, _ForwardExtraHandlerRuntimeInfo], Any],
]
"""
Type of dictionary, that is passed to wrappers in runtime.
Contains current wrapper info (`name`, `stage`).
Also contains `component` - runtime info dictionary of the component this wrapper is attached to.
A function type for creating wrappers (before and after functions).
Can accept current dialog context, pipeline, and current wrapper info.
"""


ExtraHandlerFunction: TypeAlias = Union[
Callable[[Context], None],
Callable[[Context, _ForwardPipeline], None],
Callable[[Context, _ForwardPipeline, ExtraHandlerRuntimeInfo], None],
]
class ExtraHandlerRuntimeInfo(BaseModel):
func: ExtraHandlerFunction
stage: ExtraHandlerType
component: ServiceRuntimeInfo


"""
A function type for creating wrappers (before and after functions).
Can accept current dialog context, pipeline, and current wrapper info dictionary.
Type of object, that is passed to wrappers in runtime.
Contains current wrapper info (`name`, `stage`).
Also contains `component` - runtime info of the component this wrapper is attached to.
"""


Expand All @@ -161,7 +159,7 @@ class ExtraHandlerType(Enum):
]
"""
A function type for creating service handlers.
Can accept current dialog context, pipeline, and current service info dictionary.
Can accept current dialog context, pipeline, and current service info.
Can be both synchronous and asynchronous.
"""

Expand Down
1 change: 0 additions & 1 deletion dff/script/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
normalize_transitions,
normalize_response,
normalize_processing,
normalize_keywords,
normalize_script,
)
from .core.script import Node, Script
Expand Down
Loading