Skip to content

Commit a31886d

Browse files
authored
Serialization context (#1102)
# Problem We must make it possible for users to create custom data converters, the methods of which have access to relevant context. In other words, when implementing any of the above interfaces, the user must be able to implement methods that have access to a context object. # Solution 1. At every point where the data converter is used, we construct a context-aware data converter by calling `data_converter._with_context(context)`. 2. Users take advantage of this by implementing `with_context` methods on any of the following components: - payload converter - failure converter - payload codec 3. In addition, users may implement `with_context` on an `EncodingPayloadConverter`. These are used by the SDK’s default payload converter. 4. In addition, *we* provide a `with_context` method on `AsyncActivityHandle` that *they* can call their own activity context. 5. Add CommandAwarePayloadVisitor that sets command context when visiting payloads; use this for workflow activation decoding and encoding of the activation completion.
1 parent 4a70f3d commit a31886d

File tree

16 files changed

+3058
-256
lines changed

16 files changed

+3058
-256
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ jobs:
148148
with:
149149
submodules: recursive
150150
- uses: dtolnay/rust-toolchain@stable
151+
with:
152+
components: "clippy"
151153
- uses: Swatinem/rust-cache@v2
152154
with:
153155
workspaces: temporalio/bridge -> target

scripts/gen_payload_visitor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import subprocess
22
import sys
33
from pathlib import Path
4-
from typing import Optional, Tuple
4+
from typing import Optional
55

66
from google.protobuf.descriptor import Descriptor, FieldDescriptor
77

@@ -89,6 +89,7 @@ def generate(self, roots: list[Descriptor]) -> str:
8989
9090
from temporalio.api.common.v1.message_pb2 import Payload
9191
92+
9293
class VisitorFunctions(abc.ABC):
9394
\"\"\"Set of functions which can be called by the visitor.
9495
Allows handling payloads as a sequence.

temporalio/activity.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ class _CompleteAsyncError(BaseException):
470470
def payload_converter() -> temporalio.converter.PayloadConverter:
471471
"""Get the payload converter for the current activity.
472472
473+
The returned converter has :py:class:`temporalio.converter.ActivitySerializationContext` set.
473474
This is often used for dynamic activities to convert payloads.
474475
"""
475476
return _Context.current().payload_converter

temporalio/bridge/worker.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@
77

88
from dataclasses import dataclass
99
from typing import (
10-
TYPE_CHECKING,
1110
Awaitable,
1211
Callable,
1312
List,
14-
Mapping,
1513
MutableSequence,
1614
Optional,
1715
Sequence,
@@ -20,7 +18,6 @@
2018
Union,
2119
)
2220

23-
import google.protobuf.internal.containers
2421
from typing_extensions import TypeAlias
2522

2623
import temporalio.api.common.v1
@@ -35,12 +32,13 @@
3532
import temporalio.bridge.temporal_sdk_bridge
3633
import temporalio.converter
3734
import temporalio.exceptions
38-
from temporalio.api.common.v1.message_pb2 import Payload, Payloads
39-
from temporalio.bridge._visitor import PayloadVisitor, VisitorFunctions
35+
from temporalio.api.common.v1.message_pb2 import Payload
36+
from temporalio.bridge._visitor import VisitorFunctions
4037
from temporalio.bridge.temporal_sdk_bridge import (
4138
CustomSlotSupplier as BridgeCustomSlotSupplier,
4239
)
4340
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError # type: ignore
41+
from temporalio.worker._command_aware_visitor import CommandAwarePayloadVisitor
4442

4543

4644
@dataclass
@@ -299,22 +297,22 @@ async def visit_payloads(self, payloads: MutableSequence[Payload]) -> None:
299297

300298

301299
async def decode_activation(
302-
act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
300+
activation: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
303301
codec: temporalio.converter.PayloadCodec,
304302
decode_headers: bool,
305303
) -> None:
306-
"""Decode the given activation with the codec."""
307-
await PayloadVisitor(
304+
"""Decode all payloads in the activation."""
305+
await CommandAwarePayloadVisitor(
308306
skip_search_attributes=True, skip_headers=not decode_headers
309-
).visit(_Visitor(codec.decode), act)
307+
).visit(_Visitor(codec.decode), activation)
310308

311309

312310
async def encode_completion(
313-
comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
311+
completion: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
314312
codec: temporalio.converter.PayloadCodec,
315313
encode_headers: bool,
316314
) -> None:
317-
"""Recursively encode the given completion with the codec."""
318-
await PayloadVisitor(
315+
"""Encode all payloads in the completion."""
316+
await CommandAwarePayloadVisitor(
319317
skip_search_attributes=True, skip_headers=not encode_headers
320-
).visit(_Visitor(codec.encode), comp)
318+
).visit(_Visitor(codec.encode), completion)

0 commit comments

Comments
 (0)