Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data converter features in Python (#81) #306

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ node_modules

# Python stuff
__pycache__
pyrightconfig.json

# Build stuff
bin
Expand Down
66 changes: 66 additions & 0 deletions features/data_converter/binary_protobuf/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import dataclasses

from temporalio import workflow
from temporalio.api.common.v1 import DataBlob
from temporalio.client import WorkflowHandle
from temporalio.converter import (
BinaryNullPayloadConverter,
BinaryProtoPayloadConverter,
CompositePayloadConverter,
DataConverter,
)

from harness.python.feature import (
Runner,
get_workflow_argument_payload,
get_workflow_result_payload,
register_feature,
)

EXPECTED_RESULT = DataBlob(data=bytes.fromhex("deadbeef"))

# An echo workflow
@workflow.defn
class Workflow:
@workflow.run
async def run(self, res: DataBlob) -> DataBlob:
return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
# verify client result is DataBlob `0xdeadbeef`
result = await handle.result()
assert result == EXPECTED_RESULT
payload = await get_workflow_result_payload(handle)

encoding = payload.metadata["encoding"].decode("utf-8")
assert encoding == "binary/protobuf"

message_type = payload.metadata["messageType"].decode("utf-8")
assert message_type == "temporal.api.common.v1.DataBlob"

result_in_history = DataBlob()
result_in_history.ParseFromString(payload.data)
assert result == result_in_history

payload_arg = await get_workflow_argument_payload(handle)
assert payload == payload_arg


class DefaultBinProtoPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
super().__init__(
# Disable ByteSlice, ProtoJSON, and JSON converters
BinaryNullPayloadConverter(),
BinaryProtoPayloadConverter(),
)


register_feature(
workflows=[Workflow],
check_result=check_result,
start_options={"arg": EXPECTED_RESULT},
data_converter=dataclasses.replace(
DataConverter.default, payload_converter_class=DefaultBinProtoPayloadConverter
),
)
89 changes: 89 additions & 0 deletions features/data_converter/codec/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import base64
import dataclasses
import json
from typing import Dict, List, Sequence

from temporalio import workflow
from temporalio.api.common.v1 import Payload
from temporalio.client import WorkflowHandle
from temporalio.converter import DataConverter, PayloadCodec

from harness.python.feature import (
Runner,
get_workflow_argument_payload,
get_workflow_result_payload,
register_feature,
)

Result = Dict[str, bool]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably don't need this alias


EXPECTED_RESULT: Result = {"spec": True}

CODEC_ENCODING = "my_encoding"

# An echo workflow
@workflow.defn
class Workflow:
@workflow.run
async def run(self, res: Result) -> Result:
return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
# verify client result is `{"spec": true}`
result = await handle.result()
assert result == EXPECTED_RESULT
payload = await get_workflow_result_payload(handle)

encoding = payload.metadata["encoding"].decode("utf-8")
assert encoding == CODEC_ENCODING

extractedData = base64.b64decode(payload.data)
innerPayload = Payload()
innerPayload.ParseFromString(extractedData)

encoding = innerPayload.metadata["encoding"].decode("utf-8")
assert encoding == "json/plain"

result_in_history = json.loads(innerPayload.data)
assert result == result_in_history

payload_arg = await get_workflow_argument_payload(handle)
assert payload == payload_arg


# Based on samples-python/encryption/codec.py
class Base64PayloadCodec(PayloadCodec):
def __init__(self) -> None:
super().__init__()

async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
return [
Payload(
metadata={
"encoding": b"my_encoding",
},
data=base64.b64encode(p.SerializeToString()),
)
for p in payloads
]

async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
ret: List[Payload] = []
for p in payloads:
if p.metadata.get("encoding", b"").decode() != CODEC_ENCODING:
ret.append(p)
continue
ret.append(Payload.FromString(base64.b64decode(p.data)))
return ret


register_feature(
workflows=[Workflow],
check_result=check_result,
start_options={"arg": EXPECTED_RESULT},
data_converter=dataclasses.replace(
DataConverter.default,
payload_codec=Base64PayloadCodec(),
),
)
46 changes: 46 additions & 0 deletions features/data_converter/json/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
from typing import Dict

from temporalio import workflow
from temporalio.client import WorkflowHandle

from harness.python.feature import (
Runner,
get_workflow_argument_payload,
get_workflow_result_payload,
register_feature,
)

Result = Dict[str, bool]

EXPECTED_RESULT: Result = {"spec": True}

# An echo workflow
@workflow.defn
class Workflow:
@workflow.run
async def run(self, res: Result) -> Result:
return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
# verify client result is `{"spec": true}`
result = await handle.result()
assert result == EXPECTED_RESULT
payload = await get_workflow_result_payload(handle)

encoding = payload.metadata["encoding"].decode("utf-8")
assert encoding == "json/plain"

result_in_history = json.loads(payload.data)
assert result == result_in_history

payload_arg = await get_workflow_argument_payload(handle)
assert payload == payload_arg


register_feature(
workflows=[Workflow],
check_result=check_result,
start_options={"arg": EXPECTED_RESULT},
)
47 changes: 47 additions & 0 deletions features/data_converter/json_protobuf/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from temporalio import workflow
from temporalio.api.common.v1 import DataBlob
from temporalio.client import WorkflowHandle
from temporalio.converter import JSONProtoPayloadConverter

from harness.python.feature import (
Runner,
get_workflow_argument_payload,
get_workflow_result_payload,
register_feature,
)

EXPECTED_RESULT = DataBlob(data=bytes.fromhex("deadbeef"))
JSONP_decoder = JSONProtoPayloadConverter()

# An echo workflow
@workflow.defn
class Workflow:
@workflow.run
async def run(self, res: DataBlob) -> DataBlob:
return res


async def check_result(_: Runner, handle: WorkflowHandle) -> None:
# verify client result is DataBlob `0xdeadbeef`
result = await handle.result()
assert result == EXPECTED_RESULT
payload = await get_workflow_result_payload(handle)

encoding = payload.metadata["encoding"].decode("utf-8")
assert encoding == "json/protobuf"

message_type = payload.metadata["messageType"].decode("utf-8")
assert message_type == "temporal.api.common.v1.DataBlob"

result_in_history = JSONP_decoder.from_payload(payload)
assert result == result_in_history

payload_arg = await get_workflow_argument_payload(handle)
assert payload == payload_arg


register_feature(
workflows=[Workflow],
check_result=check_result,
start_options={"arg": EXPECTED_RESULT},
)
20 changes: 20 additions & 0 deletions harness/python/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, Type, Union

from temporalio import workflow
from temporalio.api.common.v1 import Payload
from temporalio.api.enums.v1 import EventType
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
from temporalio.converter import DataConverter
from temporalio.exceptions import ActivityError, ApplicationError
Expand Down Expand Up @@ -60,6 +62,24 @@ def register_feature(
)


async def get_workflow_result_payload(handle: WorkflowHandle) -> Payload:
event = await anext(
e
async for e in handle.fetch_history_events()
if e.event_type == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
)
return event.workflow_execution_completed_event_attributes.result.payloads[0]


async def get_workflow_argument_payload(handle: WorkflowHandle) -> Payload:
event = await anext(
e
async for e in handle.fetch_history_events()
if e.event_type == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
)
return event.workflow_execution_started_event_attributes.input.payloads[0]


@dataclass
class Feature:
file: str
Expand Down