Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Health check #670

Merged
merged 31 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
11e372c
Fixes http endpoint being overwritten by gRPC address argument in con…
elena-kolevska Oct 31, 2023
a1275e2
health decorator - first commit
elena-kolevska Feb 7, 2024
7ff3921
Fixes tests
elena-kolevska Feb 7, 2024
15a0b1a
Removes unused imports
elena-kolevska Feb 7, 2024
d51f97b
Ruff format
elena-kolevska Feb 7, 2024
3f6940b
Adds unit test
elena-kolevska Feb 8, 2024
2629231
Repalces wait() with @healthcheck decorator in examples
elena-kolevska Feb 8, 2024
8bcec71
Ruff
elena-kolevska Feb 8, 2024
3a208a2
Linter
elena-kolevska Feb 8, 2024
5bd279c
updates heathcheck decorator to use a global var
elena-kolevska Feb 9, 2024
b12beba
Fixes tests
elena-kolevska Feb 9, 2024
b8eb9de
Merge remote-tracking branch 'upstream/main' into health-decorator
elena-kolevska Feb 9, 2024
d39652d
Linter fixes
elena-kolevska Feb 9, 2024
d9a6e52
Removes healthcheck from examples
elena-kolevska Feb 9, 2024
b21df04
Ruff
elena-kolevska Feb 9, 2024
495f78b
wip
elena-kolevska Feb 10, 2024
afdda30
wip
elena-kolevska Feb 11, 2024
71686af
Merge remote-tracking branch 'upstream/main' into health-decorator
elena-kolevska Feb 11, 2024
9624795
wip
elena-kolevska Feb 11, 2024
476e856
wip
elena-kolevska Feb 11, 2024
70f948f
Set health timeout to 60 seconds
elena-kolevska Feb 12, 2024
47b9431
wip
elena-kolevska Feb 12, 2024
169db4b
Unit tests passing
elena-kolevska Feb 12, 2024
ae56663
Linter
elena-kolevska Feb 12, 2024
60001ea
Refactor and cleanup
elena-kolevska Feb 12, 2024
3ac1106
Refactors client tests for speed and readability
elena-kolevska Feb 13, 2024
2aa2f32
Small fix
elena-kolevska Feb 13, 2024
7517cf7
Add tests performance improvement for actor tests too
elena-kolevska Feb 13, 2024
94b23a0
Cosmetic touch up
elena-kolevska Feb 13, 2024
2a2f3bd
Documents the `DAPR_HEALTH_TIMEOUT` environment variable
elena-kolevska Feb 13, 2024
1e7f332
make healthcheck a static method
elena-kolevska Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions dapr/actor/client/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
communication.
"""

_default_proxy_factory = ActorProxyFactory()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to the instance creation because it was getting invoked on import(dependency load), which was triggering the healthcheck before the fake test servers would start.

_default_proxy_factory = None

def __init__(
self,
Expand Down Expand Up @@ -127,6 +127,13 @@
"""Returns actor type."""
return self._actor_type

@classmethod
def _get_default_factory_instance(cls):
"""Lazily initializes and returns the default ActorProxyFactory instance."""
if cls._default_proxy_factory is None:
cls._default_proxy_factory = ActorProxyFactory()
return cls._default_proxy_factory

Check warning on line 135 in dapr/actor/client/proxy.py

View check run for this annotation

Codecov / codecov/patch

dapr/actor/client/proxy.py#L133-L135

Added lines #L133 - L135 were not covered by tests

@classmethod
def create(
cls,
Expand All @@ -146,8 +153,11 @@

Returns:
:class:`ActorProxy': new Actor Proxy client.
@param actor_proxy_factory:
"""
factory = cls._default_proxy_factory if not actor_proxy_factory else actor_proxy_factory
factory = (
actor_proxy_factory if actor_proxy_factory else cls._get_default_factory_instance()
)
return factory.create(actor_type, actor_id, actor_interface)

