Skip to content

Commit

Permalink
backend: do not base64 encode json payload in msgpack serde (#1037)
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand authored Jan 30, 2024
1 parent fc04bda commit b56cb2b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 12 deletions.
4 changes: 1 addition & 3 deletions backend/pkg/serde/msgpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package serde
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -54,8 +53,7 @@ func (d MsgPackSerde) DeserializePayload(_ context.Context, record *kgo.Record,
return &RecordPayload{}, fmt.Errorf("decoding message pack payload: %w", err)
}

b64 := base64.StdEncoding.EncodeToString(payload)
jsonBytes, err := json.Marshal(b64)
jsonBytes, err := json.Marshal(obj)
if err != nil {
return &RecordPayload{}, fmt.Errorf("decoding message pack payload: %w", err)
}
Expand Down
53 changes: 44 additions & 9 deletions backend/pkg/serde/msgpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ package serde

import (
"context"
"encoding/json"
"os"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -45,15 +47,17 @@ func TestMsgPackSerde_DeserializePayload(t *testing.T) {

tests := []struct {
name string
record *kgo.Record
record func() *kgo.Record
payloadType PayloadType
validationFunc func(t *testing.T, payload RecordPayload, err error)
}{
{
name: "msgpack in value",
record: &kgo.Record{
Value: msgData,
Topic: "msgpack_topic",
record: func() *kgo.Record {
return &kgo.Record{
Value: msgData,
Topic: "msgpack_topic",
}
},
payloadType: PayloadTypeValue,
validationFunc: func(t *testing.T, payload RecordPayload, err error) {
Expand All @@ -62,7 +66,10 @@ func TestMsgPackSerde_DeserializePayload(t *testing.T) {
assert.Nil(t, payload.SchemaID)
assert.Equal(t, PayloadEncodingMsgPack, payload.Encoding)

assert.Equal(t, `"gaNGb2+jYmFy"`, string(payload.NormalizedPayload))
jd, err := json.Marshal(in)
require.NoError(t, err)

assert.Equal(t, string(jd), string(payload.NormalizedPayload))

obj, ok := (payload.DeserializedPayload).(map[string]any)
require.Truef(t, ok, "parsed payload is not of type map[string]any")
Expand All @@ -71,21 +78,49 @@ func TestMsgPackSerde_DeserializePayload(t *testing.T) {
},
{
name: "not in topic map",
record: &kgo.Record{
Value: msgData,
Topic: "not_msgpack_topic",
record: func() *kgo.Record {
return &kgo.Record{
Value: msgData,
Topic: "not_msgpack_topic",
}
},
payloadType: PayloadTypeValue,
validationFunc: func(t *testing.T, payload RecordPayload, err error) {
require.Error(t, err)
assert.Equal(t, "message pack encoding not configured for topic: not_msgpack_topic", err.Error())
},
},
{
name: "msgpack 2",
record: func() *kgo.Record {
msgPackBinFile := "testdata/msgpack/example.msgpack.bin"

msgPackData, err := os.ReadFile(msgPackBinFile)
require.NoError(t, err)

return &kgo.Record{
Value: msgPackData,
Topic: "msgpack_topic",
}
},
payloadType: PayloadTypeValue,
validationFunc: func(t *testing.T, payload RecordPayload, err error) {
require.NoError(t, err)

assert.Equal(t,
`{"array":[3,2,"1",0],"binary":"AAAAAAo=","negative":false,"num":-2.4,"positive":true}`,
string(payload.NormalizedPayload))

obj, ok := (payload.DeserializedPayload).(map[string]any)
require.Truef(t, ok, "parsed payload is not of type map[string]any")
assert.Equal(t, -2.4, obj["num"])
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
payload, err := serde.DeserializePayload(context.Background(), test.record, test.payloadType)
payload, err := serde.DeserializePayload(context.Background(), test.record(), test.payloadType)
test.validationFunc(t, *payload, err)
})
}
Expand Down
Binary file not shown.

0 comments on commit b56cb2b

Please sign in to comment.