From 3eb3e6ebccc12c9de9956f9a99a278c61ae94ead Mon Sep 17 00:00:00 2001
From: Antonio Lain
Date: Fri, 7 Jul 2023 19:07:50 -0700
Subject: [PATCH] Add data converter features in Python (#81)

---
 .gitignore                                    |  1 +
 .../data_converter/binary_protobuf/feature.py | 66 ++++++++++++++
 features/data_converter/codec/feature.py      | 89 +++++++++++++++++++
 features/data_converter/json/feature.py       | 46 ++++++++++
 .../data_converter/json_protobuf/feature.py   | 47 ++++++++++
 harness/python/feature.py                     | 20 +++++
 6 files changed, 269 insertions(+)
 create mode 100644 features/data_converter/binary_protobuf/feature.py
 create mode 100644 features/data_converter/codec/feature.py
 create mode 100644 features/data_converter/json/feature.py
 create mode 100644 features/data_converter/json_protobuf/feature.py

diff --git a/.gitignore b/.gitignore
index 520e9cbe..98106050 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,7 @@ node_modules
 
 # Python stuff
 __pycache__
+pyrightconfig.json
 
 # Build stuff
 bin

diff --git a/features/data_converter/binary_protobuf/feature.py b/features/data_converter/binary_protobuf/feature.py
new file mode 100644
index 00000000..c6397b77
--- /dev/null
+++ b/features/data_converter/binary_protobuf/feature.py
@@ -0,0 +1,66 @@
import dataclasses

from temporalio import workflow
from temporalio.api.common.v1 import DataBlob
from temporalio.client import WorkflowHandle
from temporalio.converter import (
    BinaryNullPayloadConverter,
    BinaryProtoPayloadConverter,
    CompositePayloadConverter,
    DataConverter,
)

from harness.python.feature import (
    Runner,
    get_workflow_argument_payload,
    get_workflow_result_payload,
    register_feature,
)

EXPECTED_RESULT = DataBlob(data=bytes.fromhex("deadbeef"))

# An echo workflow
@workflow.defn
class Workflow:
    @workflow.run
    async def run(self, res: DataBlob) -> DataBlob:
        return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
    # Verify the client-side result is DataBlob `0xdeadbeef`
    result = await handle.result()
    assert result == EXPECTED_RESULT
    payload = await get_workflow_result_payload(handle)

    encoding = payload.metadata["encoding"].decode("utf-8")
    assert encoding == "binary/protobuf"

    message_type = payload.metadata["messageType"].decode("utf-8")
    assert message_type == "temporal.api.common.v1.DataBlob"

    result_in_history = DataBlob()
    result_in_history.ParseFromString(payload.data)
    assert result == result_in_history

    payload_arg = await get_workflow_argument_payload(handle)
    assert payload == payload_arg


class DefaultBinProtoPayloadConverter(CompositePayloadConverter):
    def __init__(self) -> None:
        super().__init__(
            # Disable the binary-plain (bytes), JSON protobuf, and JSON converters
            BinaryNullPayloadConverter(),
            BinaryProtoPayloadConverter(),
        )


register_feature(
    workflows=[Workflow],
    check_result=check_result,
    start_options={"arg": EXPECTED_RESULT},
    data_converter=dataclasses.replace(
        DataConverter.default, payload_converter_class=DefaultBinProtoPayloadConverter
    ),
)
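
For reference, a minimal client-side sketch of how the converter above could be wired up outside the harness. This is illustrative only, not part of the patch: the server address, workflow ID, and task queue are placeholders, and a worker registered with the same converter would need to be running for the call to complete.

import asyncio
import dataclasses

from temporalio.api.common.v1 import DataBlob
from temporalio.client import Client
from temporalio.converter import DataConverter

async def main() -> None:
    # Connect with the same binary-protobuf-only converter the feature registers
    client = await Client.connect(
        "localhost:7233",
        data_converter=dataclasses.replace(
            DataConverter.default,
            payload_converter_class=DefaultBinProtoPayloadConverter,
        ),
    )
    handle = await client.start_workflow(
        Workflow.run,
        DataBlob(data=bytes.fromhex("deadbeef")),
        id="bin-proto-example",  # placeholder workflow ID
        task_queue="features",  # placeholder task queue
    )
    print(await handle.result())

asyncio.run(main())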

diff --git a/features/data_converter/codec/feature.py b/features/data_converter/codec/feature.py
new file mode 100644
index 00000000..0e8d4678
--- /dev/null
+++ b/features/data_converter/codec/feature.py
@@ -0,0 +1,89 @@
import base64
import dataclasses
import json
from typing import Dict, List, Sequence

from temporalio import workflow
from temporalio.api.common.v1 import Payload
from temporalio.client import WorkflowHandle
from temporalio.converter import DataConverter, PayloadCodec

from harness.python.feature import (
    Runner,
    get_workflow_argument_payload,
    get_workflow_result_payload,
    register_feature,
)

Result = Dict[str, bool]

EXPECTED_RESULT: Result = {"spec": True}

CODEC_ENCODING = "my_encoding"

# An echo workflow
@workflow.defn
class Workflow:
    @workflow.run
    async def run(self, res: Result) -> Result:
        return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
    # Verify the client-side result is `{"spec": true}`
    result = await handle.result()
    assert result == EXPECTED_RESULT
    payload = await get_workflow_result_payload(handle)

    encoding = payload.metadata["encoding"].decode("utf-8")
    assert encoding == CODEC_ENCODING

    extracted_data = base64.b64decode(payload.data)
    inner_payload = Payload()
    inner_payload.ParseFromString(extracted_data)

    encoding = inner_payload.metadata["encoding"].decode("utf-8")
    assert encoding == "json/plain"

    result_in_history = json.loads(inner_payload.data)
    assert result == result_in_history

    payload_arg = await get_workflow_argument_payload(handle)
    assert payload == payload_arg


# Based on samples-python/encryption/codec.py
class Base64PayloadCodec(PayloadCodec):
    def __init__(self) -> None:
        super().__init__()

    async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
        return [
            Payload(
                metadata={
                    "encoding": CODEC_ENCODING.encode(),
                },
                data=base64.b64encode(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
        ret: List[Payload] = []
        for p in payloads:
            if p.metadata.get("encoding", b"").decode() != CODEC_ENCODING:
                ret.append(p)
                continue
            ret.append(Payload.FromString(base64.b64decode(p.data)))
        return ret


register_feature(
    workflows=[Workflow],
    check_result=check_result,
    start_options={"arg": EXPECTED_RESULT},
    data_converter=dataclasses.replace(
        DataConverter.default,
        payload_codec=Base64PayloadCodec(),
    ),
)
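
As a sanity check, the codec above round-trips payloads locally. A standalone sketch (not part of the patch); it assumes only the `Base64PayloadCodec` defined above:

import asyncio

from temporalio.converter import DataConverter

async def demo() -> None:
    codec = Base64PayloadCodec()
    # Convert a value to payloads with the default converter...
    payloads = await DataConverter.default.encode([{"spec": True}])
    # ...wrap them in the base64 envelope, then unwrap and compare
    wrapped = await codec.encode(payloads)
    assert wrapped[0].metadata["encoding"] == b"my_encoding"
    unwrapped = await codec.decode(wrapped)
    assert unwrapped == payloads

asyncio.run(demo())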

diff --git a/features/data_converter/json/feature.py b/features/data_converter/json/feature.py
new file mode 100644
index 00000000..002cbd33
--- /dev/null
+++ b/features/data_converter/json/feature.py
@@ -0,0 +1,46 @@
import json
from typing import Dict

from temporalio import workflow
from temporalio.client import WorkflowHandle

from harness.python.feature import (
    Runner,
    get_workflow_argument_payload,
    get_workflow_result_payload,
    register_feature,
)

Result = Dict[str, bool]

EXPECTED_RESULT: Result = {"spec": True}

# An echo workflow
@workflow.defn
class Workflow:
    @workflow.run
    async def run(self, res: Result) -> Result:
        return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
    # Verify the client-side result is `{"spec": true}`
    result = await handle.result()
    assert result == EXPECTED_RESULT
    payload = await get_workflow_result_payload(handle)

    encoding = payload.metadata["encoding"].decode("utf-8")
    assert encoding == "json/plain"

    result_in_history = json.loads(payload.data)
    assert result == result_in_history

    payload_arg = await get_workflow_argument_payload(handle)
    assert payload == payload_arg


register_feature(
    workflows=[Workflow],
    check_result=check_result,
    start_options={"arg": EXPECTED_RESULT},
)

diff --git a/features/data_converter/json_protobuf/feature.py b/features/data_converter/json_protobuf/feature.py
new file mode 100644
index 00000000..cbfb060a
--- /dev/null
+++ b/features/data_converter/json_protobuf/feature.py
@@ -0,0 +1,47 @@
from temporalio import workflow
from temporalio.api.common.v1 import DataBlob
from temporalio.client import WorkflowHandle
from temporalio.converter import JSONProtoPayloadConverter

from harness.python.feature import (
    Runner,
    get_workflow_argument_payload,
    get_workflow_result_payload,
    register_feature,
)

EXPECTED_RESULT = DataBlob(data=bytes.fromhex("deadbeef"))
json_proto_converter = JSONProtoPayloadConverter()

# An echo workflow
@workflow.defn
class Workflow:
    @workflow.run
    async def run(self, res: DataBlob) -> DataBlob:
        return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
    # Verify the client-side result is DataBlob `0xdeadbeef`
    result = await handle.result()
    assert result == EXPECTED_RESULT
    payload = await get_workflow_result_payload(handle)

    encoding = payload.metadata["encoding"].decode("utf-8")
    assert encoding == "json/protobuf"

    message_type = payload.metadata["messageType"].decode("utf-8")
    assert message_type == "temporal.api.common.v1.DataBlob"

    result_in_history = json_proto_converter.from_payload(payload)
    assert result == result_in_history

    payload_arg = await get_workflow_argument_payload(handle)
    assert payload == payload_arg


register_feature(
    workflows=[Workflow],
    check_result=check_result,
    start_options={"arg": EXPECTED_RESULT},
)

diff --git a/harness/python/feature.py b/harness/python/feature.py
index 8ff1008a..33f913cc 100644
--- a/harness/python/feature.py
+++ b/harness/python/feature.py
@@ -10,6 +10,8 @@
 from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, Type, Union
 
 from temporalio import workflow
+from temporalio.api.common.v1 import Payload
+from temporalio.api.enums.v1 import EventType
 from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
 from temporalio.converter import DataConverter
 from temporalio.exceptions import ActivityError, ApplicationError
@@ -60,6 +62,24 @@ def register_feature(
     )
 
 
+async def get_workflow_result_payload(handle: WorkflowHandle) -> Payload:
+    event = await anext(
+        e
+        async for e in handle.fetch_history_events()
+        if e.event_type == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
+    )
+    return event.workflow_execution_completed_event_attributes.result.payloads[0]
+
+
+async def get_workflow_argument_payload(handle: WorkflowHandle) -> Payload:
+    event = await anext(
+        e
+        async for e in handle.fetch_history_events()
+        if e.event_type == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+    )
+    return event.workflow_execution_started_event_attributes.input.payloads[0]
+
+
 @dataclass
 class Feature:
     file: str
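
The two new history helpers also work on their own against any completed workflow. A hedged sketch (the server address and workflow ID below are placeholders); note that the built-in anext() used above requires Python 3.10+:

import asyncio

from temporalio.client import Client

async def inspect() -> None:
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle("some-completed-workflow-id")
    result_payload = await get_workflow_result_payload(handle)
    arg_payload = await get_workflow_argument_payload(handle)
    # For the echo workflows in this patch, argument and result payloads match
    print(result_payload.metadata["encoding"].decode("utf-8"))
    print(arg_payload == result_payload)

asyncio.run(inspect())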