Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python SDK workflow updates #4563

Draft
wants to merge 1 commit into
base: v1.15
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,31 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si

<!--python-->

Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`.
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates task chaining activities that receive input

```python
def hello_act(ctx: WorkflowActivityContext, input):
global counter
counter += input
print(f'New counter value is: {counter}!', flush=True)
@wfr.activity(name='step10')
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1


@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2


@wfr.activity
def step3(ctx, activity_input):
print(f'Step 3: Received input: {activity_input}.')
# Do some work
return activity_input ^ 2
```

[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59)
[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py)


{{% /codetab %}}
Expand Down Expand Up @@ -226,16 +241,19 @@ Next, register and call the activites in a workflow.

<!--python-->

The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.

```python
def hello_world_wf(ctx: DaprWorkflowContext, input):
print(f'{input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.wait_for_external_event("event1")
yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
@wfr.workflow(name='random_workflow')
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
result2 = yield ctx.call_activity(step2, input=result1)
result3 = yield ctx.call_activity(step3, input=result2)
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
return [result1, result2, result3]
```

[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
Expand Down Expand Up @@ -409,82 +427,77 @@ Finally, compose the application using the workflow.

- A Python package called `DaprClient` to receive the Python SDK capabilities.
- A builder with extensions called:
- `WorkflowRuntime`: Allows you to register workflows and workflow activities
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}})
- `WorkflowRuntime`: Allows you to register the workflow runtime.
- `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}})
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.

```python
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient

# ...

def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()

# Start workflow
print("==========Start Counter Increase as per Input:==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")

# ...

# Pause workflow
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")

# Resume workflow
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")

sleep(1)
# Raise workflow
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)

sleep(5)
# Purge workflow
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")

# Kick off another workflow for termination purposes
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")

# Terminate workflow
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")

# Purge workflow
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")

workflowRuntime.shutdown()
from durabletask import worker, task

from dapr.ext.workflow.workflow_context import Workflow
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext
from dapr.ext.workflow.util import getAddress

from dapr.clients import DaprInternalError
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint
from dapr.ext.workflow.logger import LoggerOptions, Logger

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did you get this example? I do not think you need all these imports. Can we check with Elena?
+++++++++++++++++++++++++++++
from durabletask import worker, task

from dapr.ext.workflow.workflow_context import Workflow
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext
from dapr.ext.workflow.util import getAddress

from dapr.clients import DaprInternalError
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint
from dapr.ext.workflow.logger import LoggerOptions, Logger

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there are some unused imports here, and some indentation problems, which will make the snippet error out.
I also think it's important to show the workflow definition.

This should be a good starting point for the example: https://github.com/dapr/python-sdk/blob/release-1.15/examples/workflow/simple.py

wfr = wf.WorkflowRuntime()

@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
# Workflow definition...

@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
# Activity definition...

# Start workflow
wfr = WorkflowRuntime()
wfr.start()
wf_client = DaprWorkflowClient()

# ...

# Pause workflow
wf_client.pause_workflow(instance_id=instance_id)
metadata = wf_client.get_workflow_state(instance_id=instance_id)
# ... check status ...
wf_client.resume_workflow(instance_id=instance_id)

sleep(1)

# Raise workflow
wf_client.raise_workflow_event(
instance_id=instance_id,
event_name=event_name,
data=event_data
)

# Purge workflow
state = wf_client.wait_for_workflow_completion(
instance_id,
timeout_in_seconds=30
)
wf_client.purge_workflow(instance_id=instance_id)

workflowRuntime.shutdown()

if __name__ == '__main__':
main()
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')

wfr.shutdown()
```


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ description: Get started with the Dapr conversation building block
The conversation building block is currently in **alpha**.
{{% /alert %}}

Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it for a poem about Dapr.
Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it to define Dapr.

You can try out this conversation quickstart by either:

- [Running the application in this sample with the Multi-App Run template file]({{< ref "#run-the-app-with-the-template-file" >}}), or
- [Running the application without the template]({{< ref "#run-the-app-without-the-template" >}})

{{% alert title="Note" color="primary" %}}
Currently, only the HTTP quickstart sample is available in Python and JavaScript.
Currently, you can only use JavaScript for the quickstart sample using HTTP, not the JavaScript SDK.
{{% /alert %}}

## Run the app with the template file
Expand Down Expand Up @@ -50,7 +50,7 @@ git clone https://github.com/dapr/quickstarts.git
From the root of the Quickstarts directory, navigate into the conversation directory:

```bash
cd conversation/python/http/conversation
cd conversation/python/sdk/conversation
```

