Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Add WorkflowClient.new_workflow_stub_from_workflow_id()
Browse files Browse the repository at this point in the history
  • Loading branch information
firdaus committed Mar 1, 2020
1 parent a9653fb commit 566b976
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ Mar
- [x] newRandom
- [x] UUID
- [x] Workflow Versioning
- [ ] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);
- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);

1.1
- [ ] ActivityStub and Workflow.newUntypedActivityStub
Expand Down
75 changes: 75 additions & 0 deletions cadence/tests/test_stub_workflow_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import time

import pytest

from cadence.exceptions import QueryFailureException
from cadence.workerfactory import WorkerFactory
from cadence.workflow import workflow_method, signal_method, Workflow, WorkflowClient, query_method

TASK_LIST = "TestStubWorkflowId"
DOMAIN = "sample"


class GreetingException(Exception):
pass


class TestStubWorkflowId:

@query_method()
async def get_message(self) -> str:
raise NotImplementedError

@query_method()
async def get_message_fail(self) -> str:
raise NotImplementedError

@signal_method()
async def put_message(self, message):
raise NotImplementedError

@workflow_method(task_list=TASK_LIST)
async def get_greetings(self) -> list:
raise NotImplementedError


class TestStubWorkflowIdImpl(TestStubWorkflowId):

def __init__(self):
self.message = ""

async def get_message(self) -> str:
return self.message

async def get_message_fail(self) -> str:
raise GreetingException("error from query")

async def put_message(self, message):
self.message = message

async def get_greetings(self) -> list:
self.message = "initial-message"
await Workflow.await_till(lambda: self.message == "done")
return "finished"


def test_stub_workflow_id():
factory = WorkerFactory("localhost", 7933, DOMAIN)
worker = factory.new_worker(TASK_LIST)
worker.register_workflow_implementation_type(TestStubWorkflowIdImpl)
factory.start()

client = WorkflowClient.new_client(domain=DOMAIN)
workflow: TestStubWorkflowId = client.new_workflow_stub(TestStubWorkflowId)
context = WorkflowClient.start(workflow.get_greetings)

stub: TestStubWorkflowId = client.new_workflow_stub_from_workflow_id(TestStubWorkflowId,
workflow_id=context.workflow_execution.workflow_id)
stub.put_message("abc")
assert stub.get_message() == "abc"

stub.put_message("done")
assert client.wait_for_close_with_workflow_id(context.workflow_execution.workflow_id) == "finished"

print("Stopping workers")
worker.stop()
23 changes: 19 additions & 4 deletions cadence/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,24 @@ def new_workflow_stub(self, cls: Type, workflow_options: WorkflowOptions = None)
stub_cls = type(cls.__name__, (WorkflowStub,), attrs)
return stub_cls()

def new_workflow_stub_from_workflow_id(self, cls: Type, workflow_id: str):
"""
Use it to send signals or queries to a running workflow.
Do not call workflow methods on it
"""
stub_instance = self.new_workflow_stub(cls)
execution = WorkflowExecution(workflow_id=workflow_id, run_id=None)
stub_instance._execution = execution
return stub_instance

def wait_for_close(self, context: WorkflowExecutionContext) -> object:
return self.wait_for_close_with_workflow_id(workflow_id=context.workflow_execution.workflow_id,
run_id=context.workflow_execution.run_id,
workflow_type=context.workflow_type)

def wait_for_close_with_workflow_id(self, workflow_id: str, run_id: str = None, workflow_type: str = None):
while True:
history_request = create_close_history_event_request(self, context.workflow_execution.workflow_id,
context.workflow_execution.run_id)
history_request = create_close_history_event_request(self, workflow_id, run_id)
history_response, err = self.service.get_workflow_execution_history(history_request)
if err:
raise Exception(err)
Expand All @@ -152,8 +166,9 @@ def wait_for_close(self, context: WorkflowExecutionContext) -> object:
exception = deserialize_exception(attributes.details)
if isinstance(exception, ActivityFailureException):
exception.set_cause()
raise WorkflowFailureException(workflow_type=context.workflow_type,
execution=context.workflow_execution) from exception
workflow_execution = WorkflowExecution(workflow_id=workflow_id, run_id=run_id)
raise WorkflowFailureException(workflow_type=workflow_type,
execution=workflow_execution) from exception
else:
details: Dict = json.loads(attributes.details)
detail_message = details.get("detailMessage", "")
Expand Down

0 comments on commit 566b976

Please sign in to comment.