async def invoke_method(self, method: str, raw_body: Optional[bytes] = None) -> bytes:
Expand Down
3 changes: 3 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
Expand Down Expand Up @@ -129,6 +130,8 @@
max_grpc_messsage_length (int, optional): The maximum grpc send and receive
message length in bytes.
"""
DaprHealth.wait_until_ready()

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
options = [
Expand Down Expand Up @@ -713,8 +716,8 @@
call = self._stub.SaveState(req, metadata=metadata)
await call
return DaprResponse(headers=await call.initial_metadata())
except AioRpcError as e:
raise DaprInternalError(e.details()) from e

Check warning on line 720 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L719-L720

Added lines #L719 - L720 were not covered by tests

async def save_bulk_state(
self, store_name: str, states: List[StateItem], metadata: Optional[MetadataTuple] = None
Expand Down Expand Up @@ -1562,8 +1565,8 @@
try:
call = self._stub.GetMetadata(GrpcEmpty())
_resp = await call
except AioRpcError as err:
raise DaprGrpcError(err) from err

Check warning on line 1569 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L1568-L1569

Added lines #L1568 - L1569 were not covered by tests

response: api_v1.GetMetadataResponse = _resp # type alias
# Convert to more pythonic formats
Expand Down
2 changes: 2 additions & 0 deletions dapr/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dapr.conf import settings
from google.protobuf.message import Message as GrpcMessage


__all__ = [
'DaprClient',
'DaprActorClientBase',
Expand All @@ -32,6 +33,7 @@
'ERROR_CODE_UNKNOWN',
]


from grpc import ( # type: ignore
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
Expand Down
3 changes: 3 additions & 0 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
Expand Down Expand Up @@ -127,6 +128,8 @@ def __init__(
max_grpc_messsage_length (int, optional): The maximum grpc send and receive
message length in bytes.
"""
DaprHealth.wait_until_ready()

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
options = [
Expand Down
54 changes: 54 additions & 0 deletions dapr/clients/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-

"""
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import urllib.request
import urllib.error
import time

from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT
from dapr.clients.http.helpers import get_api_url
from dapr.conf import settings


class DaprHealth:
@staticmethod
def wait_until_ready():
health_url = f'{get_api_url()}/healthz/outbound'
headers = {USER_AGENT_HEADER: DAPR_USER_AGENT}
if settings.DAPR_API_TOKEN is not None:
headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN
timeout = settings.DAPR_HEALTH_TIMEOUT

start = time.time()
while True:
try:
req = urllib.request.Request(health_url, headers=headers)
with urllib.request.urlopen(req, context=DaprHealth.get_ssl_context()) as response:
if 200 <= response.status < 300:
break
except urllib.error.URLError as e:
print(f'Health check on {health_url} failed: {e.reason}')
except Exception as e:
print(f'Unexpected error during health check: {e}')

Check warning on line 43 in dapr/clients/health.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/health.py#L40-L43

Added lines #L40 - L43 were not covered by tests

remaining = (start + timeout) - time.time()
if remaining <= 0:
raise TimeoutError(f'Dapr health check timed out, after {timeout}.')
time.sleep(min(1, remaining))

Check warning on line 48 in dapr/clients/health.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/health.py#L45-L48

Added lines #L45 - L48 were not covered by tests

@staticmethod
def get_ssl_context():
# This method is used (overwritten) from tests
# to return context for self-signed certificates
return None

Check warning on line 54 in dapr/clients/health.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/health.py#L54

Added line #L54 was not covered by tests
24 changes: 10 additions & 14 deletions dapr/clients/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@

from typing import Callable, Mapping, Dict, Optional, Union, Tuple, TYPE_CHECKING

from dapr.clients.http.conf import (
DAPR_API_TOKEN_HEADER,
USER_AGENT_HEADER,
DAPR_USER_AGENT,
CONTENT_TYPE_HEADER,
)
from dapr.clients.health import DaprHealth

if TYPE_CHECKING:
from dapr.serializers import Serializer

from dapr.conf import settings
from dapr.clients.base import DEFAULT_JSON_CONTENT_TYPE
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_DOES_NOT_EXIST, ERROR_CODE_UNKNOWN
from dapr.version import __version__

CONTENT_TYPE_HEADER = 'content-type'
DAPR_API_TOKEN_HEADER = 'dapr-api-token'
USER_AGENT_HEADER = 'User-Agent'
DAPR_USER_AGENT = f'dapr-sdk-python/{__version__}'


class DaprHttpClient:
Expand All @@ -47,18 +49,12 @@ def __init__(
timeout (int, optional): Timeout in seconds, defaults to 60.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
"""
DaprHealth.wait_until_ready()

self._timeout = aiohttp.ClientTimeout(total=timeout)
self._serializer = message_serializer
self._headers_callback = headers_callback

def get_api_url(self) -> str:
if settings.DAPR_HTTP_ENDPOINT:
return '{}/{}'.format(settings.DAPR_HTTP_ENDPOINT, settings.DAPR_API_VERSION)

return 'http://{}:{}/{}'.format(
settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION
)

async def send_bytes(
self,
method: str,
Expand Down
21 changes: 21 additions & 0 deletions dapr/clients/http/conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.version import __version__

CONTENT_TYPE_HEADER = 'content-type'
DAPR_API_TOKEN_HEADER = 'dapr-api-token'
USER_AGENT_HEADER = 'User-Agent'
DAPR_USER_AGENT = f'dapr-sdk-python/{__version__}'
4 changes: 3 additions & 1 deletion dapr/clients/http/dapr_actor_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from typing import Callable, Dict, Optional, Union, TYPE_CHECKING

from dapr.clients.http.helpers import get_api_url

if TYPE_CHECKING:
from dapr.serializers import Serializer

Expand Down Expand Up @@ -145,4 +147,4 @@ async def unregister_timer(self, actor_type: str, actor_id: str, name: str) -> N
await self._client.send_bytes(method='DELETE', url=url, data=None)

def _get_base_url(self, actor_type: str, actor_id: str) -> str:
return '{}/actors/{}/{}'.format(self._client.get_api_url(), actor_type, actor_id)
return '{}/actors/{}/{}'.format(get_api_url(), actor_type, actor_id)
6 changes: 4 additions & 2 deletions dapr/clients/http/dapr_invocation_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from typing import Callable, Dict, Optional, Union
from multidict import MultiDict

from dapr.clients.http.client import DaprHttpClient, CONTENT_TYPE_HEADER
from dapr.clients.http.client import DaprHttpClient
from dapr.clients.grpc._helpers import MetadataTuple, GrpcMessage
from dapr.clients.grpc._response import InvokeMethodResponse
from dapr.clients.http.conf import CONTENT_TYPE_HEADER
from dapr.clients.http.helpers import get_api_url
from dapr.serializers import DefaultJSONSerializer
from dapr.version import __version__

Expand Down Expand Up @@ -88,7 +90,7 @@ async def invoke_method_async(

headers[USER_AGENT_HEADER] = DAPR_USER_AGENT

url = f'{self._client.get_api_url()}/invoke/{app_id}/method/{method_name}'
url = f'{get_api_url()}/invoke/{app_id}/method/{method_name}'

if isinstance(data, GrpcMessage):
body = data.SerializeToString()
Expand Down
25 changes: 25 additions & 0 deletions dapr/clients/http/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.conf import settings


def get_api_url() -> str:
if settings.DAPR_HTTP_ENDPOINT:
return '{}/{}'.format(settings.DAPR_HTTP_ENDPOINT, settings.DAPR_API_VERSION)

return 'http://{}:{}/{}'.format(
settings.DAPR_RUNTIME_HOST, settings.DAPR_HTTP_PORT, settings.DAPR_API_VERSION
)
1 change: 1 addition & 0 deletions dapr/conf/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
DAPR_HTTP_PORT = 3500
DAPR_GRPC_PORT = 50001
DAPR_API_VERSION = 'v1.0'
DAPR_HEALTH_TIMEOUT = 60 # seconds

DAPR_API_METHOD_INVOCATION_PROTOCOL = 'http'

Expand Down
8 changes: 8 additions & 0 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ If your Dapr instance is configured to require the `DAPR_API_TOKEN` environment
set it in the environment and the client will use it automatically.
You can read more about Dapr API token authentication [here](https://docs.dapr.io/operations/security/api-token/).

##### Health timeout
On client initialisation, a health check is performed against the Dapr sidecar (`/healthz/outboud`).
The client will wait for the sidecar to be up and running before proceeding.

The default timeout is 60 seconds, but it can be overridden by setting the `DAPR_HEALTH_TIMEOUT`
environment variable.


## Error handling
Initially, errors in Dapr followed the [Standard gRPC error model](https://grpc.io/docs/guides/error/#standard-error-model). However, to provide more detailed and informative error messages, in version 1.13 an enhanced error model has been introduced which aligns with the gRPC [Richer error model](https://grpc.io/docs/guides/error/#richer-error-model). In response, the Python SDK implemented `DaprGrpcError`, a custom exception class designed to improve the developer experience.
It's important to note that the transition to using `DaprGrpcError` for all gRPC status exceptions is a work in progress. As of now, not every API call in the SDK has been updated to leverage this custom exception. We are actively working on this enhancement and welcome contributions from the community.
Expand Down
23 changes: 12 additions & 11 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@

These examples demonstrate how to use the Dapr Python SDK:

| Example | Description |
|---------|-------------|
| [Service invocation](./invoke-simple) | Invoke service by passing bytes data
| Example | Description |
|-------------------------------------------------------|-------------|
| [Service invocation](./invoke-simple) | Invoke service by passing bytes data
| [Service invocation (advanced)](./invoke-custom-data) | Invoke service by using custom protobuf message
| [State management](./state_store) | Save and get state to/from the state store
| [Publish & subscribe](./pubsub-simple) | Publish and subscribe to events
| [Bindings](./invoke-binding) | Invoke an output binding to interact with external resources
| [Virtual actors](./demo_actor) | Try Dapr virtual actor features
| [Secrets](./secret_store) | Get secrets from a defined secret store
| [Distributed tracing](./w3c-tracing) | Leverage Dapr's built-in tracing support
| [Distributed lock](./distributed_lock) | Keep your application safe from race conditions by using distributed locks
| [Workflow](./demo_workflow) | Run a workflow to simulate an order processor
| [State management](./state_store) | Save and get state to/from the state store
| [Publish & subscribe](./pubsub-simple) | Publish and subscribe to events
| [Error handling](./error_handling) | Error handling
| [Bindings](./invoke-binding) | Invoke an output binding to interact with external resources
| [Virtual actors](./demo_actor) | Try Dapr virtual actor features
| [Secrets](./secret_store) | Get secrets from a defined secret store
| [Distributed tracing](./w3c-tracing) | Leverage Dapr's built-in tracing support
| [Distributed lock](./distributed_lock) | Keep your application safe from race conditions by using distributed locks
| [Workflow](./demo_workflow) | Run a workflow to simulate an order processor

## More information

Expand Down
3 changes: 0 additions & 3 deletions examples/configuration/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ async def executeConfiguration():

keys = ['orderId1', 'orderId2']

# Wait for sidecar to be up within 20 seconds.
d.wait(20)

global configuration

# Get one configuration by key.
Expand Down
4 changes: 1 addition & 3 deletions examples/error_handling/error_handling.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from dapr.clients import DaprClient
from dapr.clients.exceptions import DaprGrpcError


with DaprClient() as d:
storeName = 'statestore'

key = 'key||'
value = 'value_1'

# Wait for sidecar to be up within 5 seconds.
d.wait(5)

# Save single state.
try:
d.save_state(store_name=storeName, key=key, value=value)
Expand Down
7 changes: 2 additions & 5 deletions examples/state_store/state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
from dapr.clients.grpc._state import StateItem


with DaprClient() as d:
storeName = 'statestore'

Expand All @@ -22,9 +23,6 @@
yet_another_key = 'key_3'
yet_another_value = 'value_3'

# Wait for sidecar to be up within 5 seconds.
d.wait(5)

# Save single state.
d.save_state(store_name=storeName, key=key, value=value)
print(f'State store has successfully saved {value} with {key} as key')
Expand Down Expand Up @@ -63,8 +61,7 @@
# StatusCode should be StatusCode.ABORTED.
print(f'Cannot save bulk due to bad etags. ErrorCode={err.code()}')

# For detailed error messages from the dapr runtime:
# print(f"Details={err.details()})
# For detailed error messages from the dapr runtime: # print(f"Details={err.details()})

# Get one state by key.
state = d.get_state(store_name=storeName, key=key, state_metadata={'metakey': 'metavalue'})
Expand Down
Loading
Loading