diff --git a/common/nexus/payload_serializer.go b/common/nexus/payload_serializer.go new file mode 100644 index 00000000000..c41f2d469ae --- /dev/null +++ b/common/nexus/payload_serializer.go @@ -0,0 +1,185 @@ +// The MIT License +// +// Copyright (c) 2023 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package nexus + +import ( + "errors" + "fmt" + "maps" + "mime" + + "github.com/nexus-rpc/sdk-go/nexus" + commonpb "go.temporal.io/api/common/v1" +) + +type payloadSerializer struct{} + +var errSerializer = errors.New("serializer error") + +// Deserialize implements nexus.Serializer. +func (payloadSerializer) Deserialize(content *nexus.Content, v any) error { + payloadRef, ok := v.(**commonpb.Payload) + if !ok { + return fmt.Errorf("%w: cannot deserialize into %v", errSerializer, v) + } + + payload := &commonpb.Payload{} + *payloadRef = payload + payload.Metadata = make(map[string][]byte) + payload.Data = content.Data + + h := maps.Clone(content.Header) + // We assume that encoding is handled by the transport layer and the content is decoded. + delete(h, "encoding") + // Length can safely be ignored. + delete(h, "length") + + if len(h) > 1 { + setUnknownNexusContent(h, payload.Metadata) + return nil + } + + contentType := h.Get("type") + if contentType == "" { + if len(h) == 0 && len(content.Data) == 0 { + payload.Metadata["encoding"] = []byte("binary/null") + } else { + setUnknownNexusContent(h, payload.Metadata) + } + return nil + } + + mediaType, params, err := mime.ParseMediaType(contentType) + if err != nil { + setUnknownNexusContent(h, payload.Metadata) + return nil + } + + switch mediaType { + case "application/x-temporal-payload": + err := payload.Unmarshal(content.Data) + if err != nil { + return err + } + case "application/json": + if len(params) == 0 { + payload.Metadata["encoding"] = []byte("json/plain") + } else if len(params) == 2 && params["format"] == "protobuf" && params["message-type"] != "" { + payload.Metadata["encoding"] = []byte("json/protobuf") + payload.Metadata["messageType"] = []byte(params["message-type"]) + } else { + setUnknownNexusContent(h, payload.Metadata) + } + case "application/x-protobuf": + if len(params) == 1 && params["message-type"] != "" { + payload.Metadata["encoding"] = []byte("binary/protobuf") + payload.Metadata["messageType"] = []byte(params["message-type"]) + } else { + setUnknownNexusContent(h, payload.Metadata) + } + case "application/octet-stream": + if len(params) == 0 { + payload.Metadata["encoding"] = []byte("binary/plain") + } else { + setUnknownNexusContent(h, payload.Metadata) + } + default: + setUnknownNexusContent(h, payload.Metadata) + } + return nil +} + +func setUnknownNexusContent(nexusHeader nexus.Header, payloadMetadata map[string][]byte) { + for k, v := range nexusHeader { + payloadMetadata[k] = []byte(v) + } + payloadMetadata["encoding"] = []byte("unknown/nexus-content") +} + +// Serialize implements nexus.Serializer. +func (payloadSerializer) Serialize(v any) (*nexus.Content, error) { + payload, ok := v.(*commonpb.Payload) + if !ok { + return nil, fmt.Errorf("%w: cannot serialize %v", errSerializer, v) + } + + if payload == nil { + return &nexus.Content{}, nil + } + + if payload.GetMetadata() == nil { + return xTemporalPayload(payload) + } + + content := nexus.Content{Header: nexus.Header{}, Data: payload.Data} + encoding := string(payload.Metadata["encoding"]) + messageType := string(payload.Metadata["messageType"]) + + switch encoding { + case "unknown/nexus-content": + for k, v := range payload.Metadata { + if k != "encoding" { + content.Header[k] = string(v) + } + } + case "json/protobuf": + if len(payload.Metadata) != 2 || messageType == "" { + return xTemporalPayload(payload) + } + content.Header["type"] = fmt.Sprintf("application/json; format=protobuf; message-type=%q", messageType) + case "binary/protobuf": + if len(payload.Metadata) != 2 || messageType == "" { + return xTemporalPayload(payload) + } + content.Header["type"] = fmt.Sprintf("application/x-protobuf; message-type=%q", messageType) + case "json/plain": + content.Header["type"] = "application/json" + case "binary/null": + if len(payload.Metadata) != 1 { + return xTemporalPayload(payload) + } + // type is unset + case "binary/plain": + if len(payload.Metadata) != 1 { + return xTemporalPayload(payload) + } + content.Header["type"] = "application/octet-stream" + default: + return xTemporalPayload(payload) + } + + return &content, nil +} + +func xTemporalPayload(payload *commonpb.Payload) (*nexus.Content, error) { + data, err := payload.Marshal() + if err != nil { + return nil, fmt.Errorf("%w: payload marshal error: %w", errSerializer, err) + } + return &nexus.Content{ + Header: nexus.Header{"type": "application/x-temporal-payload"}, + Data: data, + }, nil +} + +var _ nexus.Serializer = payloadSerializer{} diff --git a/common/nexus/payload_serializer_test.go b/common/nexus/payload_serializer_test.go new file mode 100644 index 00000000000..a960781c65c --- /dev/null +++ b/common/nexus/payload_serializer_test.go @@ -0,0 +1,221 @@ +// The MIT License +// +// Copyright (c) 2023 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package nexus + +import ( + "testing" + + "github.com/nexus-rpc/sdk-go/nexus" + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/converter" +) + +func mustToPayload(t *testing.T, v any) *commonpb.Payload { + conv := converter.GetDefaultDataConverter() + payload, err := conv.ToPayload(v) + require.NoError(t, err) + return payload +} + +func TestNexusPayloadSerializer(t *testing.T) { + t.Parallel() + + type testcase struct { + name string + inputPayload *commonpb.Payload + // defaults to inputPayload + expectedPayload *commonpb.Payload + header nexus.Header + } + cases := []testcase{ + { + name: "json", + inputPayload: mustToPayload(t, "foo"), + header: nexus.Header{"type": "application/json"}, + }, + { + name: "bytes", + inputPayload: mustToPayload(t, []byte("foo")), + header: nexus.Header{"type": "application/octet-stream"}, + }, + { + name: "nil", + inputPayload: mustToPayload(t, nil), + header: nexus.Header{}, + }, + { + name: "json proto", + inputPayload: mustToPayload(t, commonpb.RetryPolicy{}), + header: nexus.Header{ + "type": `application/json; format=protobuf; message-type="temporal.api.common.v1.RetryPolicy"`, + }, + }, + { + name: "binary proto with no messageType", + inputPayload: &commonpb.Payload{ + Data: []byte("ignored"), + Metadata: map[string][]byte{ + "encoding": []byte("binary/protobuf"), + }, + }, + header: nexus.Header{ + "type": "application/x-temporal-payload", + }, + }, + { + name: "binary proto with messageType", + inputPayload: &commonpb.Payload{ + Data: []byte("ignored"), + Metadata: map[string][]byte{ + "encoding": []byte("binary/protobuf"), + "messageType": []byte("temporal.api.common.v1.RetryPolicy"), + }, + }, + header: nexus.Header{ + "type": `application/x-protobuf; message-type="temporal.api.common.v1.RetryPolicy"`, + }, + }, + { + name: "nil pointer", + inputPayload: nil, + expectedPayload: &commonpb.Payload{ + Metadata: map[string][]byte{ + "encoding": []byte("binary/null"), + }, + }, + // Yes this is the default value, but this test should have an explicit expectation. + header: nil, + }, + { + name: "nil metadata", + inputPayload: &commonpb.Payload{}, + expectedPayload: &commonpb.Payload{ + Metadata: map[string][]byte{}, + Data: []byte{}, + }, + header: nexus.Header{"type": "application/x-temporal-payload"}, + }, + { + name: "non-standard encoding", + inputPayload: &commonpb.Payload{ + Data: []byte("some-data"), + Metadata: map[string][]byte{ + "encoding": []byte("non-standard"), + }, + }, + header: nexus.Header{"type": "application/x-temporal-payload"}, + }, + { + name: "non-standard metadata field", + inputPayload: &commonpb.Payload{ + Data: []byte("some-data"), + Metadata: map[string][]byte{ + "encoding": []byte("binary/plain"), + "non-standard": []byte("value"), + }, + }, + header: nexus.Header{"type": "application/x-temporal-payload"}, + }, + { + name: "nexus content with non-standard header", + inputPayload: &commonpb.Payload{ + Metadata: map[string][]byte{ + "encoding": []byte("unknown/nexus-content"), + "type": []byte("application/json"), + "non-standard": []byte("value"), + }, + Data: []byte(`"data"`), + }, + header: nexus.Header{"non-standard": "value", "type": "application/json"}, + }, + { + name: "nexus content with non-standard content params", + inputPayload: &commonpb.Payload{ + Metadata: map[string][]byte{ + "encoding": []byte("unknown/nexus-content"), + "type": []byte("application/json; something=nonstandard"), + }, + Data: []byte(`"data"`), + }, + header: nexus.Header{"type": "application/json; something=nonstandard"}, + }, + { + name: "nexus content with non-standard media type", + inputPayload: &commonpb.Payload{ + Metadata: map[string][]byte{ + "encoding": []byte("unknown/nexus-content"), + "type": []byte("application/x-www-form-urlencoded"), + }, + Data: []byte(`"data"`), + }, + header: nexus.Header{"type": "application/x-www-form-urlencoded"}, + }, + { + name: "nexus content with unparsable content params", + inputPayload: &commonpb.Payload{ + Metadata: map[string][]byte{ + "encoding": []byte("unknown/nexus-content"), + "type": []byte("application/"), + }, + Data: []byte(`"data"`), + }, + header: nexus.Header{"type": "application/"}, + }, + { + name: "nexus content with length header", + inputPayload: &commonpb.Payload{ + Metadata: map[string][]byte{ + "encoding": []byte("unknown/nexus-content"), + "type": []byte("application/json"), + "length": []byte("4"), + }, + Data: []byte(`"data"`), + }, + expectedPayload: &commonpb.Payload{ + Metadata: map[string][]byte{ + "encoding": []byte("json/plain"), + }, + Data: []byte(`"data"`), + }, + header: nexus.Header{"type": "application/json", "length": "4"}, + }, + } + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + t.Parallel() + s := payloadSerializer{} + content, err := s.Serialize(c.inputPayload) + require.NoError(t, err) + require.Equal(t, c.header, content.Header) + var outputPayload *commonpb.Payload + require.NoError(t, s.Deserialize(content, &outputPayload)) + expectedPayload := c.expectedPayload + if expectedPayload == nil { + expectedPayload = c.inputPayload + } + require.Equal(t, expectedPayload, outputPayload) + }) + } +} diff --git a/go.mod b/go.mod index 1085b7ed448..1ed07802778 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/iancoleman/strcase v0.3.0 github.com/jmoiron/sqlx v1.3.4 github.com/lib/pq v1.10.9 + github.com/nexus-rpc/sdk-go v0.0.2 github.com/olekukonko/tablewriter v0.0.5 github.com/olivere/elastic/v7 v7.0.32 github.com/pborman/uuid v1.2.1 @@ -65,6 +66,7 @@ require ( ) require ( + github.com/gorilla/mux v1.8.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect ) diff --git a/go.sum b/go.sum index 471e8174370..7e22641e945 100644 --- a/go.sum +++ b/go.sum @@ -173,6 +173,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= @@ -246,6 +248,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nexus-rpc/sdk-go v0.0.2 h1:7VxbXX6UdSkB6gB4nZH8MqJTx8poIkUa5RA5Cj57ghM= +github.com/nexus-rpc/sdk-go v0.0.2/go.mod h1:uBe8fX151zUW9DhXxwHjji19d6YfnNUqJ81tR3JbwHY= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=