-
Notifications
You must be signed in to change notification settings - Fork 19
Creation of DTS example and passing of completionToken #40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
RyanLettieri
merged 32 commits into
microsoft:main
from
RyanLettieri:durabletask-scheduler
Feb 18, 2025
Merged
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
026c572
Creation of DTS example and passing of completionToken
RyanLettieri 136a3d0
Adressing review feedback
RyanLettieri 6df1064
Reverting dapr readme
RyanLettieri f731c0d
Adding accessTokenManager class for refreshing credential token
RyanLettieri eb98416
Adding comments to the example
RyanLettieri 0de338d
Adding in requirement for azure-identity
RyanLettieri 6050771
Moving dts logic into its own module
RyanLettieri f4f98ee
Fixing whitesapce
RyanLettieri ea837d0
Updating dts client to refresh token
RyanLettieri f8d79d3
Cleaning up construction of dts objects and improving examples
RyanLettieri 1e67651
Migrating shared access token logic to new grpc class
RyanLettieri 6b1bfd2
Adding log statements to access_token_manager
RyanLettieri bd56a35
breaking for loop when setting interceptors
RyanLettieri efc0146
Removing changes to client.py and adding additional steps to readme.md
RyanLettieri 3fd0b08
Refactoring client and worker to pass around interceptors
RyanLettieri 4260d02
Fixing import for DefaultClientInterceptorImpl
RyanLettieri ec4617c
Adressing round 1 of feedback
RyanLettieri ed733ea
Fixing interceptor issue
RyanLettieri 99f62d7
Moving some files around to remove dependencies
RyanLettieri f9d55ab
Adressing more feedback
RyanLettieri ba1ac4f
More review feedback
RyanLettieri 2c251ea
Passing token credential as an argument rather than 2 strings
RyanLettieri 9c65176
More review feedback for token passing
RyanLettieri 877dabb
Addressing None comment and using correct metadata
RyanLettieri b39ffad
Updating unit tests
RyanLettieri 33c8b11
Fixing the type for the unit test
RyanLettieri 1da819e
Fixing grpc calls
RyanLettieri f690264
Merge branch 'main' into durabletask-scheduler
RyanLettieri 6142220
Fix linter errors and update documentation
cgillum 58f4f93
Specifying version reqiuirement for pyproject.toml
RyanLettieri d82c1b7
Updating README
RyanLettieri b3a099e
Adding comment for credential type
RyanLettieri File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Empty file.
Empty file.
29 changes: 29 additions & 0 deletions
29
durabletask-azuremanaged/durabletask/azuremanaged/client.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| from typing import Optional | ||
| from durabletask.client import TaskHubGrpcClient, OrchestrationStatus | ||
| from durabletask.azuremanaged.internal.access_token_manager import AccessTokenManager | ||
| from durabletask.azuremanaged.durabletask_grpc_interceptor import DTSDefaultClientInterceptorImpl | ||
| from azure.core.credentials import TokenCredential | ||
|
|
||
| # Client class used for Durable Task Scheduler (DTS) | ||
| class DurableTaskSchedulerClient(TaskHubGrpcClient): | ||
| def __init__(self, *, | ||
| host_address: str, | ||
| taskhub: str, | ||
| token_credential: TokenCredential = None, | ||
| secure_channel: Optional[bool] = True): | ||
|
|
||
| if taskhub == None: | ||
| raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub") | ||
|
|
||
| self._interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)] | ||
|
|
||
| # We pass in None for the metadata so we don't construct an additional interceptor in the parent class | ||
| # Since the parent class doesn't use anything metadata for anything else, we can set it as None | ||
| super().__init__( | ||
| host_address=host_address, | ||
| secure_channel=secure_channel, | ||
| metadata=None, | ||
| interceptors=self._interceptors) | ||
35 changes: 35 additions & 0 deletions
35
durabletask-azuremanaged/durabletask/azuremanaged/durabletask_grpc_interceptor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| from durabletask.internal.grpc_interceptor import _ClientCallDetails, DefaultClientInterceptorImpl | ||
| from durabletask.azuremanaged.internal.access_token_manager import AccessTokenManager | ||
| from azure.core.credentials import TokenCredential | ||
| import grpc | ||
|
|
||
| class DTSDefaultClientInterceptorImpl (DefaultClientInterceptorImpl): | ||
| """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, | ||
| StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an | ||
| interceptor to add additional headers to all calls as needed.""" | ||
|
|
||
| def __init__(self, token_credential: TokenCredential, taskhub_name: str): | ||
| metadata = [("taskhub", taskhub_name)] | ||
RyanLettieri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| super().__init__(metadata) | ||
|
|
||
| if token_credential is not None: | ||
| self._token_credential = token_credential | ||
| self._token_manager = AccessTokenManager(token_credential=self._token_credential) | ||
| token = self._token_manager.get_access_token() | ||
| self._metadata.append(("authorization", token)) | ||
|
|
||
| def _intercept_call( | ||
| self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails: | ||
| """Internal intercept_call implementation which adds metadata to grpc metadata in the RPC | ||
| call details.""" | ||
| # Refresh the auth token if it is present and needed | ||
| if self._metadata is not None: | ||
| for i, (key, _) in enumerate(self._metadata): | ||
| if key.lower() == "authorization": # Ensure case-insensitive comparison | ||
| new_token = self._token_manager.get_access_token() # Get the new token | ||
| self._metadata[i] = ("authorization", new_token) # Update the token | ||
|
|
||
| return super()._intercept_call(client_call_details) | ||
40 changes: 40 additions & 0 deletions
40
durabletask-azuremanaged/durabletask/azuremanaged/internal/access_token_manager.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
| from azure.identity import DefaultAzureCredential, ManagedIdentityCredential | ||
| from datetime import datetime, timedelta, timezone | ||
| from typing import Optional | ||
| import durabletask.internal.shared as shared | ||
| from azure.core.credentials import TokenCredential | ||
|
|
||
| # By default, when there's 10minutes left before the token expires, refresh the token | ||
| class AccessTokenManager: | ||
| def __init__(self, refresh_interval_seconds: int = 600, token_credential: TokenCredential = None): | ||
| self._scope = "https://durabletask.io/.default" | ||
| self._refresh_interval_seconds = refresh_interval_seconds | ||
| self._logger = shared.get_logger("token_manager") | ||
|
|
||
| self._credential = token_credential | ||
|
|
||
| self._token = self._credential.get_token(self._scope) | ||
| self.expiry_time = None | ||
|
|
||
| def get_access_token(self) -> str: | ||
| if self._token is None or self.is_token_expired(): | ||
| self.refresh_token() | ||
| return self._token | ||
|
|
||
| # Checks if the token is expired, or if it will expire in the next "refresh_interval_seconds" seconds. | ||
| # For example, if the token is created to have a lifespan of 2 hours, and the refresh buffer is set to 30 minutes, | ||
| # We will grab a new token when there're 30minutes left on the lifespan of the token | ||
| def is_token_expired(self) -> bool: | ||
| if self.expiry_time is None: | ||
| return True | ||
| return datetime.now(timezone.utc) >= (self.expiry_time - timedelta(seconds=self._refresh_interval_seconds)) | ||
|
|
||
| def refresh_token(self): | ||
| new_token = self._credential.get_token(self._scope) | ||
| self._token = f"Bearer {new_token.token}" | ||
|
|
||
| # Convert UNIX timestamp to timezone-aware datetime | ||
| self.expiry_time = datetime.fromtimestamp(new_token.expires_on, tz=timezone.utc) | ||
| self._logger.debug(f"Token refreshed. Expires at: {self.expiry_time}") |
29 changes: 29 additions & 0 deletions
29
durabletask-azuremanaged/durabletask/azuremanaged/worker.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| from typing import Optional | ||
| from durabletask.worker import TaskHubGrpcWorker | ||
| from durabletask.azuremanaged.internal.access_token_manager import AccessTokenManager | ||
| from durabletask.azuremanaged.durabletask_grpc_interceptor import DTSDefaultClientInterceptorImpl | ||
| from azure.core.credentials import TokenCredential | ||
|
|
||
| # Worker class used for Durable Task Scheduler (DTS) | ||
| class DurableTaskSchedulerWorker(TaskHubGrpcWorker): | ||
| def __init__(self, *, | ||
| host_address: str, | ||
| taskhub: str, | ||
| token_credential: TokenCredential = None, | ||
RyanLettieri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| secure_channel: Optional[bool] = True): | ||
|
|
||
| if taskhub == None: | ||
| raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub") | ||
|
|
||
| interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)] | ||
|
|
||
| # We pass in None for the metadata so we don't construct an additional interceptor in the parent class | ||
| # Since the parent class doesn't use anything metadata for anything else, we can set it as None | ||
| super().__init__( | ||
| host_address=host_address, | ||
| secure_channel=secure_channel, | ||
| metadata=None, | ||
| interceptors=interceptors) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| # For more information on pyproject.toml, see https://peps.python.org/pep-0621/ | ||
|
|
||
| [build-system] | ||
| requires = ["setuptools", "wheel"] | ||
| build-backend = "setuptools.build_meta" | ||
|
|
||
| [project] | ||
| name = "durabletask.azuremanaged" | ||
| version = "0.1b1" | ||
| description = "Extensions for the Durable Task Python SDK for integrating with the Durable Task Scheduler in Azure" | ||
| keywords = [ | ||
| "durable", | ||
| "task", | ||
| "workflow", | ||
| "azure" | ||
| ] | ||
| classifiers = [ | ||
| "Development Status :: 3 - Alpha", | ||
| "Programming Language :: Python :: 3", | ||
| "License :: OSI Approved :: MIT License", | ||
| ] | ||
| requires-python = ">=3.9" | ||
| license = {file = "LICENSE"} | ||
| readme = "README.md" | ||
| dependencies = [ | ||
| "durabletask", | ||
RyanLettieri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "azure-identity" | ||
RyanLettieri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ] | ||
|
|
||
| [project.urls] | ||
| repository = "https://github.com/microsoft/durabletask-python" | ||
| changelog = "https://github.com/microsoft/durabletask-python/blob/main/CHANGELOG.md" | ||
|
|
||
| [tool.setuptools.packages.find] | ||
| include = ["durabletask.azuremanaged", "durabletask.azuremanaged.*"] | ||
|
|
||
| [tool.pytest.ini_options] | ||
| minversion = "6.0" | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.