Install the dependencies:
Expand All @@ -61,7 +61,7 @@ pip3 install -r requirements.txt

### Step 3: Launch the conversation service

Navigate back to the `http` directory and start the conversation service with the following command:
Navigate back to the `sdk` directory and start the conversation service with the following command:

```bash
dapr run -f .
Expand Down Expand Up @@ -117,37 +117,28 @@ In the application code:
- The mock LLM echoes "What is dapr?".

```python
import logging
import requests
import os

logging.basicConfig(level=logging.INFO)

base_url = os.getenv('BASE_URL', 'http://localhost') + ':' + os.getenv(
'DAPR_HTTP_PORT', '3500')

CONVERSATION_COMPONENT_NAME = 'echo'

input = {
'name': 'echo',
'inputs': [{'message':'What is dapr?'}],
'parameters': {},
'metadata': {}
from dapr.clients import DaprClient
from dapr.clients.grpc._request import ConversationInput

with DaprClient() as d:
inputs = [
ConversationInput(content="What is dapr?", role='user', scrub_pii=True),
]

metadata = {
'model': 'modelname',
'key': 'authKey',
'cacheTTL': '10m',
}

# Send input to conversation endpoint
result = requests.post(
url='%s/v1.0-alpha1/conversation/%s/converse' % (base_url, CONVERSATION_COMPONENT_NAME),
json=input
)

logging.info('Input sent: What is dapr?')
print('Input sent: What is dapr?')

# Parse conversation output
data = result.json()
output = data["outputs"][0]["result"]
response = d.converse_alpha1(
name='echo', inputs=inputs, temperature=0.7, context_id='chat-123', metadata=metadata
)

logging.info('Output response: ' + output)
for output in response.outputs:
print(f'Output response: {output.result}')
```

{{% /codetab %}}
Expand Down Expand Up @@ -575,7 +566,7 @@ git clone https://github.com/dapr/quickstarts.git
From the root of the Quickstarts directory, navigate into the conversation directory:

```bash
cd conversation/python/http/conversation
cd conversation/python/sdk/conversation
```

Install the dependencies:
Expand All @@ -586,7 +577,7 @@ pip3 install -r requirements.txt

### Step 3: Launch the conversation service

Navigate back to the `http` directory and start the conversation service with the following command:
Navigate back to the `sdk` directory and start the conversation service with the following command:

```bash
dapr run --app-id conversation --resources-path ../../../components -- python3 app.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ class WorkflowConsoleApp:
if __name__ == '__main__':
app = WorkflowConsoleApp()
app.main()

```

#### `order-processor/workflow.py`
Expand All @@ -276,7 +275,6 @@ wfr = WorkflowRuntime()

logging.basicConfig(level=logging.INFO)


@wfr.workflow(name="order_processing_workflow")
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str):
"""Defines the order processing workflow.
Expand Down Expand Up @@ -343,7 +341,6 @@ def notify_activity(ctx: WorkflowActivityContext, input: Notification):
logger = logging.getLogger('NotifyActivity')
logger.info(input.message)


@wfr.activity(name="process_payment_activity")
def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest):
"""Defines Process Payment Activity.This is used by the workflow to process a payment"""
Expand All @@ -353,7 +350,6 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest
+' USD')
logger.info(f'Payment for request ID {input.request_id} processed successfully')


@wfr.activity(name="verify_inventory_activity")
def verify_inventory_activity(ctx: WorkflowActivityContext,
input: InventoryRequest) -> InventoryResult:
Expand All @@ -377,8 +373,6 @@ def verify_inventory_activity(ctx: WorkflowActivityContext,
return InventoryResult(True, inventory_item)
return InventoryResult(False, None)



@wfr.activity(name="update_inventory_activity")
def update_inventory_activity(ctx: WorkflowActivityContext,
input: PaymentRequest) -> InventoryResult:
Expand All @@ -401,8 +395,6 @@ def update_inventory_activity(ctx: WorkflowActivityContext,
client.save_state(store_name, input.item_being_purchased, new_val)
logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock')



@wfr.activity(name="request_approval_activity")
def request_approval_activity(ctx: WorkflowActivityContext,
input: OrderPayload):
Expand All @@ -413,7 +405,6 @@ def request_approval_activity(ctx: WorkflowActivityContext,

logger.info('Requesting approval for payment of '+f'{input["total_cost"]}'+' USD for '
+f'{input["quantity"]}' +' ' +f'{input["item_name"]}')

```
{{% /codetab %}}

Expand Down
Loading