Skip to content

Commit

Permalink
Update Python readme and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Jan 8, 2025
1 parent f91da29 commit 34787ed
Showing 1 changed file with 110 additions and 111 deletions.
221 changes: 110 additions & 111 deletions python/patterns-use-cases/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
Common tasks and patterns implemented with Restate:

#### Communication
- **[Durable RPC](#microservices-durable-rpc)**, Idempotency \& Concurrency: Restate persists requests and makes sure they execute exactly-once. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/durablerpc/client.py)
- **[(Delayed) Message Queue](#async-tasks-delayed-tasks-queue)**: Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/queue/task_submitter.py)
- **[Convert Sync Tasks to Async](#async-tasks-async-data-upload)**: Kick off a synchronous task (e.g. data upload) and turn it into an asynchronous one if it takes too long. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/dataupload/client.py)
- **[Durable RPC, Idempotency and Concurrency](#durable-rpc-idempotency-and-concurrency)**, Idempotency \& Concurrency: Restate persists requests and makes sure they execute exactly-once. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/durablerpc/client.py)
- **[(Delayed) Message Queue](#delayed-message-queue)**: Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/queue/task_submitter.py)
- **[Convert Sync Tasks to Async](#convert-sync-tasks-to-async)**: Kick off a synchronous task (e.g. data upload) and turn it into an asynchronous one if it takes too long. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/dataupload/client.py)

#### Common patterns
- **[Sagas](#microservices-sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/sagas/booking_workflow.py)
- **[Stateful Actors and State Machines](#microservices-stateful-actors)**: State machine with a set of transitions, built as a Restate Virtual Object for automatic state persistence. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/statefulactors/machine_operator.py)
- **[Payment State Machines (Advanced)](#microservices-payment-state-machine)**: State machine example that tracks a payment process, ensuring consistent processing and cancellations. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/statemachinepayments/payment_processor.py)
- **[Sagas](#sagas)**: Preserve consistency by tracking undo actions and running them when code fails halfway through. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/sagas/booking_workflow.py)
- **[Stateful Actors and State Machines](#stateful-actors-and-state-machines)**: State machine with a set of transitions, built as a Restate Virtual Object for automatic state persistence. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/statefulactors/machine_operator.py)
- **[Payment State Machines (Advanced)](#payment-state-machines)**: State machine example that tracks a payment process, ensuring consistent processing and cancellations. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/statemachinepayments/payment_processor.py)

#### Scheduling
- **[Parallelizing Work](#async-tasks-parallelizing-work)**: Execute a list of tasks in parallel and then gather their result. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/parallelizework/fan_out_worker.py)
- **[Payment Signals (Advanced)](#async-tasks-payment-signals---combining-sync-and-async-webhook-responses-from-stripe)**: Handling async payment callbacks for slow payments, with Stripe. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/signalspayments/payment_service.py)
- **[Parallelizing Work](#parallelizing-work)**: Execute a list of tasks in parallel and then gather their result. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/parallelizework/fan_out_worker.py)
- **[Payment Signals (Advanced)](#payment-signals)**: Handling async payment callbacks for slow payments, with Stripe. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/signalspayments/payment_service.py)

#### Event processing
- **[Transactional Event Processing](#event-processing-transactional-handlers-with-durable-side-effects-and-timers)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/eventtransactions/user_feed.py)
- **[Event Enrichment / Joins](#event-processing-event-enrichment)**: Stateful functions/actors connected to Kafka and callable over RPC. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/eventenrichment/package_tracker.py)
- **[Transactional Event Processing](#transactional-event-processing)**: Processing events (from Kafka) to update various downstream systems in a transactional way. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/eventtransactions/user_feed.py)
- **[Event Enrichment / Joins](#event-enrichment--joins)**: Stateful functions/actors connected to Kafka and callable over RPC. [<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/play-button.svg" width="16" height="16">](src/eventenrichment/package_tracker.py)

To get started, create a venv and install the requirements file:

Expand All @@ -28,7 +28,6 @@ source .venv/bin/activate
pip install -r requirements.txt
```


## Durable RPC, Idempotency and Concurrency

This example shows an example of:
Expand Down Expand Up @@ -70,6 +69,106 @@ Restate deduplicated the request (with the reservation ID as idempotency key) an

</details>

## (Delayed) Message Queue

Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once.

Files to look at:
- [Task Submitter](src/queue/task_submitter.py): schedules tasks via send requests with and idempotency key.
- The **send requests** put the tasks in Restate's queue. The task submitter does not wait for the task response.
- The **idempotency key** in the header is used by Restate to deduplicate requests.
- If a delay is set, the task will be executed later and Restate will track the timer durably, like a **delayed task queue**.
- [Async Task Worker](src/queue/async_task_worker.py): gets invoked by Restate for each task in the queue.


<details>
<summary><strong>Running the example</strong></summary>

1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server`
2. Start the service: `python -m hypercorn --config hypercorn-config.toml src/queue/async_task_worker:app`
3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

Submit a task with a delay: `python src/queue/task_submitter.py task123`

You will see the task executed after
```
Submitting task with idempotency key: task123
Task submitted: {'invocationId': 'inv_1lloi4vK3cnG0T2Tsteh8rd99NrGpgtsYh', 'status': 'Accepted'}
Task result: Finished work on task: task123
```

If we resubmit the same task: `python src/queue/task_submitter.py user123`,
you will see that the task is not executed again (signals `PreviouslyAccepted`), but the same result is returned:
```
Submitting task with idempotency key: task123
Task submitted: {'invocationId': 'inv_1lloi4vK3cnG0T2Tsteh8rd99NrGpgtsYh', 'status': 'PreviouslyAccepted'}
Task result: Finished work on task: task123
```

</details>

## Convert Sync Tasks to Async

This example shows how to use the Restate SDK to **kick of a synchronous task and turn it into an asynchronous one if it takes too long**.

The example implements a [data upload service](src/dataupload/data_upload_service.py), that creates a bucket, uploads data to it, and then returns the URL.

The [client](src/dataupload/client.py) does a synchronous request to upload the file, and the server will respond with the URL.

If the upload takes too long, however, the client asks the upload service to send the URL later in an email.

<details>
<summary><strong>Running the example</strong></summary>

1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server`
2. Start the service: `python -m hypercorn --config hypercorn-config.toml src/dataupload/data_upload_service:app`
3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

Run the upload client with a userId: `python src/dataupload/client.py my_user_id12`

This will submit an upload workflow to the data upload service.
The workflow will run only once per ID, so you need to provide a new ID for each run.

Have a look at the logs to see how the execution switches from synchronously waiting to the response to requesting an email:

<details>
<summary>View logs: fast upload</summary>

Client logs:
```
[2024-12-19 12:30:02,072] [667791] [INFO] - Start upload for my_user_id12
[2024-12-19 12:30:03,597] [667791] [INFO] - Fast upload: URL was https://s3-eu-central-1.amazonaws.com/282507974/
```
Workflow logs:
```
[2024-12-19 12:30:02,084] [667381] [INFO] - Creating bucket with URL https://s3-eu-central-1.amazonaws.com/282507974/
[2024-12-19 12:30:02,085] [667381] [INFO] - Uploading data to target https://s3-eu-central-1.amazonaws.com/282507974/. ETA: 1.5s
```

</details>
<details>
<summary>View logs: slow upload</summary>

Client logs:
```
[2024-12-19 12:28:33,471] [667526] [INFO] - Start upload for my_user_id123
[2024-12-19 12:28:38,477] [667526] [INFO] - Slow upload... Mail the link later
```

Workflow logs:
```
[2024-12-19 12:28:33,481] [667383] [INFO] - Creating bucket with URL https://s3-eu-central-1.amazonaws.com/23907419/
[2024-12-19 12:28:33,483] [667383] [INFO] - Uploading data to target https://s3-eu-central-1.amazonaws.com/23907419/. ETA: 10s
[2024-12-19 12:28:38,486] [667383] [INFO] - Slow upload: client requested to be notified via email
[2024-12-19 12:28:43,493] [667383] [INFO] - Sending email to my_user_id123@example.com with URL 'https://s3-eu-central-1.amazonaws.com/23907419/'
```

You see the call to `resultAsEmail` after the upload took too long, and the sending of the email.

</details>
</details>

## Sagas

An example of a trip reservation workflow, using the saga pattern to undo previous steps in case of an error.
Expand Down Expand Up @@ -275,44 +374,6 @@ restate kv get PaymentProcessor some-string-id
</details>
</details>

## (Delayed) Message Queue

Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once.

Files to look at:
- [Task Submitter](src/queue/task_submitter.py): schedules tasks via send requests with and idempotency key.
- The **send requests** put the tasks in Restate's queue. The task submitter does not wait for the task response.
- The **idempotency key** in the header is used by Restate to deduplicate requests.
- If a delay is set, the task will be executed later and Restate will track the timer durably, like a **delayed task queue**.
- [Async Task Worker](src/queue/async_task_worker.py): gets invoked by Restate for each task in the queue.


<details>
<summary><strong>Running the example</strong></summary>

1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server`
2. Start the service: `python -m hypercorn --config hypercorn-config.toml src/queue/async_task_worker:app`
3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

Submit a task with a delay: `python src/queue/task_submitter.py task123`

You will see the task executed after
```
Submitting task with idempotency key: task123
Task submitted: {'invocationId': 'inv_1lloi4vK3cnG0T2Tsteh8rd99NrGpgtsYh', 'status': 'Accepted'}
Task result: Finished work on task: task123
```

If we resubmit the same task: `python src/queue/task_submitter.py user123`,
you will see that the task is not executed again (signals `PreviouslyAccepted`), but the same result is returned:
```
Submitting task with idempotency key: task123
Task submitted: {'invocationId': 'inv_1lloi4vK3cnG0T2Tsteh8rd99NrGpgtsYh', 'status': 'PreviouslyAccepted'}
Task result: Finished work on task: task123
```

</details>

## Parallelizing work

This example shows how to use the Restate SDK to **execute a list of tasks in parallel and then gather their result**.
Expand All @@ -324,68 +385,6 @@ It then splits the task into subtasks, executes them in parallel, and then gathe
Restate guarantees and manages the execution of all the subtasks across failures.
You can run this on FaaS infrastructure, like AWS Lambda, and it will scale automatically.

## Convert Sync Tasks to Async

This example shows how to use the Restate SDK to **kick of a synchronous task and turn it into an asynchronous one if it takes too long**.

The example implements a [data upload service](src/dataupload/data_upload_service.py), that creates a bucket, uploads data to it, and then returns the URL.

The [client](src/dataupload/client.py) does a synchronous request to upload the file, and the server will respond with the URL.

If the upload takes too long, however, the client asks the upload service to send the URL later in an email.

<details>
<summary><strong>Running the example</strong></summary>

1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server`
2. Start the service: `python -m hypercorn --config hypercorn-config.toml src/dataupload/data_upload_service:app`
3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

Run the upload client with a userId: `python src/dataupload/client.py my_user_id12`

This will submit an upload workflow to the data upload service.
The workflow will run only once per ID, so you need to provide a new ID for each run.

Have a look at the logs to see how the execution switches from synchronously waiting to the response to requesting an email:

<details>
<summary>View logs: fast upload</summary>

Client logs:
```
[2024-12-19 12:30:02,072] [667791] [INFO] - Start upload for my_user_id12
[2024-12-19 12:30:03,597] [667791] [INFO] - Fast upload: URL was https://s3-eu-central-1.amazonaws.com/282507974/
```
Workflow logs:
```
[2024-12-19 12:30:02,084] [667381] [INFO] - Creating bucket with URL https://s3-eu-central-1.amazonaws.com/282507974/
[2024-12-19 12:30:02,085] [667381] [INFO] - Uploading data to target https://s3-eu-central-1.amazonaws.com/282507974/. ETA: 1.5s
```

</details>
<details>
<summary>View logs: slow upload</summary>

Client logs:
```
[2024-12-19 12:28:33,471] [667526] [INFO] - Start upload for my_user_id123
[2024-12-19 12:28:38,477] [667526] [INFO] - Slow upload... Mail the link later
```

Workflow logs:
```
[2024-12-19 12:28:33,481] [667383] [INFO] - Creating bucket with URL https://s3-eu-central-1.amazonaws.com/23907419/
[2024-12-19 12:28:33,483] [667383] [INFO] - Uploading data to target https://s3-eu-central-1.amazonaws.com/23907419/. ETA: 10s
[2024-12-19 12:28:38,486] [667383] [INFO] - Slow upload: client requested to be notified via email
[2024-12-19 12:28:43,493] [667383] [INFO] - Sending email to my_user_id123@example.com with URL 'https://s3-eu-central-1.amazonaws.com/23907419/'
```

You see the call to `resultAsEmail` after the upload took too long, and the sending of the email.

</details>
</details>

## Payment Signals

This example issues a payment request to Stripe.
Expand Down

0 comments on commit 34787ed

Please sign in to comment.