diff --git a/pyzeebe/grpc_internals/zeebe_job_adapter.py b/pyzeebe/grpc_internals/zeebe_job_adapter.py index 67d03ea2..c4f7a7ba 100644 --- a/pyzeebe/grpc_internals/zeebe_job_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_job_adapter.py @@ -1,3 +1,4 @@ +import asyncio import json import logging from typing import AsyncGenerator, Dict, List, Optional @@ -11,6 +12,7 @@ FailJobResponse, ThrowErrorRequest, ThrowErrorResponse, + StreamActivatedJobsRequest, ) from pyzeebe.errors import ( @@ -57,7 +59,35 @@ async def activate_jobs( raise ActivateJobsRequestInvalidError(task_type, worker, timeout, max_jobs_to_activate) from grpc_error await self._handle_grpc_error(grpc_error) - def _create_job_from_raw_job(self, response) -> Job: + async def openStream( + self, + task_type: str, + worker: str, + timeout: int, + variables_to_fetch: List[str], + queue: asyncio.Queue, + tenant_ids: Optional[List[str]] = None, + ): + try: + stream = self._gateway_stub.StreamActivatedJobs( + StreamActivatedJobsRequest( + type=task_type, + worker=worker, + timeout=timeout, + fetchVariable=variables_to_fetch, + tenantIds=tenant_ids + ) + ) + + async for job in stream: + raw_job = self._create_job_from_raw_job(job, True) + await queue.put(raw_job) + except grpc.aio.AioRpcError as grpc_error: + if is_error_status(grpc_error, grpc.StatusCode.INVALID_ARGUMENT): + raise ActivateJobsRequestInvalidError(task_type, worker, timeout) from grpc_error + await self._handle_grpc_error(grpc_error) + + def _create_job_from_raw_job(self, response, stream_job: Optional[bool] = False) -> Job: return Job( key=response.key, type=response.type, @@ -74,6 +104,7 @@ def _create_job_from_raw_job(self, response) -> Job: variables=json.loads(response.variables), tenant_id=response.tenantId, zeebe_adapter=self, + stream_job=stream_job, ) async def complete_job(self, job_key: int, variables: Dict) -> CompleteJobResponse: @@ -89,7 +120,7 @@ async def complete_job(self, job_key: int, variables: Dict) -> CompleteJobRespon await self._handle_grpc_error(grpc_error) async def fail_job( - self, job_key: int, retries: int, message: str, retry_back_off_ms: int, variables: Dict + self, job_key: int, retries: int, message: str, retry_back_off_ms: int, variables: Dict ) -> FailJobResponse: try: return await self._gateway_stub.FailJob( @@ -109,7 +140,7 @@ async def fail_job( await self._handle_grpc_error(grpc_error) async def throw_error( - self, job_key: int, message: str, variables: Dict, error_code: str = "" + self, job_key: int, message: str, variables: Dict, error_code: str = "" ) -> ThrowErrorResponse: try: return await self._gateway_stub.ThrowError( diff --git a/pyzeebe/job/job.py b/pyzeebe/job/job.py index 8410026b..6e585ca6 100644 --- a/pyzeebe/job/job.py +++ b/pyzeebe/job/job.py @@ -24,6 +24,7 @@ class Job: tenant_id: Optional[str] = None status: JobStatus = JobStatus.Running zeebe_adapter: Optional["ZeebeAdapter"] = None # type: ignore + stream_job: Optional[bool] = False async def set_running_after_decorators_status(self) -> None: """ diff --git a/pyzeebe/task/task_config.py b/pyzeebe/task/task_config.py index c93a8341..cb31dcbf 100644 --- a/pyzeebe/task/task_config.py +++ b/pyzeebe/task/task_config.py @@ -22,6 +22,7 @@ def __init__( variable_name: str, before: List[TaskDecorator], after: List[TaskDecorator], + stream: Optional[bool] = False, ): if single_value and not variable_name: raise NoVariableNameGivenError(type) @@ -34,6 +35,7 @@ def __init__( self.variables_to_fetch = variables_to_fetch self.single_value = single_value self.variable_name = variable_name + self.stream = stream self.before = async_tools.asyncify_all_functions(before) self.after = async_tools.asyncify_all_functions(after) self.job_parameter_name: Optional[str] = None diff --git a/pyzeebe/worker/job_executor.py b/pyzeebe/worker/job_executor.py index 40959996..c9e32b41 100644 --- a/pyzeebe/worker/job_executor.py +++ b/pyzeebe/worker/job_executor.py @@ -23,6 +23,8 @@ async def execute(self) -> None: while self.should_execute(): job = await self.get_next_job() task = asyncio.create_task(self.execute_one_job(job)) + if job.stream_job: + continue task.add_done_callback(create_job_callback(self, job)) async def get_next_job(self) -> Job: diff --git a/pyzeebe/worker/job_poller.py b/pyzeebe/worker/job_poller.py index fbb634b3..f48eee92 100644 --- a/pyzeebe/worker/job_poller.py +++ b/pyzeebe/worker/job_poller.py @@ -41,6 +41,26 @@ async def poll(self): while self.should_poll(): await self.activate_max_jobs() + async def openStream(self): + try: + await self.zeebe_adapter.openStream( + task_type=self.task.type, + worker=self.worker_name, + timeout=self.task.config.timeout_ms, + variables_to_fetch=self.task.config.variables_to_fetch, + queue=self.queue, + tenant_ids=self.tenant_ids, + ) + except ActivateJobsRequestInvalidError: + logger.warning("Activate job requests was invalid for task %s", self.task.type) + raise + except (ZeebeBackPressureError, ZeebeGatewayUnavailableError, ZeebeInternalError) as error: + logger.warning( + "Failed to activate jobs from the gateway. Exception: %s. Retrying in 5 seconds...", + repr(error), + ) + await asyncio.sleep(5) + async def activate_max_jobs(self): if self.calculate_max_jobs_to_activate() > 0: await self.poll_once() diff --git a/pyzeebe/worker/task_router.py b/pyzeebe/worker/task_router.py index 7a250af8..63f803c2 100644 --- a/pyzeebe/worker/task_router.py +++ b/pyzeebe/worker/task_router.py @@ -44,6 +44,7 @@ def task( after: Optional[List[TaskDecorator]] = None, single_value: bool = False, variable_name: Optional[str] = None, + stream: Optional[bool] = False, ): """ Decorator to create a task @@ -80,6 +81,7 @@ def task_wrapper(task_function: Callable): variable_name or "", before or [], after or [], + stream ) config_with_decorators = self._add_decorators_to_config(config) @@ -103,6 +105,7 @@ def _add_decorators_to_config(self, config: TaskConfig) -> TaskConfig: variables_to_fetch=config.variables_to_fetch, single_value=config.single_value, variable_name=config.variable_name, + stream=config.stream, before=self._before + config.before, # type: ignore after=config.after + self._after, # type: ignore ) diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index 76a0e4b8..9c4b1f5a 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -87,7 +87,7 @@ async def work(self) -> None: self._job_pollers.append(poller) self._job_executors.append(executor) - coroutines = [poller.poll() for poller in self._job_pollers] + [ + coroutines = [poller.openStream() if poller.task.config.stream else poller.poll() for poller in self._job_pollers] + [ executor.execute() for executor in self._job_executors ]