9 changes: 8 additions & 1 deletion Makefile
@@ -7,6 +7,13 @@ test-unit:
test-e2e:
	pytest -m e2e --verbose

coverage-clean:
	rm -f .coverage .coverage.* coverage.xml

coverage-all: coverage-clean
	pytest -m "not e2e" --durations=0 --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml
	pytest -m e2e --durations=0 --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml --cov-append

install:
	python3 -m pip install .

@@ -18,4 +25,4 @@ gen-proto:
	python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
	rm durabletask/internal/*.proto

.PHONY: init test-unit test-e2e gen-proto install
.PHONY: init test-unit test-e2e coverage-clean coverage-unit coverage-e2e coverage-all gen-proto install
56 changes: 54 additions & 2 deletions README.md
@@ -126,10 +126,62 @@ Orchestrations can be continued as new using the `continue_as_new` API. This API

Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.
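A minimal sketch of driving these APIs from the client (assuming the gRPC client class is `durabletask.client.TaskHubGrpcClient`, that `instance_id` refers to an already-scheduled orchestration, and that the host address is a placeholder):

```python
from durabletask import client

c = client.TaskHubGrpcClient(host_address="localhost:4001")  # placeholder address

c.suspend_orchestration(instance_id)    # orchestration stops processing; new events are buffered
c.resume_orchestration(instance_id)     # buffered events are delivered once resumed
c.terminate_orchestration(instance_id)  # terminated orchestrations discard buffered events
```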

### Retry policies (TODO)
### Retry policies

Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.

#### Creating a retry policy

```python
from datetime import timedelta
from durabletask import task

retry_policy = task.RetryPolicy(
    first_retry_interval=timedelta(seconds=1),  # Initial delay before first retry
    max_number_of_attempts=5,                   # Maximum total attempts (includes first attempt)
    backoff_coefficient=2.0,                    # Exponential backoff multiplier (must be >= 1)
    max_retry_interval=timedelta(seconds=30),   # Cap on retry delay
    retry_timeout=timedelta(minutes=5),         # Total time limit for all retries (optional)
)
```

**Notes:**
- `max_number_of_attempts` **includes the initial attempt**. For example, `max_number_of_attempts=5` means 1 initial attempt + up to 4 retries.
- `retry_timeout` is optional. If omitted or set to `None`, retries continue until `max_number_of_attempts` is reached.
- `backoff_coefficient` controls exponential backoff: delay = `first_retry_interval * (backoff_coefficient ^ retry_number)`, capped by `max_retry_interval`. A worked sketch follows this list.
- `non_retryable_error_types` (optional) can specify additional exception types to treat as non-retryable (e.g., `[ValueError, TypeError]`). `NonRetryableError` is always non-retryable regardless of this setting.
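
To make the backoff formula concrete, here is a small illustrative sketch (not library code) that computes the delay sequence for the policy shown above:

```python
from datetime import timedelta

first_retry_interval = timedelta(seconds=1)
backoff_coefficient = 2.0
max_retry_interval = timedelta(seconds=30)

# Delay before retry n (0-based): first_retry_interval * backoff_coefficient ** n, capped at max_retry_interval
for retry_number in range(4):  # max_number_of_attempts=5 allows up to 4 retries
    delay = min(first_retry_interval * (backoff_coefficient ** retry_number), max_retry_interval)
    print(f"retry {retry_number + 1}: wait {delay.total_seconds()}s")  # -> 1.0s, 2.0s, 4.0s, 8.0s
```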

#### Using retry policies

Apply retry policies to activities or sub-orchestrations:

```python
def my_orchestrator(ctx: task.OrchestrationContext, input):
    # Retry an activity
    result = yield ctx.call_activity(my_activity, input=input, retry_policy=retry_policy)

    # Retry a sub-orchestration
    result = yield ctx.call_sub_orchestrator(child_orchestrator, input=input, retry_policy=retry_policy)
```

#### Non-retryable errors

For errors that should not be retried (e.g., validation failures, permanent errors), raise a `NonRetryableError`:

```python
from durabletask.task import NonRetryableError

def my_activity(ctx: task.ActivityContext, input):
    if input is None:
        # This error will bypass retry logic and fail immediately
        raise NonRetryableError("Input cannot be None")

    # Transient errors (network, timeouts, etc.) will be retried
    return call_external_service(input)
```

Even with a retry policy configured, `NonRetryableError` will fail immediately without retrying.
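
Building on the `non_retryable_error_types` note above, ordinary exception types can also be marked as non-retryable on the policy itself. A rough sketch, assuming `non_retryable_error_types` is accepted as a `RetryPolicy` keyword argument and `validate_input` is a hypothetical activity:

```python
strict_policy = task.RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2.0,
    max_retry_interval=timedelta(seconds=10),
    non_retryable_error_types=[ValueError, TypeError],  # assumption: treated like NonRetryableError
)

def my_orchestrator(ctx: task.OrchestrationContext, input):
    # A ValueError raised by the activity fails the task immediately instead of being retried
    result = yield ctx.call_activity(validate_input, input=input, retry_policy=strict_policy)
```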

## Getting Started

### Prerequisites
@@ -194,7 +246,7 @@ Certain aspects like multi-app activities require the full dapr runtime to be ru
```shell
dapr init || true

dapr run --app-id test-app --dapr-grpc-port 4001 --components-path ./examples/components/
dapr run --app-id test-app --dapr-grpc-port 4001 --resources-path ./examples/components/
```

To run the E2E tests on a specific python version (eg: 3.11), run the following command from the project root:
1 change: 0 additions & 1 deletion dev-requirements.txt
@@ -1 +0,0 @@
grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python # supports protobuf 6.x and aligns with generated code
76 changes: 72 additions & 4 deletions durabletask/client.py
@@ -127,9 +127,28 @@ def __init__(
            interceptors=interceptors,
            options=channel_options,
        )
        self._channel = channel
        self._stub = stubs.TaskHubSidecarServiceStub(channel)
        self._logger = shared.get_logger("client", log_handler, log_formatter)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            self.close()
        finally:
            return False

    def close(self) -> None:
        """Close the underlying gRPC channel."""
        try:
            # grpc.Channel.close() is idempotent
            self._channel.close()
        except Exception:
            # Best-effort cleanup
            pass

    def schedule_new_orchestration(
        self,
        orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
@@ -188,10 +207,59 @@ def wait_for_orchestration_completion(
    ) -> Optional[OrchestrationState]:
        req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
        try:
            grpc_timeout = None if timeout == 0 else timeout
            self._logger.info(
                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
            )
            # gRPC timeout mapping (pytest unit tests may pass None explicitly)
            grpc_timeout = None if (timeout is None or timeout == 0) else timeout

            # If timeout is None or 0, skip pre-checks/polling and call server-side wait directly
            if timeout is None or timeout == 0:
                self._logger.info(
                    f"Waiting {'indefinitely' if not timeout else f'up to {timeout}s'} for instance '{instance_id}' to complete."
                )
                res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
                    req, timeout=grpc_timeout
                )
                state = new_orchestration_state(req.instanceId, res)
                return state

            # For positive timeout, best-effort pre-check and short polling to avoid long server waits
            try:
                # First check if the orchestration is already completed
                current_state = self.get_orchestration_state(
                    instance_id, fetch_payloads=fetch_payloads
                )
                if current_state and current_state.runtime_status in [
                    OrchestrationStatus.COMPLETED,
                    OrchestrationStatus.FAILED,
                    OrchestrationStatus.TERMINATED,
                ]:
                    return current_state

                # Poll for completion with exponential backoff to handle eventual consistency
                import time

                poll_timeout = min(timeout, 10)
                poll_start = time.time()
                poll_interval = 0.1

                while time.time() - poll_start < poll_timeout:
                    current_state = self.get_orchestration_state(
                        instance_id, fetch_payloads=fetch_payloads
                    )

                    if current_state and current_state.runtime_status in [
                        OrchestrationStatus.COMPLETED,
                        OrchestrationStatus.FAILED,
                        OrchestrationStatus.TERMINATED,
                    ]:
                        return current_state

                    time.sleep(poll_interval)
                    poll_interval = min(poll_interval * 1.5, 1.0)  # Exponential backoff, max 1s
            except Exception:
                # Ignore pre-check/poll issues (e.g., mocked stubs in unit tests) and fall back
                pass

            self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to complete.")
            res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
                req, timeout=grpc_timeout
            )
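
A brief usage sketch tying the client changes together (assuming these methods live on `TaskHubGrpcClient`; the host address and `my_orchestrator` are placeholders):

```python
from durabletask import client

# The context-manager support added above closes the underlying gRPC channel on exit.
with client.TaskHubGrpcClient(host_address="localhost:4001") as c:
    instance_id = c.schedule_new_orchestration(my_orchestrator)
    # timeout=0 or None waits indefinitely; a positive timeout now pre-checks and polls
    # briefly before falling back to the server-side wait.
    state = c.wait_for_orchestration_completion(instance_id, timeout=60)
```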