Merged

Changes from all commits (32 commits)
026c572
Creation of DTS example and passing of completionToken
RyanLettieri Jan 22, 2025
136a3d0
Addressing review feedback
RyanLettieri Jan 22, 2025
6df1064
Reverting dapr readme
RyanLettieri Jan 22, 2025
f731c0d
Adding accessTokenManager class for refreshing credential token
RyanLettieri Jan 24, 2025
eb98416
Adding comments to the example
RyanLettieri Jan 24, 2025
0de338d
Adding in requirement for azure-identity
RyanLettieri Jan 24, 2025
6050771
Moving dts logic into its own module
RyanLettieri Jan 28, 2025
f4f98ee
Fixing whitespace
RyanLettieri Jan 28, 2025
ea837d0
Updating dts client to refresh token
RyanLettieri Jan 29, 2025
f8d79d3
Cleaning up construction of dts objects and improving examples
RyanLettieri Jan 29, 2025
1e67651
Migrating shared access token logic to new grpc class
RyanLettieri Feb 4, 2025
6b1bfd2
Adding log statements to access_token_manager
RyanLettieri Feb 5, 2025
bd56a35
breaking for loop when setting interceptors
RyanLettieri Feb 5, 2025
efc0146
Removing changes to client.py and adding additional steps to readme.md
RyanLettieri Feb 7, 2025
3fd0b08
Refactoring client and worker to pass around interceptors
RyanLettieri Feb 11, 2025
4260d02
Fixing import for DefaultClientInterceptorImpl
RyanLettieri Feb 11, 2025
ec4617c
Addressing round 1 of feedback
RyanLettieri Feb 11, 2025
ed733ea
Fixing interceptor issue
RyanLettieri Feb 12, 2025
99f62d7
Moving some files around to remove dependencies
RyanLettieri Feb 12, 2025
f9d55ab
Addressing more feedback
RyanLettieri Feb 12, 2025
ba1ac4f
More review feedback
RyanLettieri Feb 12, 2025
2c251ea
Passing token credential as an argument rather than 2 strings
RyanLettieri Feb 13, 2025
9c65176
More review feedback for token passing
RyanLettieri Feb 13, 2025
877dabb
Addressing None comment and using correct metadata
RyanLettieri Feb 13, 2025
b39ffad
Updating unit tests
RyanLettieri Feb 13, 2025
33c8b11
Fixing the type for the unit test
RyanLettieri Feb 13, 2025
1da819e
Fixing grpc calls
RyanLettieri Feb 13, 2025
f690264
Merge branch 'main' into durabletask-scheduler
RyanLettieri Feb 13, 2025
6142220
Fix linter errors and update documentation
cgillum Feb 14, 2025
58f4f93
Specifying version requirement for pyproject.toml
RyanLettieri Feb 18, 2025
d82c1b7
Updating README
RyanLettieri Feb 18, 2025
b3a099e
Adding comment for credential type
RyanLettieri Feb 18, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Added `set_custom_status` orchestrator API ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
- Added `purge_orchestration` client API ([#34](https://github.com/microsoft/durabletask-python/pull/34)) - contributed by [@famarting](https://github.com/famarting)
- Added new `durabletask-azuremanaged` package for use with the [Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) - by [@RyanLettieri](https://github.com/RyanLettieri)

### Changes

7 changes: 3 additions & 4 deletions README.md
@@ -1,15 +1,14 @@
# Durable Task Client SDK for Python
# Durable Task SDK for Python

[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Build Validation](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml/badge.svg)](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml)
[![PyPI version](https://badge.fury.io/py/durabletask.svg)](https://badge.fury.io/py/durabletask)

This repo contains a Python client SDK for use with the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go) and [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.
This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) and the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.

⚠️ **This SDK is currently under active development and is not yet ready for production use.** ⚠️

> Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).

> Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).

## Supported patterns

Empty file.
Empty file.
30 changes: 30 additions & 0 deletions durabletask-azuremanaged/durabletask/azuremanaged/client.py
@@ -0,0 +1,30 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from azure.core.credentials import TokenCredential

from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
DTSDefaultClientInterceptorImpl
from durabletask.client import TaskHubGrpcClient


# Client class used for Durable Task Scheduler (DTS)
class DurableTaskSchedulerClient(TaskHubGrpcClient):
def __init__(self, *,
host_address: str,
taskhub: str,
token_credential: TokenCredential,
secure_channel: bool = True):

if not taskhub:
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")

interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]

# We pass None for the metadata so the parent class doesn't construct an additional interceptor.
# Since the parent class doesn't use the metadata for anything else, it is safe to leave it as None.
super().__init__(
host_address=host_address,
secure_channel=secure_channel,
metadata=None,
interceptors=interceptors)
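For reference, a minimal usage sketch of the new client (not part of this diff). The endpoint and task hub names are placeholders, the schedule/wait calls assume the existing `TaskHubGrpcClient` API, and scheduling by name assumes a worker elsewhere registered an orchestrator called `hello_orchestrator`.

```python
from azure.identity import DefaultAzureCredential

from durabletask.azuremanaged.client import DurableTaskSchedulerClient

# Any azure.core TokenCredential works; DefaultAzureCredential comes from azure-identity.
credential = DefaultAzureCredential()

client = DurableTaskSchedulerClient(
    host_address="<your-scheduler-endpoint>",  # placeholder
    taskhub="<your-taskhub>",                  # placeholder
    token_credential=credential)

# Schedule an orchestration by its registered name and wait for the result.
instance_id = client.schedule_new_orchestration("hello_orchestrator", input="Hello, DTS!")
state = client.wait_for_orchestration_completion(instance_id, timeout=60)
if state is not None:
    print(state.runtime_status, state.serialized_output)
```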
49 changes: 49 additions & 0 deletions durabletask-azuremanaged/durabletask/azuremanaged/internal/access_token_manager.py
@@ -0,0 +1,49 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from datetime import datetime, timedelta, timezone
from typing import Optional

from azure.core.credentials import AccessToken, TokenCredential

import durabletask.internal.shared as shared


# By default, refresh the token when there are 10 minutes left before it expires
class AccessTokenManager:

_token: Optional[AccessToken]

def __init__(self, token_credential: Optional[TokenCredential], refresh_interval_seconds: int = 600):
self._scope = "https://durabletask.io/.default"
self._refresh_interval_seconds = refresh_interval_seconds
self._logger = shared.get_logger("token_manager")

self._credential = token_credential

if self._credential is not None:
self._token = self._credential.get_token(self._scope)
self.expiry_time = datetime.fromtimestamp(self._token.expires_on, tz=timezone.utc)
else:
self._token = None
self.expiry_time = None

def get_access_token(self) -> Optional[AccessToken]:
if self._token is None or self.is_token_expired():
self.refresh_token()
return self._token

# Checks whether the token is expired, or will expire within the next "refresh_interval_seconds" seconds.
# For example, if the token is created with a lifespan of 2 hours and the refresh buffer is set to 30 minutes,
# a new token is fetched once there are 30 minutes left in the token's lifespan
def is_token_expired(self) -> bool:
if self.expiry_time is None:
return True
return datetime.now(timezone.utc) >= (self.expiry_time - timedelta(seconds=self._refresh_interval_seconds))

def refresh_token(self):
if self._credential is not None:
self._token = self._credential.get_token(self._scope)

# Convert UNIX timestamp to timezone-aware datetime
self.expiry_time = datetime.fromtimestamp(self._token.expires_on, tz=timezone.utc)
self._logger.debug(f"Token refreshed. Expires at: {self.expiry_time}")
41 changes: 41 additions & 0 deletions durabletask-azuremanaged/durabletask/azuremanaged/internal/durabletask_grpc_interceptor.py
@@ -0,0 +1,41 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import grpc
from azure.core.credentials import TokenCredential

from durabletask.azuremanaged.internal.access_token_manager import \
AccessTokenManager
from durabletask.internal.grpc_interceptor import (
DefaultClientInterceptorImpl, _ClientCallDetails)


class DTSDefaultClientInterceptorImpl (DefaultClientInterceptorImpl):
"""The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
interceptor to add additional headers to all calls as needed."""

def __init__(self, token_credential: TokenCredential, taskhub_name: str):
self._metadata = [("taskhub", taskhub_name)]
super().__init__(self._metadata)

if token_credential is not None:
self._token_credential = token_credential
self._token_manager = AccessTokenManager(token_credential=self._token_credential)
access_token = self._token_manager.get_access_token()
if access_token is not None:
self._metadata.append(("authorization", f"Bearer {access_token.token}"))

def _intercept_call(
self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
"""Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
call details."""
# Refresh the auth token if it is present and needed
if self._metadata is not None:
for i, (key, _) in enumerate(self._metadata):
if key.lower() == "authorization": # Ensure case-insensitive comparison
new_token = self._token_manager.get_access_token() # Get the new token
if new_token is not None:
self._metadata[i] = ("authorization", f"Bearer {new_token.token}") # Update the token

return super()._intercept_call(client_call_details)
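For context, this is roughly how the interceptor gets wired up, mirroring what `DurableTaskSchedulerClient`/`Worker` do through `durabletask.internal.shared`; the endpoint below is a placeholder:

```python
import grpc
from azure.identity import DefaultAzureCredential

from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import (
    DTSDefaultClientInterceptorImpl)

interceptor = DTSDefaultClientInterceptorImpl(DefaultAzureCredential(), "my-taskhub")

channel = grpc.secure_channel("<your-scheduler-endpoint>:443", grpc.ssl_channel_credentials())
channel = grpc.intercept_channel(channel, interceptor)

# Every RPC on `channel` now carries the "taskhub" header plus an
# "authorization: Bearer ..." header that is refreshed before it expires.
```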
30 changes: 30 additions & 0 deletions durabletask-azuremanaged/durabletask/azuremanaged/worker.py
@@ -0,0 +1,30 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from azure.core.credentials import TokenCredential

from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
DTSDefaultClientInterceptorImpl
from durabletask.worker import TaskHubGrpcWorker


# Worker class used for Durable Task Scheduler (DTS)
class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
def __init__(self, *,
host_address: str,
taskhub: str,
token_credential: TokenCredential,
secure_channel: bool = True):

if not taskhub:
raise ValueError("The taskhub value cannot be empty.")

interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]

# We pass None for the metadata so the parent class doesn't construct an additional interceptor.
# Since the parent class doesn't use the metadata for anything else, it is safe to leave it as None.
super().__init__(
host_address=host_address,
secure_channel=secure_channel,
metadata=None,
interceptors=interceptors)
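And a matching worker sketch (again not part of the diff): the endpoint and task hub are placeholders, and the registration/start calls assume the existing `TaskHubGrpcWorker` API.

```python
from azure.identity import DefaultAzureCredential

from durabletask import task
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


def say_hello(ctx: task.ActivityContext, name: str) -> str:
    return f"Hello, {name}!"


def hello_orchestrator(ctx: task.OrchestrationContext, name: str):
    greeting = yield ctx.call_activity(say_hello, input=name)
    return greeting


with DurableTaskSchedulerWorker(
        host_address="<your-scheduler-endpoint>",  # placeholder
        taskhub="<your-taskhub>",                  # placeholder
        token_credential=DefaultAzureCredential()) as worker:
    worker.add_activity(say_hello)
    worker.add_orchestrator(hello_orchestrator)
    worker.start()
    input("Worker running; press Enter to stop...\n")
```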
41 changes: 41 additions & 0 deletions durabletask-azuremanaged/pyproject.toml
@@ -0,0 +1,41 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# For more information on pyproject.toml, see https://peps.python.org/pep-0621/

[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "durabletask.azuremanaged"
version = "0.1b1"
description = "Extensions for the Durable Task Python SDK for integrating with the Durable Task Scheduler in Azure"
keywords = [
"durable",
"task",
"workflow",
"azure"
]
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
]
requires-python = ">=3.9"
license = {file = "LICENSE"}
readme = "README.md"
dependencies = [
"durabletask>=0.2.0",
"azure-identity>=1.19.0"
]

[project.urls]
repository = "https://github.com/microsoft/durabletask-python"
changelog = "https://github.com/microsoft/durabletask-python/blob/main/CHANGELOG.md"

[tool.setuptools.packages.find]
include = ["durabletask.azuremanaged", "durabletask.azuremanaged.*"]

[tool.pytest.ini_options]
minversion = "6.0"
26 changes: 22 additions & 4 deletions durabletask/client.py
@@ -6,7 +6,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Optional, TypeVar, Union
from typing import Any, Optional, Sequence, TypeVar, Union

import grpc
from google.protobuf import wrappers_pb2
@@ -16,6 +16,7 @@
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.shared as shared
from durabletask import task
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

TInput = TypeVar('TInput')
TOutput = TypeVar('TOutput')
@@ -96,8 +97,25 @@ def __init__(self, *,
metadata: Optional[list[tuple[str, str]]] = None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False):
channel = shared.get_grpc_channel(host_address, metadata, secure_channel=secure_channel)
secure_channel: bool = False,
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None):

# If the caller provided metadata, we need to create a new interceptor for it and
# add it to the list of interceptors.
if interceptors is not None:
interceptors = list(interceptors)
if metadata is not None:
interceptors.append(DefaultClientInterceptorImpl(metadata))
elif metadata is not None:
interceptors = [DefaultClientInterceptorImpl(metadata)]
else:
interceptors = None

channel = shared.get_grpc_channel(
host_address=host_address,
secure_channel=secure_channel,
interceptors=interceptors
)
self._stub = stubs.TaskHubSidecarServiceStub(channel)
self._logger = shared.get_logger("client", log_handler, log_formatter)

@@ -116,7 +134,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=wrappers_pb2.StringValue(value=""),
orchestrationIdReusePolicy=reuse_id_policy,
)
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
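The updated constructor now accepts three combinations of `metadata` and `interceptors`; a small sketch of each (header names and values below are made up):

```python
from durabletask.client import TaskHubGrpcClient
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

# 1) metadata only: wrapped internally in a DefaultClientInterceptorImpl, as before.
client_a = TaskHubGrpcClient(
    host_address="localhost:4001",
    metadata=[("x-example-tenant", "contoso")])

# 2) interceptors only: used as-is.
client_b = TaskHubGrpcClient(
    host_address="localhost:4001",
    interceptors=[DefaultClientInterceptorImpl([("x-example-tenant", "contoso")])])

# 3) both: the metadata-derived interceptor is appended after the caller's interceptors.
client_c = TaskHubGrpcClient(
    host_address="localhost:4001",
    metadata=[("x-trace-id", "abc123")],
    interceptors=[DefaultClientInterceptorImpl([("x-example-tenant", "contoso")])])
```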
12 changes: 6 additions & 6 deletions durabletask/internal/grpc_interceptor.py
@@ -19,28 +19,28 @@ class _ClientCallDetails(


class DefaultClientInterceptorImpl (
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
"""The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
interceptor to add additional headers to all calls as needed."""

def __init__(self, metadata: list[tuple[str, str]]):
super().__init__()
self._metadata = metadata

def _intercept_call(
self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
"""Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
call details."""
if self._metadata is None:
return client_call_details

if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
else:
metadata = []

metadata.extend(self._metadata)
client_call_details = _ClientCallDetails(
client_call_details.method, client_call_details.timeout, metadata,
22 changes: 15 additions & 7 deletions durabletask/internal/shared.py
@@ -5,11 +5,16 @@
import json
import logging
from types import SimpleNamespace
from typing import Any, Optional
from typing import Any, Optional, Sequence, Union

import grpc

from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
ClientInterceptor = Union[
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor
]

# Field name used to indicate that an object was automatically serialized
# and should be deserialized as a SimpleNamespace
@@ -25,8 +30,9 @@ def get_default_host_address() -> str:

def get_grpc_channel(
host_address: Optional[str],
metadata: Optional[list[tuple[str, str]]],
secure_channel: bool = False) -> grpc.Channel:
secure_channel: bool = False,
interceptors: Optional[Sequence[ClientInterceptor]] = None) -> grpc.Channel:

if host_address is None:
host_address = get_default_host_address()

@@ -44,16 +50,18 @@ host_address = host_address[len(protocol):]
host_address = host_address[len(protocol):]
break

# Create the base channel
if secure_channel:
channel = grpc.secure_channel(host_address, grpc.ssl_channel_credentials())
else:
channel = grpc.insecure_channel(host_address)

if metadata is not None and len(metadata) > 0:
interceptors = [DefaultClientInterceptorImpl(metadata)]
# Apply interceptors ONLY if they exist
if interceptors:
channel = grpc.intercept_channel(channel, *interceptors)
return channel


def get_logger(
name_suffix: str,
log_handler: Optional[logging.Handler] = None,
@@ -98,7 +106,7 @@ def default(self, obj):
if dataclasses.is_dataclass(obj):
# Dataclasses are not serializable by default, so we convert them to a dict and mark them for
# automatic deserialization by the receiver
d = dataclasses.asdict(obj) # type: ignore
d = dataclasses.asdict(obj) # type: ignore
d[AUTO_SERIALIZED] = True
return d
elif isinstance(obj, SimpleNamespace):
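Correspondingly, callers of the lower-level helper now pass interceptors instead of raw metadata; a brief sketch with illustrative values:

```python
import durabletask.internal.shared as shared
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

interceptors = [DefaultClientInterceptorImpl([("taskhub", "my-taskhub")])]

# The helper builds the secure/insecure channel first, then applies the
# interceptors only if any were supplied.
channel = shared.get_grpc_channel(
    host_address="myscheduler.example.com:443",  # placeholder endpoint
    secure_channel=True,
    interceptors=interceptors)
```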