Skip to content

Commit

Permalink
Use Payload for SearchAttributes, Memo, and Headers (#117)
Browse files Browse the repository at this point in the history
* Use Payload for SA, Memo, and Headers.

* Use Payload for SA, Memo, and Headers.

* Fix check.
  • Loading branch information
alexshtin authored May 1, 2020
1 parent 6bcd477 commit 5f7230d
Show file tree
Hide file tree
Showing 21 changed files with 171 additions and 142 deletions.
8 changes: 8 additions & 0 deletions encoded/encoded.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,17 @@ type (
// Temporal support using different DataConverters for different activity/childWorkflow in same workflow.
// 2. Activity/Workflow worker that run these activity/childWorkflow, through worker.Options.
DataConverter = internal.DataConverter

// PayloadConverter converts single value to/from payload.
PayloadConverter = internal.PayloadConverter
)

// GetDefaultDataConverter return default data converter used by Temporal worker
func GetDefaultDataConverter() DataConverter {
return internal.DefaultDataConverter
}

// GetDefaultPayloadConverter return default data converter used by Temporal worker
func GetDefaultPayloadConverter() PayloadConverter {
return internal.DefaultPayloadConverter
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.temporal.io/temporal-proto v0.20.30
go.temporal.io/temporal-proto v0.20.31
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.0.0
go.uber.org/zap v1.14.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.temporal.io/temporal-proto v0.20.30 h1:QxvCfTZ1U686bmlMPTTg0F/dvMvt02m7i3jF3zWEX/E=
go.temporal.io/temporal-proto v0.20.30/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.temporal.io/temporal-proto v0.20.31 h1:AlY49UhslnoUSV9HvnEewgy0ursxMPrOJAaQZHmDwzM=
go.temporal.io/temporal-proto v0.20.31/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
Expand Down
146 changes: 82 additions & 64 deletions internal/encoded.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,32 @@ type (
ToData(value ...interface{}) (*commonpb.Payloads, error)
// FromData implements conversion of an array of values of different types.
// Useful for deserializing arguments of function invocations.
FromData(input *commonpb.Payloads, valuePtr ...interface{}) error
FromData(input *commonpb.Payloads, valuePtrs ...interface{}) error
}

// defaultDataConverter uses JSON.
defaultDataConverter struct{}
// PayloadConverter converts single value to/from payload.
PayloadConverter interface {
// ToData single value to payload.
ToData(value interface{}) (*commonpb.Payload, error)
// FromData single value from payload.
FromData(input *commonpb.Payload, valuePtr interface{}) error
}

defaultPayloadConverter struct{}

// NameValuePair represent named value.
NameValuePair struct {
Name string
Value interface{}
defaultDataConverter struct {
payloadConverter PayloadConverter
}
)

var (
// DefaultDataConverter is default data converter used by Temporal worker
DefaultDataConverter = &defaultDataConverter{}
// DefaultPayloadConverter is default single value serializer.
DefaultPayloadConverter = &defaultPayloadConverter{}

// DefaultDataConverter is default data converter used by Temporal worker.
DefaultDataConverter = &defaultDataConverter{
payloadConverter: DefaultPayloadConverter,
}

// ErrMetadataIsNotSet is returned when metadata is not set.
ErrMetadataIsNotSet = errors.New("metadata is not set")
Expand All @@ -102,7 +112,7 @@ var (
ErrUnableToSetBytes = errors.New("unable to set []byte value")
)

// getDefaultDataConverter return default data converter used by Temporal worker
// getDefaultDataConverter return default data converter used by Temporal worker.
func getDefaultDataConverter() DataConverter {
return DefaultDataConverter
}
Expand All @@ -114,34 +124,11 @@ func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload

result := &commonpb.Payloads{}
for i, value := range values {
nvp, ok := value.(NameValuePair)
if !ok {
nvp.Name = fmt.Sprintf("values[%d]", i)
nvp.Value = value
payload, err := dc.payloadConverter.ToData(value)
if err != nil {
return nil, fmt.Errorf("values[%d]: %w", i, err)
}

var payload *commonpb.Payload
if bytes, isByteSlice := nvp.Value.([]byte); isByteSlice {
payload = &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingRaw),
metadataName: []byte(nvp.Name),
},
Data: bytes,
}
} else {
data, err := json.Marshal(nvp.Value)
if err != nil {
return nil, fmt.Errorf("%s: %w: %v", nvp.Name, ErrUnableToEncodeJSON, err)
}
payload = &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingJSON),
metadataName: []byte(nvp.Name),
},
Data: data,
}
}
result.Payloads = append(result.Payloads, payload)
}

