Skip to content

Commit

Permalink
Add data converter features in Python (#81) (#306)
Browse files Browse the repository at this point in the history
Progress towards closing #81
  • Loading branch information
antlai-temporal authored Jul 10, 2023
1 parent b40fd1c commit 8e8b101
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ node_modules

# Python stuff
__pycache__
pyrightconfig.json

# Build stuff
bin
Expand Down
66 changes: 66 additions & 0 deletions features/data_converter/binary_protobuf/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import dataclasses

from temporalio import workflow
from temporalio.api.common.v1 import DataBlob
from temporalio.client import WorkflowHandle
from temporalio.converter import (
BinaryNullPayloadConverter,
BinaryProtoPayloadConverter,
CompositePayloadConverter,
DataConverter,
)

from harness.python.feature import (
Runner,
get_workflow_argument_payload,
get_workflow_result_payload,
register_feature,
)

# Raw protobuf DataBlob (bytes 0xdeadbeef) round-tripped through the workflow.
EXPECTED_RESULT = DataBlob(data=bytes.fromhex("deadbeef"))

# An echo workflow
@workflow.defn
class Workflow:
    """Echo workflow: returns its DataBlob argument unchanged."""

    @workflow.run
    async def run(self, res: DataBlob) -> DataBlob:
        """Return ``res`` as-is so the result payload mirrors the argument."""
        return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
    """Verify the client result and the binary/protobuf payloads in history."""
    # Client-side result decodes back to the DataBlob `0xdeadbeef`.
    result = await handle.result()
    assert result == EXPECTED_RESULT

    payload = await get_workflow_result_payload(handle)
    metadata = payload.metadata
    # History payload must be tagged as binary protobuf of the right message.
    assert metadata["encoding"].decode("utf-8") == "binary/protobuf"
    assert metadata["messageType"].decode("utf-8") == "temporal.api.common.v1.DataBlob"

    # Parsing the raw history bytes yields the same blob the client saw.
    history_result = DataBlob()
    history_result.ParseFromString(payload.data)
    assert history_result == result

    # Echo workflow: the argument payload is identical to the result payload.
    assert await get_workflow_argument_payload(handle) == payload


class DefaultBinProtoPayloadConverter(CompositePayloadConverter):
    """Composite converter limited to binary/null and binary/protobuf.

    The ByteSlice, ProtoJSON, and JSON converters are deliberately omitted so
    protobuf arguments and results are serialized as binary/protobuf.
    """

    def __init__(self) -> None:
        converters = (
            BinaryNullPayloadConverter(),
            BinaryProtoPayloadConverter(),
        )
        super().__init__(*converters)


# Run the echo workflow with a data converter whose payload converter only
# emits binary/null and binary/protobuf payloads.
converter = dataclasses.replace(
    DataConverter.default,
    payload_converter_class=DefaultBinProtoPayloadConverter,
)
register_feature(
    workflows=[Workflow],
    check_result=check_result,
    start_options={"arg": EXPECTED_RESULT},
    data_converter=converter,
)
89 changes: 89 additions & 0 deletions features/data_converter/codec/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import base64
import dataclasses
import json
from typing import Dict, List, Sequence

from temporalio import workflow
from temporalio.api.common.v1 import Payload
from temporalio.client import WorkflowHandle
from temporalio.converter import DataConverter, PayloadCodec

from harness.python.feature import (
Runner,
get_workflow_argument_payload,
get_workflow_result_payload,
register_feature,
)

# Workflow payload type: a simple JSON-serializable mapping.
Result = Dict[str, bool]

# Value round-tripped through the workflow.
EXPECTED_RESULT: Result = {"spec": True}

# Encoding tag the custom payload codec stamps on wrapped payloads.
CODEC_ENCODING = "my_encoding"

# An echo workflow
@workflow.defn
class Workflow:
    """Echo workflow: returns its Result argument unchanged."""

    @workflow.run
    async def run(self, res: Result) -> Result:
        """Return ``res`` as-is so the result payload mirrors the argument."""
        return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
    """Verify the client result and the codec-wrapped payloads in history."""
    # Client-side result decodes back to `{"spec": true}`.
    result = await handle.result()
    assert result == EXPECTED_RESULT

    payload = await get_workflow_result_payload(handle)
    # The outer history payload must carry the custom codec's encoding tag.
    assert payload.metadata["encoding"].decode("utf-8") == CODEC_ENCODING

    # Unwrap the envelope: base64-decode, then parse the inner Payload proto.
    inner = Payload()
    inner.ParseFromString(base64.b64decode(payload.data))
    assert inner.metadata["encoding"].decode("utf-8") == "json/plain"
    assert json.loads(inner.data) == result

    # Echo workflow: the argument payload is identical to the result payload.
    assert await get_workflow_argument_payload(handle) == payload


# Based on samples-python/encryption/codec.py
class Base64PayloadCodec(PayloadCodec):
    """Payload codec that wraps each payload in a base64-encoded envelope.

    Based on samples-python/encryption/codec.py. ``encode`` serializes each
    payload and re-wraps it in a new Payload tagged with ``CODEC_ENCODING``;
    ``decode`` reverses that, passing through payloads it did not produce.
    """

    async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
        """Serialize and base64-wrap every payload, tagging it CODEC_ENCODING."""
        return [
            Payload(
                # Use the shared constant (was a hardcoded b"my_encoding")
                # so encode and decode cannot silently drift apart.
                metadata={"encoding": CODEC_ENCODING.encode()},
                data=base64.b64encode(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
        """Unwrap payloads produced by :meth:`encode`; pass others through."""
        ret: List[Payload] = []
        for p in payloads:
            if p.metadata.get("encoding", b"").decode() != CODEC_ENCODING:
                # Not produced by this codec; leave untouched.
                ret.append(p)
                continue
            ret.append(Payload.FromString(base64.b64decode(p.data)))
        return ret


# Swap the base64 envelope codec into the otherwise-default data converter.
converter = dataclasses.replace(
    DataConverter.default,
    payload_codec=Base64PayloadCodec(),
)
register_feature(
    workflows=[Workflow],
    check_result=check_result,
    start_options={"arg": EXPECTED_RESULT},
    data_converter=converter,
)
46 changes: 46 additions & 0 deletions features/data_converter/json/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
from typing import Dict

from temporalio import workflow
from temporalio.client import WorkflowHandle

from harness.python.feature import (
Runner,
get_workflow_argument_payload,
get_workflow_result_payload,
register_feature,
)

# Workflow payload type: a simple JSON-serializable mapping.
Result = Dict[str, bool]

# Value round-tripped through the workflow.
EXPECTED_RESULT: Result = {"spec": True}

# An echo workflow
@workflow.defn
class Workflow:
    """Echo workflow: returns its Result argument unchanged."""

    @workflow.run
    async def run(self, res: Result) -> Result:
        """Return ``res`` as-is so the result payload mirrors the argument."""
        return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
    """Verify the result round-trips as a plain-JSON payload in history."""
    # Client-side result decodes back to `{"spec": true}`.
    result = await handle.result()
    assert result == EXPECTED_RESULT

    payload = await get_workflow_result_payload(handle)
    assert payload.metadata["encoding"].decode("utf-8") == "json/plain"
    # The payload recorded in history decodes to the same value.
    assert json.loads(payload.data) == result

    # Echo workflow: the argument payload is identical to the result payload.
    assert await get_workflow_argument_payload(handle) == payload


# Register the echo workflow using the default (plain-JSON) data converter.
register_feature(
    check_result=check_result,
    workflows=[Workflow],
    start_options={"arg": EXPECTED_RESULT},
)
47 changes: 47 additions & 0 deletions features/data_converter/json_protobuf/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from temporalio import workflow
from temporalio.api.common.v1 import DataBlob
from temporalio.client import WorkflowHandle
from temporalio.converter import JSONProtoPayloadConverter

from harness.python.feature import (
Runner,
get_workflow_argument_payload,
get_workflow_result_payload,
register_feature,
)

# Raw protobuf DataBlob (bytes 0xdeadbeef) round-tripped through the workflow.
EXPECTED_RESULT = DataBlob(data=bytes.fromhex("deadbeef"))
# Converter used to decode json/protobuf payloads pulled from history.
JSONP_decoder = JSONProtoPayloadConverter()

# An echo workflow
@workflow.defn
class Workflow:
    """Echo workflow: returns its DataBlob argument unchanged."""

    @workflow.run
    async def run(self, res: DataBlob) -> DataBlob:
        """Return ``res`` as-is so the result payload mirrors the argument."""
        return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
    """Verify the result is recorded as a json/protobuf DataBlob payload."""
    # Client-side result decodes back to the DataBlob `0xdeadbeef`.
    result = await handle.result()
    assert result == EXPECTED_RESULT

    payload = await get_workflow_result_payload(handle)
    metadata = payload.metadata
    # History payload must be tagged as proto-JSON of the right message type.
    assert metadata["encoding"].decode("utf-8") == "json/protobuf"
    assert metadata["messageType"].decode("utf-8") == "temporal.api.common.v1.DataBlob"

    # Decoding the history payload with the proto-JSON converter matches.
    assert JSONP_decoder.from_payload(payload) == result

    # Echo workflow: the argument payload is identical to the result payload.
    assert await get_workflow_argument_payload(handle) == payload


# Register the echo workflow; the default converter serializes protobuf
# arguments/results as json/protobuf payloads.
register_feature(
    check_result=check_result,
    workflows=[Workflow],
    start_options={"arg": EXPECTED_RESULT},
)
20 changes: 20 additions & 0 deletions harness/python/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, Type, Union

from temporalio import workflow
from temporalio.api.common.v1 import Payload
from temporalio.api.enums.v1 import EventType
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
from temporalio.converter import DataConverter
from temporalio.exceptions import ActivityError, ApplicationError
Expand Down Expand Up @@ -60,6 +62,24 @@ def register_feature(
)


async def get_workflow_result_payload(handle: WorkflowHandle) -> Payload:
    """Return the first result payload of the workflow's completion event.

    Scans the workflow's event history for the WorkflowExecutionCompleted
    event and returns the first payload of its result.

    Raises:
        ValueError: if the history contains no completion event (e.g. the
            workflow is still running, failed, or timed out). Previously this
            surfaced as an opaque ``StopAsyncIteration`` from ``anext()``.
    """
    async for event in handle.fetch_history_events():
        if event.event_type == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
            return event.workflow_execution_completed_event_attributes.result.payloads[0]
    raise ValueError("Workflow history has no WorkflowExecutionCompleted event")


async def get_workflow_argument_payload(handle: WorkflowHandle) -> Payload:
    """Return the first input payload of the workflow's started event.

    Scans the workflow's event history for the WorkflowExecutionStarted
    event and returns the first payload of its input.

    Raises:
        ValueError: if the history contains no started event. Previously this
            surfaced as an opaque ``StopAsyncIteration`` from ``anext()``.
    """
    async for event in handle.fetch_history_events():
        if event.event_type == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
            return event.workflow_execution_started_event_attributes.input.payloads[0]
    raise ValueError("Workflow history has no WorkflowExecutionStarted event")


@dataclass
class Feature:
file: str
Expand Down

0 comments on commit 8e8b101

Please sign in to comment.