Pydantic improvements #372

Merged on Aug 19, 2024 (87 commits)
Commits
dc01db6
most things done except components -> before + after handler, clarifi…
ZergLev Jul 10, 2024
ace4a5f
forgot to apply a few things
ZergLev Jul 10, 2024
c49fde9
formatted + partial self-review, feedback required
ZergLev Jul 10, 2024
957d231
formatted with poetry + minor changes
ZergLev Jul 10, 2024
f3cdd51
moved state checking to component, added single_func_init to Extra Ha…
ZergLev Jul 11, 2024
0e21a3e
field_validator added to Service, minor type changes
ZergLev Jul 12, 2024
9e5b688
Several changes and fixes. Tests compile now, but 21 of them fail.
ZergLev Jul 15, 2024
3d6e29f
fixed small mistake, tests still don't pass
ZergLev Jul 15, 2024
d92292b
formatted with poetry
ZergLev Jul 17, 2024
7ba3ac4
starting removal of pipeline.from_script()
ZergLev Jul 17, 2024
40484f0
a bunch of fixes to type annotations + actor's call signature fixed
ZergLev Jul 17, 2024
5df72bd
removed method call() from Actor so that it defaults to that of Pipel…
ZergLev Jul 17, 2024
0612023
fixed validation for ServiceGroups, Services and ExtraHandlers at Pip…
ZergLev Jul 18, 2024
1e67936
an error fixed, pipeline declaration conflicts found (they sent tuple…
ZergLev Jul 18, 2024
049f8f3
replaced from_script() with Pipeline() throughout the codebase, repla…
ZergLev Jul 22, 2024
c5413b8
minor changes and formatted with poetry
ZergLev Jul 22, 2024
66909f3
mistake fixed + fully removed from_script() and set_actor()
ZergLev Jul 22, 2024
9773e63
changed components into pre- and post-services for Pipeline initializ…
ZergLev Jul 22, 2024
baa1698
lint
ZergLev Jul 22, 2024
e33bb16
minor fix
ZergLev Jul 22, 2024
43de1db
minor mistake fixed
ZergLev Jul 22, 2024
fe78e12
found and fixed minor bug
ZergLev Jul 22, 2024
5753de9
removed to_service() and Optional[] where possible, also removed comp…
ZergLev Jul 24, 2024
58af2ca
replaced Pipeline's from_dict() with model_validate()
ZergLev Jul 24, 2024
5a1b19b
minor change
ZergLev Jul 25, 2024
72fb118
added to_service back in, not everywhere yet
ZergLev Jul 25, 2024
87ad935
mistake found, all tests pass except some of my own
ZergLev Jul 26, 2024
740b711
mistake fixed + strict validation for Pipeline
ZergLev Jul 26, 2024
f3b4b07
restored to_service everywhere, finished drafting new tests, some min…
ZergLev Jul 26, 2024
0e6e70d
self-review changes + comments changed
ZergLev Jul 26, 2024
154bbad
formatted with poetry
ZergLev Jul 26, 2024
2df499d
lint
ZergLev Jul 26, 2024
3a562b1
removed a few comments
ZergLev Jul 26, 2024
8fcf1aa
Polishing changes, new tests updated
ZergLev Jul 29, 2024
f0359a8
Pipeline tutorials updated
ZergLev Jul 29, 2024
6c7fa91
slight tutorial changes
ZergLev Jul 29, 2024
627e374
lint
ZergLev Jul 29, 2024
11fe640
fixing one-symbol mistake in docs
ZergLev Jul 29, 2024
5888888
Update chatsky/pipeline/pipeline/component.py
ZergLev Aug 6, 2024
40dc5b8
review changes started
ZergLev Aug 8, 2024
7691341
reverted redundant changes for Actor
ZergLev Aug 8, 2024
d6956ef
changing validation for ExtraHandlers
ZergLev Aug 8, 2024
cf6869a
testing different doc-style
ZergLev Aug 8, 2024
f0d5474
updated validation for ServiceGroup
ZergLev Aug 8, 2024
3f046ab
lint
ZergLev Aug 8, 2024
b343291
minor fix
ZergLev Aug 8, 2024
ed6f9e8
minor fix
ZergLev Aug 8, 2024
c78dd27
lint
ZergLev Aug 8, 2024
7b799bc
strict validation added for some classes
ZergLev Aug 8, 2024
dbde442
minor fix
ZergLev Aug 8, 2024
fc26daf
formatted with poetry
ZergLev Aug 8, 2024
ac66a68
review changes mostly done
ZergLev Aug 9, 2024
2388b2c
minor changes
ZergLev Aug 9, 2024
ba5c64a
lint
ZergLev Aug 9, 2024
f5e299c
minor fix
ZergLev Aug 9, 2024
472bba8
testing new docstrings format for PipelineComponent
ZergLev Aug 9, 2024
9a7abe9
added docstrings for remaining classes and switched to conventional d…
ZergLev Aug 9, 2024
5cc6717
minor docs fix
ZergLev Aug 9, 2024
2b04dfa
lint
ZergLev Aug 9, 2024
c5ca8ba
trying to inherit PipelineComponent fields in Service, ServiceGroup
ZergLev Aug 9, 2024
a5d3b33
fixing new autodocs
ZergLev Aug 9, 2024
314b9fc
checking if conf.py is the reason for it not working
ZergLev Aug 9, 2024
3130a82
updated the basic pipeline_from_dict tutorial for now
ZergLev Aug 12, 2024
e7f1506
trying to show inherited members in the docs except Pydantic.BaseMode…
ZergLev Aug 12, 2024
4a019a7
fixing docs
ZergLev Aug 12, 2024
75cb0b8
trying with undoc-members == True
ZergLev Aug 12, 2024
3922a03
reverting redundant change
ZergLev Aug 12, 2024
2aacdbb
trying out member-order == groupwise
ZergLev Aug 12, 2024
a4b55b2
reverted change for now, didn't work as needed
ZergLev Aug 12, 2024
5cf3dbc
testing a possible docs format
ZergLev Aug 12, 2024
2ff6752
trying to inherit only fields
ZergLev Aug 12, 2024
f026b9e
documentation format fixed, but review required
ZergLev Aug 12, 2024
9278dca
minor fix
ZergLev Aug 12, 2024
63a829d
Update chatsky/pipeline/pipeline/component.py
ZergLev Aug 14, 2024
33b8de6
doing review changes, type validation bug accidentally created
ZergLev Aug 14, 2024
1c2ef43
merged github commit into my branch
ZergLev Aug 14, 2024
0bab5ba
type validation bug fixed
ZergLev Aug 14, 2024
e6eae96
lint
ZergLev Aug 14, 2024
0f6839d
minor changes + added missing links
ZergLev Aug 15, 2024
a04756f
increased test coverage for PipelineComponent
ZergLev Aug 15, 2024
64c8761
added #pragma: no cover to remove redundant lines from coverage
ZergLev Aug 15, 2024
9d322a7
Revert "added #pragma: no cover to remove redundant lines from coverage"
RLKRo Aug 18, 2024
c3f3d95
small changes to validator tests
RLKRo Aug 18, 2024
b27961e
doc style: use `:raises:` directive and rephrase docstrings as commands
RLKRo Aug 18, 2024
de9eb83
hide some validators
RLKRo Aug 18, 2024
6934726
make validator nature clear
RLKRo Aug 18, 2024
fccfc5b
small doc change
RLKRo Aug 18, 2024
3 changes: 2 additions & 1 deletion chatsky/__rebuild_pydantic_models__.py
@@ -1,9 +1,10 @@
# flake8: noqa: F401

from chatsky.pipeline import Pipeline
from chatsky.pipeline.types import ExtraHandlerRuntimeInfo
from chatsky.pipeline.types import ExtraHandlerRuntimeInfo, StartConditionCheckerFunction
from chatsky.script import Context, Script

Pipeline.model_rebuild()
Script.model_rebuild()
Context.model_rebuild()
ExtraHandlerRuntimeInfo.model_rebuild()
13 changes: 5 additions & 8 deletions chatsky/pipeline/__init__.py
@@ -20,14 +20,11 @@
ExtraHandlerRuntimeInfo,
ExtraHandlerFunction,
ServiceFunction,
ExtraHandlerBuilder,
ServiceBuilder,
ServiceGroupBuilder,
PipelineBuilder,
)

from .pipeline.pipeline import Pipeline, ACTOR

from .service.extra import BeforeHandler, AfterHandler
from .service.group import ServiceGroup
from .service.extra import BeforeHandler, AfterHandler, ComponentExtraHandler
from .service.service import Service, to_service
from .service.group import ServiceGroup

from .pipeline.actor import Actor
from .pipeline.pipeline import Pipeline
132 changes: 82 additions & 50 deletions chatsky/pipeline/pipeline/actor.py
@@ -27,8 +27,10 @@
import logging
import asyncio
from typing import Union, Callable, Optional, Dict, List, TYPE_CHECKING
from pydantic import Field, model_validator
import copy

from chatsky.pipeline.pipeline.component import PipelineComponent
from chatsky.utils.turn_caching import cache_clear
from chatsky.script.core.types import ActorStage, NodeLabel2Type, NodeLabel3Type, LabelType
from chatsky.script.core.message import Message
@@ -45,58 +47,101 @@
from chatsky.pipeline.pipeline.pipeline import Pipeline


class Actor:
# Had to define this earlier, because when Pydantic starts it's __init__ it thinks of this function as
# being referenced before assignment
async def default_condition_handler(
condition: Callable, ctx: Context, pipeline: Pipeline
) -> Callable[[Context, Pipeline], bool]:
"""
The simplest and quickest condition handler for trivial condition handling returns the callable condition:

:param condition: Condition to copy.
:param ctx: Context of current condition.
:param pipeline: Pipeline we use in this condition.
"""
return await wrap_sync_function_in_async(condition, ctx, pipeline)


class Actor(PipelineComponent):
"""
The class which is used to process :py:class:`~chatsky.script.Context`
according to the :py:class:`~chatsky.script.Script`.
"""

script: Union[Script, Dict]
"""
The dialog scenario: a graph described by the :py:class:`.Keywords`.
While the graph is being initialized, it is validated and then used for the dialog.
"""
start_label: NodeLabel2Type
"""
The start node of :py:class:`~chatsky.script.Script`. The execution begins with it.
"""
fallback_label: Optional[NodeLabel2Type] = None
"""
The label of :py:class:`~chatsky.script.Script`.
Dialog comes into that label if all other transitions failed,
or there was an error while executing the scenario. Defaults to `None`.
"""
label_priority: float = 1.0
"""
Default priority value for all :py:const:`labels <chatsky.script.ConstLabel>`
where there is no priority. Defaults to `1.0`.
"""
condition_handler: Callable = Field(default=default_condition_handler)
"""
Handler that processes a call of condition functions. Defaults to `None`.
"""
handlers: Dict[ActorStage, List[Callable]] = Field(default_factory=dict)
"""
This variable is responsible for the usage of external handlers on
the certain stages of work of :py:class:`~chatsky.script.Actor`.

- key (:py:class:`~chatsky.script.ActorStage`) - Stage in which the handler is called.
- value (`List[Callable]`) - The list of called handlers for each stage. Defaults to an empty `dict`.

:param script: The dialog scenario: a graph described by the :py:class:`.Keywords`.
While the graph is being initialized, it is validated and then used for the dialog.
:param start_label: The start node of :py:class:`~chatsky.script.Script`. The execution begins with it.
:param fallback_label: The label of :py:class:`~chatsky.script.Script`.
Dialog comes into that label if all other transitions failed,
or there was an error while executing the scenario.
Defaults to `None`.
:param label_priority: Default priority value for all :py:const:`labels <chatsky.script.ConstLabel>`
where there is no priority. Defaults to `1.0`.
:param condition_handler: Handler that processes a call of condition functions. Defaults to `None`.
:param handlers: This variable is responsible for the usage of external handlers on
the certain stages of work of :py:class:`~chatsky.script.Actor`.

- key (:py:class:`~chatsky.script.ActorStage`) - Stage in which the handler is called.
- value (List[Callable]) - The list of called handlers for each stage. Defaults to an empty `dict`.
"""
# NB! The following API is highly experimental and may be removed at ANY time WITHOUT FURTHER NOTICE!!
_clean_turn_cache: bool = True

def __init__(
self,
script: Union[Script, dict],
start_label: NodeLabel2Type,
fallback_label: Optional[NodeLabel2Type] = None,
label_priority: float = 1.0,
condition_handler: Optional[Callable] = None,
handlers: Optional[Dict[ActorStage, List[Callable]]] = None,
):
self.script = script if isinstance(script, Script) else Script(script=script)
self.label_priority = label_priority

self.start_label = normalize_label(start_label)
@model_validator(mode="after")
def __tick_async_flag__(self):
self.calculated_async_flag = False
return self

@model_validator(mode="after")
def __start_label_validator__(self):
"""
Validate :py:data:`~.Actor.start_label`.

:raises ValueError: If `start_label` doesn't exist in the given :py:class:`~.Script`.
"""
if not isinstance(self.script, Script):
self.script = Script(script=self.script)
self.start_label = normalize_label(self.start_label)
if self.script.get(self.start_label[0], {}).get(self.start_label[1]) is None:
raise ValueError(f"Unknown start_label={self.start_label}")
return self

if fallback_label is None:
@model_validator(mode="after")
def __fallback_label_validator__(self):
"""
Validate :py:data:`~.Actor.fallback_label`.
:raises ValueError: If `fallback_label` doesn't exist in the given :py:class:`~.Script`.
"""
if self.fallback_label is None:
self.fallback_label = self.start_label
else:
self.fallback_label = normalize_label(fallback_label)
self.fallback_label = normalize_label(self.fallback_label)
if self.script.get(self.fallback_label[0], {}).get(self.fallback_label[1]) is None:
raise ValueError(f"Unknown fallback_label={self.fallback_label}")
self.condition_handler = default_condition_handler if condition_handler is None else condition_handler

self.handlers = {} if handlers is None else handlers
return self

# NB! The following API is highly experimental and may be removed at ANY time WITHOUT FURTHER NOTICE!!
self._clean_turn_cache = True
@property
def computed_name(self) -> str:
return "actor"

async def __call__(self, pipeline: Pipeline, ctx: Context):
async def run_component(self, ctx: Context, pipeline: Pipeline) -> None:
await self._run_handlers(ctx, pipeline, ActorStage.CONTEXT_INIT)

# get previous node
@@ -364,16 +409,3 @@ def _choose_label(
else:
chosen_label = self.fallback_label
return chosen_label


async def default_condition_handler(
condition: Callable, ctx: Context, pipeline: Pipeline
) -> Callable[[Context, Pipeline], bool]:
"""
The simplest and quickest condition handler for trivial condition handling returns the callable condition:

:param condition: Condition to copy.
:param ctx: Context of current condition.
:param pipeline: Pipeline we use in this condition.
"""
return await wrap_sync_function_in_async(condition, ctx, pipeline)
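
The label checks in the `actor.py` diff above move out of `__init__` and into `model_validator(mode="after")` methods. A minimal sketch of that pattern follows, assuming Pydantic v2 is installed. `MiniActor`, `Label`, and the plain-dict `script` are hypothetical stand-ins for illustration and are not chatsky classes.

```python
from typing import Dict, Optional, Tuple

from pydantic import BaseModel, ValidationError, model_validator

# Hypothetical (flow name, node name) pair; chatsky's real label types differ.
Label = Tuple[str, str]


class MiniActor(BaseModel, extra="forbid"):
    """Hypothetical stand-in for Actor, used only to show the validator pattern."""

    script: Dict[str, Dict[str, str]]
    start_label: Label
    fallback_label: Optional[Label] = None

    @model_validator(mode="after")
    def check_labels(self):
        # "after" validators run once field validation has finished,
        # so the attributes are already populated and typed.
        flow, node = self.start_label
        if node not in self.script.get(flow, {}):
            raise ValueError(f"Unknown start_label={self.start_label}")
        if self.fallback_label is None:
            # As in the diff: an unset fallback label defaults to the start label.
            self.fallback_label = self.start_label
        else:
            flow, node = self.fallback_label
            if node not in self.script.get(flow, {}):
                raise ValueError(f"Unknown fallback_label={self.fallback_label}")
        return self


script = {"greeting": {"start": "Hi!", "fallback": "Sorry?"}}
actor = MiniActor(script=script, start_label=("greeting", "start"))

try:
    MiniActor(script=script, start_label=("greeting", "missing"))
except ValidationError as error:
    # Pydantic wraps the ValueError raised inside the validator.
    print(error.error_count())
```

A `ValueError` raised inside a validator surfaces to the caller as `pydantic.ValidationError`, and `extra="forbid"` rejects unknown keyword arguments the same way.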
153 changes: 90 additions & 63 deletions chatsky/pipeline/pipeline/component.py
@@ -14,6 +14,7 @@
import abc
import asyncio
from typing import Optional, Awaitable, TYPE_CHECKING
from pydantic import BaseModel, Field, model_validator

from chatsky.script import Context

@@ -26,84 +27,79 @@
GlobalExtraHandlerType,
ExtraHandlerFunction,
ExtraHandlerType,
ExtraHandlerBuilder,
)
from ...utils.devel import wrap_sync_function_in_async

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
from chatsky.pipeline.pipeline.pipeline import Pipeline


class PipelineComponent(abc.ABC):
class PipelineComponent(abc.ABC, BaseModel, extra="forbid", arbitrary_types_allowed=True):
"""
This class represents a pipeline component, which is a service or a service group.
It contains some fields that they have in common.
"""

before_handler: BeforeHandler = Field(default_factory=BeforeHandler)
"""
:py:class:`~.BeforeHandler`, associated with this component.
"""
after_handler: AfterHandler = Field(default_factory=AfterHandler)
"""
:py:class:`~.AfterHandler`, associated with this component.
"""
timeout: Optional[float] = None
"""
(for asynchronous only!) Maximum component execution time (in seconds),
if it exceeds this time, it is interrupted.
"""
requested_async_flag: Optional[bool] = None
"""
Requested asynchronous property; if not defined,
:py:attr:`~.PipelineComponent.calculated_async_flag` is used instead.
"""
calculated_async_flag: bool = False
"""
Whether the component can be asynchronous or not.

:param before_handler: :py:class:`~.BeforeHandler`, associated with this component.
:type before_handler: Optional[:py:data:`~.ExtraHandlerBuilder`]
:param after_handler: :py:class:`~.AfterHandler`, associated with this component.
:type after_handler: Optional[:py:data:`~.ExtraHandlerBuilder`]
:param timeout: (for asynchronous only!) Maximum component execution time (in seconds),
if it exceeds this time, it is interrupted.
:param requested_async_flag: Requested asynchronous property;
if not defined, `calculated_async_flag` is used instead.
:param calculated_async_flag: Whether the component can be asynchronous or not
1) for :py:class:`~.pipeline.service.service.Service`: whether its `handler` is asynchronous or not,
2) for :py:class:`~.pipeline.service.group.ServiceGroup`: whether all its `services` are asynchronous or not.
1) for :py:class:`~.pipeline.service.service.Service`: whether its `handler` is asynchronous or not,
2) for :py:class:`~.pipeline.service.group.ServiceGroup`: whether all its `services` are asynchronous or not.

:param start_condition: StartConditionCheckerFunction that is invoked before each component execution;
component is executed only if it returns `True`.
:type start_condition: Optional[:py:data:`~.StartConditionCheckerFunction`]
:param name: Component name (should be unique in single :py:class:`~.pipeline.service.group.ServiceGroup`),
should not be blank or contain `.` symbol.
:param path: Separated by dots path to component, is universally unique.
"""
start_condition: StartConditionCheckerFunction = Field(default=always_start_condition)
"""
:py:class:`~.pipeline.types.StartConditionCheckerFunction` that is invoked before each component execution;
component is executed only if it returns `True`.
"""
name: Optional[str] = None
"""
Component name (should be unique in single :py:class:`~.pipeline.service.group.ServiceGroup`),
should not be blank or contain the ``.`` character.
"""
path: Optional[str] = None
"""
Separated by dots path to component, is universally unique.
"""

def __init__(
self,
before_handler: Optional[ExtraHandlerBuilder] = None,
after_handler: Optional[ExtraHandlerBuilder] = None,
timeout: Optional[float] = None,
requested_async_flag: Optional[bool] = None,
calculated_async_flag: bool = False,
start_condition: Optional[StartConditionCheckerFunction] = None,
name: Optional[str] = None,
path: Optional[str] = None,
):
self.timeout = timeout
"""
Maximum component execution time (in seconds),
if it exceeds this time, it is interrupted (for asynchronous only!).
"""
self.requested_async_flag = requested_async_flag
"""Requested asynchronous property; if not defined, :py:attr:`~requested_async_flag` is used instead."""
self.calculated_async_flag = calculated_async_flag
"""Calculated asynchronous property, whether the component can be asynchronous or not."""
self.start_condition = always_start_condition if start_condition is None else start_condition
"""
Component start condition that is invoked before each component execution;
component is executed only if it returns `True`.
@model_validator(mode="after")
def __pipeline_component_validator__(self):
"""
self.name = name
"""
Component name (should be unique in single :py:class:`~pipeline.service.group.ServiceGroup`),
should not be blank or contain '.' symbol.
"""
self.path = path
"""
Dot-separated path to component (is universally unique).
This attribute is set in :py:func:`~chatsky.pipeline.pipeline.utils.finalize_service_group`.
"""

self.before_handler = BeforeHandler([] if before_handler is None else before_handler)
self.after_handler = AfterHandler([] if after_handler is None else after_handler)
Validate this component.

if name is not None and (name == "" or "." in name):
raise Exception(f"User defined service name shouldn't be blank or contain '.' (service: {name})!")
:raises ValueError: If component's name is blank or if it contains dots.
:raises Exception: In case component can't be async, but was requested to be.
"""
if self.name is not None:
if self.name == "":
raise ValueError("Name cannot be blank.")
if "." in self.name:
raise ValueError(f"Name cannot contain '.': {self.name!r}.")

if not calculated_async_flag and requested_async_flag:
raise Exception(f"{type(self).__name__} '{name}' can't be asynchronous!")
if not self.calculated_async_flag and self.requested_async_flag:
raise Exception(f"{type(self).__name__} '{self.name}' can't be asynchronous!")
return self

def _set_state(self, ctx: Context, value: ComponentExecutionState):
"""
@@ -160,16 +156,47 @@ async def run_extra_handler(self, stage: ExtraHandlerType, ctx: Context, pipelin
logger.warning(f"{type(self).__name__} '{self.name}' {extra_handler.stage} extra handler timed out!")

@abc.abstractmethod
async def _run(self, ctx: Context, pipeline: Pipeline) -> None:
async def run_component(self, ctx: Context, pipeline: Pipeline) -> Optional[ComponentExecutionState]:
"""
A method for running pipeline component, it is overridden in all its children.
This method is run after the component's timeout is set (if needed).
Run this component.

:param ctx: Current dialog :py:class:`~.Context`.
:param pipeline: This :py:class:`~.Pipeline`.
"""
raise NotImplementedError

@property
def computed_name(self) -> str:
"""
Default name that is used if :py:attr:`~.PipelineComponent.name` is not defined.
In case two components in a :py:class:`~.ServiceGroup` have the same
:py:attr:`~.PipelineComponent.computed_name` an incrementing number is appended to the name.
"""
return "noname_service"

async def _run(self, ctx: Context, pipeline: Pipeline) -> None:
"""
A method for running a pipeline component. Executes extra handlers before and after execution,
launches `run_component` method. This method is run after the component's timeout is set (if needed).

:param ctx: Current dialog :py:class:`~.Context`.
:param pipeline: This :py:class:`~.Pipeline`.
"""
try:
if await wrap_sync_function_in_async(self.start_condition, ctx, pipeline):
await self.run_extra_handler(ExtraHandlerType.BEFORE, ctx, pipeline)

self._set_state(ctx, ComponentExecutionState.RUNNING)
if await self.run_component(ctx, pipeline) is not ComponentExecutionState.FAILED:
self._set_state(ctx, ComponentExecutionState.FINISHED)

await self.run_extra_handler(ExtraHandlerType.AFTER, ctx, pipeline)
else:
self._set_state(ctx, ComponentExecutionState.NOT_RUN)
except Exception as exc:
self._set_state(ctx, ComponentExecutionState.FAILED)
logger.error(f"Service '{self.name}' execution failed!", exc_info=exc)

async def __call__(self, ctx: Context, pipeline: Pipeline) -> Optional[Awaitable]:
"""
A method for calling pipeline components.
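
The `component.py` diff above turns `_run` into a shared wrapper: it checks the start condition, records the execution state, and delegates the actual work to the abstract `run_component`. A rough standard-library sketch of that control flow follows. `State`, `Component`, the `enabled` flag, and the `states` dict are hypothetical simplifications. Extra handlers and timeouts are omitted, and unlike the diff the sketch records `FAILED` itself when `run_component` returns it.

```python
import abc
import asyncio
import enum
from typing import Dict, Optional


class State(enum.Enum):
    NOT_RUN = "NOT_RUN"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"


class Component(abc.ABC):
    """Hypothetical component; `enabled` stands in for the start condition."""

    def __init__(self, name: str, enabled: bool = True):
        self.name = name
        self.enabled = enabled

    @abc.abstractmethod
    async def run_component(self, states: Dict[str, State]) -> Optional[State]:
        """Do the component-specific work; may return State.FAILED."""

    async def run(self, states: Dict[str, State]) -> None:
        # Shared wrapper: subclasses override run_component only.
        try:
            if not self.enabled:
                states[self.name] = State.NOT_RUN
                return
            states[self.name] = State.RUNNING
            result = await self.run_component(states)
            # Simplification: record FAILED here rather than relying on
            # the component to have set it already.
            states[self.name] = State.FAILED if result is State.FAILED else State.FINISHED
        except Exception:
            # Any exception from the component marks it as failed.
            states[self.name] = State.FAILED


class Ok(Component):
    async def run_component(self, states):
        await asyncio.sleep(0)
        return None


class Broken(Component):
    async def run_component(self, states):
        raise RuntimeError("boom")


async def demo() -> Dict[str, State]:
    states: Dict[str, State] = {}
    for component in (Ok("ok"), Broken("broken"), Ok("skipped", enabled=False)):
        await component.run(states)
    return states


states = asyncio.run(demo())
```

Keeping the state bookkeeping in one wrapper means each subclass implements only its own work, which is presumably why the diff makes `run_component` abstract and `_run` concrete.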