Expand All @@ -158,40 +145,71 @@ func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs
break
}

metadata := payload.GetMetadata()
if metadata == nil {
return fmt.Errorf("payload item %d: %w", i, ErrMetadataIsNotSet)
err := dc.payloadConverter.FromData(payload, valuePtrs[i])
if err != nil {
return fmt.Errorf("payload item %d: %w", i, err)
}
}

var name string
if n, ok := metadata[metadataName]; ok {
name = string(n)
} else {
name = fmt.Sprintf("values[%d]", i)
}
return nil
}

var encoding string
if e, ok := metadata[metadataEncoding]; ok {
encoding = string(e)
} else {
return fmt.Errorf("%s: %w", name, ErrEncodingIsNotSet)
func (vs *defaultPayloadConverter) ToData(value interface{}) (*commonpb.Payload, error) {
var payload *commonpb.Payload
if bytes, isByteSlice := value.([]byte); isByteSlice {
payload = &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingRaw),
},
Data: bytes,
}
} else {
data, err := json.Marshal(value)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrUnableToEncodeJSON, err)
}
payload = &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingJSON),
},
Data: data,
}
}

return payload, nil
}

func (vs *defaultPayloadConverter) FromData(payload *commonpb.Payload, valuePtr interface{}) error {
if payload == nil {
return nil
}

switch encoding {
case metadataEncodingRaw:
valueBytes := reflect.ValueOf(valuePtrs[i]).Elem()
if !valueBytes.CanSet() {
return fmt.Errorf("%s: %w", name, ErrUnableToSetBytes)
}
valueBytes.SetBytes(payload.GetData())
case metadataEncodingJSON:
err := json.Unmarshal(payload.GetData(), valuePtrs[i])
if err != nil {
return fmt.Errorf("%s: %w: %v", name, ErrUnableToDecodeJSON, err)
}
default:
return fmt.Errorf("%s, encoding %s: %w", name, encoding, ErrEncodingIsNotSupported)
metadata := payload.GetMetadata()
if metadata == nil {
return ErrMetadataIsNotSet
}

var encoding string
if e, ok := metadata[metadataEncoding]; ok {
encoding = string(e)
} else {
return ErrEncodingIsNotSet
}

switch encoding {
case metadataEncodingRaw:
valueBytes := reflect.ValueOf(valuePtr).Elem()
if !valueBytes.CanSet() {
return ErrUnableToSetBytes
}
valueBytes.SetBytes(payload.GetData())
case metadataEncodingJSON:
err := json.Unmarshal(payload.GetData(), valuePtr)
if err != nil {
return fmt.Errorf("%w: %v", ErrUnableToDecodeJSON, err)
}
default:
return fmt.Errorf("encoding %s: %w", encoding, ErrEncodingIsNotSupported)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,10 @@ func Test_ContinueAsNewError(t *testing.T) {
return NewContinueAsNewError(ctx, continueAsNewWfName, a1, a2)
}

headerValue, err := DefaultDataConverter.ToData("test-data")
headerValue, err := DefaultPayloadConverter.ToData("test-data")
assert.NoError(t, err)
header := &commonpb.Header{
Fields: map[string]*commonpb.Payloads{"test": headerValue},
Fields: map[string]*commonpb.Payload{"test": headerValue},
}

s := &WorkflowTestSuite{
Expand Down
10 changes: 5 additions & 5 deletions internal/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (

// HeaderWriter is an interface to write information to temporal headers
type HeaderWriter interface {
Set(string, *commonpb.Payloads)
Set(string, *commonpb.Payload)
}

// HeaderReader is an interface to read information from temporal headers
type HeaderReader interface {
ForEachKey(handler func(string, *commonpb.Payloads) error) error
ForEachKey(handler func(string, *commonpb.Payload) error) error
}

// ContextPropagator is an interface that determines what information from
Expand All @@ -62,7 +62,7 @@ type headerReader struct {
header *commonpb.Header
}

func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payloads) error) error {
func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payload) error) error {
if hr.header == nil {
return nil
}
Expand All @@ -83,7 +83,7 @@ type headerWriter struct {
header *commonpb.Header
}

func (hw *headerWriter) Set(key string, value *commonpb.Payloads) {
func (hw *headerWriter) Set(key string, value *commonpb.Payload) {
if hw.header == nil {
return
}
Expand All @@ -93,7 +93,7 @@ func (hw *headerWriter) Set(key string, value *commonpb.Payloads) {
// NewHeaderWriter returns a header writer interface
func NewHeaderWriter(header *commonpb.Header) HeaderWriter {
if header != nil && header.Fields == nil {
header.Fields = make(map[string]*commonpb.Payloads)
header.Fields = make(map[string]*commonpb.Payload)
}
return &headerWriter{header}
}
30 changes: 15 additions & 15 deletions internal/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,48 +37,48 @@ func TestHeaderWriter(t *testing.T) {
name string
initial *commonpb.Header
expected *commonpb.Header
vals map[string]*commonpb.Payloads
vals map[string]*commonpb.Payload
}{
{
"no values",
&commonpb.Header{
Fields: map[string]*commonpb.Payloads{},
Fields: map[string]*commonpb.Payload{},
},
&commonpb.Header{
Fields: map[string]*commonpb.Payloads{},
Fields: map[string]*commonpb.Payload{},
},
map[string]*commonpb.Payloads{},
map[string]*commonpb.Payload{},
},
{
"add values",
&commonpb.Header{
Fields: map[string]*commonpb.Payloads{},
Fields: map[string]*commonpb.Payload{},
},
&commonpb.Header{
Fields: map[string]*commonpb.Payloads{
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
map[string]*commonpb.Payloads{
map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
{
"overwrite values",
&commonpb.Header{
Fields: map[string]*commonpb.Payloads{
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "unexpected"),
},
},
&commonpb.Header{
Fields: map[string]*commonpb.Payloads{
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
map[string]*commonpb.Payloads{
map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
Expand All @@ -98,8 +98,8 @@ func TestHeaderWriter(t *testing.T) {
}
}

func encodeString(t *testing.T, s string) *commonpb.Payloads {
p, err := DefaultDataConverter.ToData(s)
func encodeString(t *testing.T, s string) *commonpb.Payload {
p, err := DefaultPayloadConverter.ToData(s)
assert.NoError(t, err)
return p
}
Expand All @@ -115,7 +115,7 @@ func TestHeaderReader(t *testing.T) {
{
"valid values",
&commonpb.Header{
Fields: map[string]*commonpb.Payloads{
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
Expand All @@ -126,7 +126,7 @@ func TestHeaderReader(t *testing.T) {
{
"invalid values",
&commonpb.Header{
Fields: map[string]*commonpb.Payloads{
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
Expand All @@ -141,7 +141,7 @@ func TestHeaderReader(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
reader := NewHeaderReader(test.header)
err := reader.ForEachKey(func(key string, _ *commonpb.Payloads) error {
err := reader.ForEachKey(func(key string, _ *commonpb.Payload) error {
if _, ok := test.keys[key]; !ok {
return assert.AnError
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func mergeSearchAttributes(current, upsert *commonpb.SearchAttributes) *commonpb
return nil
}
current = &commonpb.SearchAttributes{
IndexedFields: make(map[string]*commonpb.Payloads),
IndexedFields: make(map[string]*commonpb.Payload),
}
}

Expand Down
Loading

0 comments on commit 5f7230d

Please sign in to comment.