Skip to content

Commit

Permalink
feat: support stream job
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhi committed Mar 18, 2024
1 parent 5294412 commit ab3de2a
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 4 deletions.
37 changes: 34 additions & 3 deletions pyzeebe/grpc_internals/zeebe_job_adapter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import logging
from typing import AsyncGenerator, Dict, List, Optional
Expand All @@ -11,6 +12,7 @@
FailJobResponse,
ThrowErrorRequest,
ThrowErrorResponse,
StreamActivatedJobsRequest,
)

from pyzeebe.errors import (
Expand Down Expand Up @@ -57,7 +59,35 @@ async def activate_jobs(
raise ActivateJobsRequestInvalidError(task_type, worker, timeout, max_jobs_to_activate) from grpc_error
await self._handle_grpc_error(grpc_error)

def _create_job_from_raw_job(self, response) -> Job:
async def openStream(
self,
task_type: str,
worker: str,
timeout: int,
variables_to_fetch: List[str],
queue: asyncio.Queue,
tenant_ids: Optional[List[str]] = None,
):
try:
stream = self._gateway_stub.StreamActivatedJobs(
StreamActivatedJobsRequest(
type=task_type,
worker=worker,
timeout=timeout,
fetchVariable=variables_to_fetch,
tenantIds=tenant_ids
)
)

async for job in stream:
raw_job = self._create_job_from_raw_job(job, True)
await queue.put(raw_job)
except grpc.aio.AioRpcError as grpc_error:
if is_error_status(grpc_error, grpc.StatusCode.INVALID_ARGUMENT):
raise ActivateJobsRequestInvalidError(task_type, worker, timeout) from grpc_error
await self._handle_grpc_error(grpc_error)

def _create_job_from_raw_job(self, response, stream_job: Optional[bool] = False) -> Job:
return Job(
key=response.key,
type=response.type,
Expand All @@ -74,6 +104,7 @@ def _create_job_from_raw_job(self, response) -> Job:
variables=json.loads(response.variables),
tenant_id=response.tenantId,
zeebe_adapter=self,
stream_job=stream_job,
)

async def complete_job(self, job_key: int, variables: Dict) -> CompleteJobResponse:
Expand All @@ -89,7 +120,7 @@ async def complete_job(self, job_key: int, variables: Dict) -> CompleteJobRespon
await self._handle_grpc_error(grpc_error)

async def fail_job(
self, job_key: int, retries: int, message: str, retry_back_off_ms: int, variables: Dict
self, job_key: int, retries: int, message: str, retry_back_off_ms: int, variables: Dict
) -> FailJobResponse:
try:
return await self._gateway_stub.FailJob(
Expand All @@ -109,7 +140,7 @@ async def fail_job(
await self._handle_grpc_error(grpc_error)

async def throw_error(
self, job_key: int, message: str, variables: Dict, error_code: str = ""
self, job_key: int, message: str, variables: Dict, error_code: str = ""
) -> ThrowErrorResponse:
try:
return await self._gateway_stub.ThrowError(
Expand Down
1 change: 1 addition & 0 deletions pyzeebe/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Job:
tenant_id: Optional[str] = None
status: JobStatus = JobStatus.Running
zeebe_adapter: Optional["ZeebeAdapter"] = None # type: ignore
stream_job: Optional[bool] = False

async def set_running_after_decorators_status(self) -> None:
"""
Expand Down
2 changes: 2 additions & 0 deletions pyzeebe/task/task_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(
variable_name: str,
before: List[TaskDecorator],
after: List[TaskDecorator],
stream: Optional[bool] = False,
):
if single_value and not variable_name:
raise NoVariableNameGivenError(type)
Expand All @@ -34,6 +35,7 @@ def __init__(
self.variables_to_fetch = variables_to_fetch
self.single_value = single_value
self.variable_name = variable_name
self.stream = stream
self.before = async_tools.asyncify_all_functions(before)
self.after = async_tools.asyncify_all_functions(after)
self.job_parameter_name: Optional[str] = None
Expand Down
2 changes: 2 additions & 0 deletions pyzeebe/worker/job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ async def execute(self) -> None:
while self.should_execute():
job = await self.get_next_job()
task = asyncio.create_task(self.execute_one_job(job))
if job.stream_job:
continue
task.add_done_callback(create_job_callback(self, job))

async def get_next_job(self) -> Job:
Expand Down
20 changes: 20 additions & 0 deletions pyzeebe/worker/job_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,26 @@ async def poll(self):
while self.should_poll():
await self.activate_max_jobs()

async def openStream(self):
try:
await self.zeebe_adapter.openStream(
task_type=self.task.type,
worker=self.worker_name,
timeout=self.task.config.timeout_ms,
variables_to_fetch=self.task.config.variables_to_fetch,
queue=self.queue,
tenant_ids=self.tenant_ids,
)
except ActivateJobsRequestInvalidError:
logger.warning("Activate job requests was invalid for task %s", self.task.type)
raise
except (ZeebeBackPressureError, ZeebeGatewayUnavailableError, ZeebeInternalError) as error:
logger.warning(
"Failed to activate jobs from the gateway. Exception: %s. Retrying in 5 seconds...",
repr(error),
)
await asyncio.sleep(5)

async def activate_max_jobs(self):
if self.calculate_max_jobs_to_activate() > 0:
await self.poll_once()
Expand Down
3 changes: 3 additions & 0 deletions pyzeebe/worker/task_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def task(
after: Optional[List[TaskDecorator]] = None,
single_value: bool = False,
variable_name: Optional[str] = None,
stream: Optional[bool] = False,
):
"""
Decorator to create a task
Expand Down Expand Up @@ -80,6 +81,7 @@ def task_wrapper(task_function: Callable):
variable_name or "",
before or [],
after or [],
stream
)
config_with_decorators = self._add_decorators_to_config(config)

Expand All @@ -103,6 +105,7 @@ def _add_decorators_to_config(self, config: TaskConfig) -> TaskConfig:
variables_to_fetch=config.variables_to_fetch,
single_value=config.single_value,
variable_name=config.variable_name,
stream=config.stream,
before=self._before + config.before, # type: ignore
after=config.after + self._after, # type: ignore
)
Expand Down
2 changes: 1 addition & 1 deletion pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def work(self) -> None:
self._job_pollers.append(poller)
self._job_executors.append(executor)

coroutines = [poller.poll() for poller in self._job_pollers] + [
coroutines = [poller.openStream() if poller.task.config.stream else poller.poll() for poller in self._job_pollers] + [
executor.execute() for executor in self._job_executors
]

Expand Down

0 comments on commit ab3de2a

Please sign in to comment.