Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wrap zeebe responses, return dataclass in zeebe adapter #443

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,4 @@ Table Of Contents
Channels <channels>
Decorators <decorators>
Exceptions <errors>
Zeebe Adapter <zeebe_adapter>
8 changes: 8 additions & 0 deletions docs/zeebe_adapter.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
=============
Zeebe Adapter
=============

.. toctree::
:name: zeebe_adapter

Reference <zeebe_adapter_reference>
56 changes: 56 additions & 0 deletions docs/zeebe_adapter_reference.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
==========================
Zeebe Adapter Reference
==========================

.. autoclass:: pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter
:members:
:undoc-members:

==========================
Zeebe GRPC Responses
==========================

.. autoclass:: pyzeebe.grpc_internals.types.CreateProcessInstanceResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.CreateProcessInstanceWithResultResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.CancelProcessInstanceResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.DeployProcessResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.DeployResourceResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.PublishMessageResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.CompleteJobResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.FailJobResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.ThrowErrorResponse
:members:
:undoc-members:
:member-order: bysource
50 changes: 34 additions & 16 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from typing import Any, Dict, Iterable, Optional, Tuple
from typing import Iterable, Optional

import grpc
from typing_extensions import deprecated

from pyzeebe.grpc_internals.types import (
CancelProcessInstanceResponse,
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployProcessResponse,
DeployResourceResponse,
PublishMessageResponse,
)
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from pyzeebe.types import Variables

Expand All @@ -25,7 +33,7 @@ async def run_process(
variables: Optional[Variables] = None,
version: int = -1,
tenant_id: Optional[str] = None,
) -> int:
) -> CreateProcessInstanceResponse:
"""
Run process

Expand All @@ -36,7 +44,7 @@ async def run_process(
tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3.

Returns:
int: process_instance_key, the unique id of the running process generated by Zeebe.
CreateProcessInstanceResponse: response from Zeebe.

Raises:
ProcessDefinitionNotFoundError: No process with bpmn_process_id exists
Expand All @@ -60,7 +68,7 @@ async def run_process_with_result(
timeout: int = 0,
variables_to_fetch: Optional[Iterable[str]] = None,
tenant_id: Optional[str] = None,
) -> Tuple[int, Dict[str, Any]]:
) -> CreateProcessInstanceWithResultResponse:
"""
Run process and wait for the result.

Expand All @@ -73,7 +81,7 @@ async def run_process_with_result(
tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3.

Returns:
tuple: (The process instance key, A dictionary of the end state of the process instance)
CreateProcessInstanceWithResultResponse: response from Zeebe.

Raises:
ProcessDefinitionNotFoundError: No process with bpmn_process_id exists
Expand All @@ -95,15 +103,15 @@ async def run_process_with_result(
tenant_id=tenant_id,
)

async def cancel_process_instance(self, process_instance_key: int) -> int:
async def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse:
"""
Cancel a running process instance

Args:
process_instance_key (int): The key of the running process to cancel

Returns:
int: The process_instance_key
CancelProcessInstanceResponse: response from Zeebe.

Raises:
ProcessInstanceNotFoundError: If no process instance with process_instance_key exists
Expand All @@ -113,17 +121,19 @@ async def cancel_process_instance(self, process_instance_key: int) -> int:
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code

"""
await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key)
return process_instance_key
return await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key)

@deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead")
async def deploy_process(self, *process_file_path: str) -> None:
async def deploy_process(self, *process_file_path: str) -> DeployProcessResponse:
"""
Deploy one or more processes

Args:
process_file_path (str): The file path to a process definition file (bpmn/yaml)

Returns:
DeployProcessResponse: response from Zeebe.

Raises:
ProcessInvalidError: If one of the process file definitions is invalid
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
Expand All @@ -132,9 +142,11 @@ async def deploy_process(self, *process_file_path: str) -> None:
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code

"""
await self.zeebe_adapter.deploy_process(*process_file_path)
return await self.zeebe_adapter.deploy_process(*process_file_path)

async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> None:
async def deploy_resource(
self, *resource_file_path: str, tenant_id: Optional[str] = None
) -> DeployResourceResponse:
"""
Deploy one or more processes

Expand All @@ -144,6 +156,9 @@ async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[st
resource_file_path (str): The file path to a resource definition file (bpmn/dmn/form)
tenant_id (str): The tenant ID of the resources to deploy. New in Zeebe 8.3.

Returns:
DeployResourceResponse: response from Zeebe.

Raises:
ProcessInvalidError: If one of the process file definitions is invalid
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
Expand All @@ -152,7 +167,7 @@ async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[st
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code

"""
await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id)
return await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id)

async def publish_message(
self,
Expand All @@ -162,7 +177,7 @@ async def publish_message(
time_to_live_in_milliseconds: int = 60000,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
) -> None:
) -> PublishMessageResponse:
"""
Publish a message

Expand All @@ -175,15 +190,18 @@ async def publish_message(
active, a MessageAlreadyExists will be raised.
tenant_id (str): The tenant ID of the message. New in Zeebe 8.3.

Returns:
PublishMessageResponse: response from Zeebe.

Raises:
MessageAlreadyExistError: If a message with message_id already exists
MessageAlreadyExistsError: If a message with message_id already exists
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code

"""
await self.zeebe_adapter.publish_message(
return await self.zeebe_adapter.publish_message(
name=name,
correlation_key=correlation_key,
time_to_live_in_milliseconds=time_to_live_in_milliseconds,
Expand Down
22 changes: 15 additions & 7 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import asyncio
from typing import Any, Dict, List, Optional, Tuple
from typing import List, Optional

import grpc
from typing_extensions import deprecated

from pyzeebe import ZeebeClient
from pyzeebe.grpc_internals.types import (
CancelProcessInstanceResponse,
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployProcessResponse,
DeployResourceResponse,
PublishMessageResponse,
)
from pyzeebe.types import Variables


Expand All @@ -19,7 +27,7 @@ def run_process(
variables: Optional[Variables] = None,
version: int = -1,
tenant_id: Optional[str] = None,
) -> int:
) -> CreateProcessInstanceResponse:
return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id))

def run_process_with_result(
Expand All @@ -30,21 +38,21 @@ def run_process_with_result(
timeout: int = 0,
variables_to_fetch: Optional[List[str]] = None,
tenant_id: Optional[str] = None,
) -> Tuple[int, Dict[str, Any]]:
) -> CreateProcessInstanceWithResultResponse:
return self.loop.run_until_complete(
self.client.run_process_with_result(
bpmn_process_id, variables, version, timeout, variables_to_fetch, tenant_id
)
)

def cancel_process_instance(self, process_instance_key: int) -> int:
def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse:
return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key))

@deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead")
def deploy_process(self, *process_file_path: str) -> None:
def deploy_process(self, *process_file_path: str) -> DeployProcessResponse:
return self.loop.run_until_complete(self.client.deploy_process(*process_file_path))

def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> None:
def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> DeployResourceResponse:
return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id))

def publish_message(
Expand All @@ -55,7 +63,7 @@ def publish_message(
time_to_live_in_milliseconds: int = 60000,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
) -> None:
) -> PublishMessageResponse:
return self.loop.run_until_complete(
self.client.publish_message(
name,
Expand Down
Loading