Skip to content

Commit

Permalink
Implement Nexus Serializer for Temporal Payloads (#5126)
Browse files Browse the repository at this point in the history
**What changed?**

See description

**Why?**

The server should do best effort conversion between Nexus Content and
Temporal Payloads so pure Temporal applications don't have to deal with
the separate Nexus Content concept.

**How did you test it?**

Added tests.

**Potential risks**

Non-standard content is converter into a Payload with metadata encoding
of `unknown/nexus-content`. Users will need to prepare for that or their
Nexus requests may fail.
I believe this is the best option we have to avoid lossy conversion and
expose as much information and empower the application.
I considered using `binary/plain` for non-standard content but I'd
rather not have these payloads accidentally slip into user's
applications.

**Is hotfix candidate?**

Far from it.
  • Loading branch information
bergundy authored Nov 17, 2023
1 parent 3f05eeb commit 4bd580f
Show file tree
Hide file tree
Showing 4 changed files with 412 additions and 0 deletions.
185 changes: 185 additions & 0 deletions common/nexus/payload_serializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// The MIT License
//
// Copyright (c) 2023 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package nexus

import (
"errors"
"fmt"
"maps"
"mime"

"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
)

type payloadSerializer struct{}

var errSerializer = errors.New("serializer error")

// Deserialize implements nexus.Serializer.
func (payloadSerializer) Deserialize(content *nexus.Content, v any) error {
payloadRef, ok := v.(**commonpb.Payload)
if !ok {
return fmt.Errorf("%w: cannot deserialize into %v", errSerializer, v)
}

payload := &commonpb.Payload{}
*payloadRef = payload
payload.Metadata = make(map[string][]byte)
payload.Data = content.Data

h := maps.Clone(content.Header)
// We assume that encoding is handled by the transport layer and the content is decoded.
delete(h, "encoding")
// Length can safely be ignored.
delete(h, "length")

if len(h) > 1 {
setUnknownNexusContent(h, payload.Metadata)
return nil
}

contentType := h.Get("type")
if contentType == "" {
if len(h) == 0 && len(content.Data) == 0 {
payload.Metadata["encoding"] = []byte("binary/null")
} else {
setUnknownNexusContent(h, payload.Metadata)
}
return nil
}

mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
setUnknownNexusContent(h, payload.Metadata)
return nil
}

switch mediaType {
case "application/x-temporal-payload":
err := payload.Unmarshal(content.Data)
if err != nil {
return err
}
case "application/json":
if len(params) == 0 {
payload.Metadata["encoding"] = []byte("json/plain")
} else if len(params) == 2 && params["format"] == "protobuf" && params["message-type"] != "" {
payload.Metadata["encoding"] = []byte("json/protobuf")
payload.Metadata["messageType"] = []byte(params["message-type"])
} else {
setUnknownNexusContent(h, payload.Metadata)
}
case "application/x-protobuf":
if len(params) == 1 && params["message-type"] != "" {
payload.Metadata["encoding"] = []byte("binary/protobuf")
payload.Metadata["messageType"] = []byte(params["message-type"])
} else {
setUnknownNexusContent(h, payload.Metadata)
}
case "application/octet-stream":
if len(params) == 0 {
payload.Metadata["encoding"] = []byte("binary/plain")
} else {
setUnknownNexusContent(h, payload.Metadata)
}
default:
setUnknownNexusContent(h, payload.Metadata)
}
return nil
}

func setUnknownNexusContent(nexusHeader nexus.Header, payloadMetadata map[string][]byte) {
for k, v := range nexusHeader {
payloadMetadata[k] = []byte(v)
}
payloadMetadata["encoding"] = []byte("unknown/nexus-content")
}

// Serialize implements nexus.Serializer.
func (payloadSerializer) Serialize(v any) (*nexus.Content, error) {
payload, ok := v.(*commonpb.Payload)
if !ok {
return nil, fmt.Errorf("%w: cannot serialize %v", errSerializer, v)
}

if payload == nil {
return &nexus.Content{}, nil
}

if payload.GetMetadata() == nil {
return xTemporalPayload(payload)
}

content := nexus.Content{Header: nexus.Header{}, Data: payload.Data}
encoding := string(payload.Metadata["encoding"])
messageType := string(payload.Metadata["messageType"])

switch encoding {
case "unknown/nexus-content":
for k, v := range payload.Metadata {
if k != "encoding" {
content.Header[k] = string(v)
}
}
case "json/protobuf":
if len(payload.Metadata) != 2 || messageType == "" {
return xTemporalPayload(payload)
}
content.Header["type"] = fmt.Sprintf("application/json; format=protobuf; message-type=%q", messageType)
case "binary/protobuf":
if len(payload.Metadata) != 2 || messageType == "" {
return xTemporalPayload(payload)
}
content.Header["type"] = fmt.Sprintf("application/x-protobuf; message-type=%q", messageType)
case "json/plain":
content.Header["type"] = "application/json"
case "binary/null":
if len(payload.Metadata) != 1 {
return xTemporalPayload(payload)
}
// type is unset
case "binary/plain":
if len(payload.Metadata) != 1 {
return xTemporalPayload(payload)
}
content.Header["type"] = "application/octet-stream"
default:
return xTemporalPayload(payload)
}

return &content, nil
}

func xTemporalPayload(payload *commonpb.Payload) (*nexus.Content, error) {
data, err := payload.Marshal()
if err != nil {
return nil, fmt.Errorf("%w: payload marshal error: %w", errSerializer, err)
}
return &nexus.Content{
Header: nexus.Header{"type": "application/x-temporal-payload"},
Data: data,
}, nil
}

var _ nexus.Serializer = payloadSerializer{}
Loading

0 comments on commit 4bd580f

Please sign in to comment.