Skip to content

Commit

Permalink
DO NOT MERGE: verify API refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiman committed Jun 14, 2024
1 parent 05e6205 commit 06267e2
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func avroToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error
)
*e = ex
return err
}), sr.ValueSubject[*avro.Example](os.Getenv("REDPANDA_INPUT_TOPIC"), sr.TopicNameStrategy, func(sn string) error {
}), sr.ValueSubjectTopicName[*avro.Example](os.Getenv("REDPANDA_INPUT_TOPIC"), func(sn string) error {
_, err = c.CreateSchema(sn, schema)
return err
}))
Expand Down
73 changes: 28 additions & 45 deletions src/transform-sdk/go/transform/sr/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ var (
// ErrBadHeader is returned from Decode when the input slice is shorter
// than five bytes, or if the first byte is not the magic 0 byte.
ErrBadHeader = errors.New("5 byte header for value is missing or does not have the 0 magic byte")

// ErrBadSubjectName is returned from ApplySubjectNameStrategy in the event
// that the strategy could not be applied for some reason. e.g. the subject
// name strategy is not supported or was missing a record name.
ErrBadSubjectName = errors.New("Failed to apply subject name strategy (confirm valid strategy and record name requirement)")
)

type (
Expand Down Expand Up @@ -61,38 +56,8 @@ func DecodeFn[T any](fn func([]byte, T) error) SerdeOpt[T] {
return serdeOpt[T]{func(t *idSerde[T]) { t.decode = fn }}
}

type SubjectNameStrat int
type TopicIndex int

const (
TopicNameStrategy SubjectNameStrat = iota
RecordNameStrategy
TopicRecordNameStrategy
)

// ApplySubjectNameStrategy takes a topic name, a strategy, a suffix, and, optionally,
// a fully qualified record name and uses those to derive a subject name.
func ApplySubjectNameStrategy(topic string, strategy SubjectNameStrat, suffix string, name *string) (string, error) {
if strategy == TopicNameStrategy {
return fmt.Sprintf("%s-%s", topic, suffix), nil
} else if strategy == RecordNameStrategy && name != nil {
return fmt.Sprintf("%s-%s", *name, suffix), nil
} else if strategy == TopicRecordNameStrategy && name != nil {
return fmt.Sprintf("%s-%s-%s", topic, *name, suffix), nil
}

return "", ErrBadSubjectName

}

func doSubject(topic string, strategy SubjectNameStrat, fn func(string) error, sfx string, name *string) error {
subject_name, err := ApplySubjectNameStrategy(topic, strategy, sfx, name)
if err != nil {
return err
}
return fn(subject_name)
}

// KeySubject tells Serde to construct a subject name based on the supplied topic and name
// strategy and pass the result into the supplied fn, where clients may, for example, create a
// subject with that name and the desired Schema.
Expand All @@ -103,12 +68,25 @@ func doSubject(topic string, strategy SubjectNameStrat, fn func(string) error, s
//
// Note that applying RecordNameStrategy or TopicRecordNameStrategy without having previously supplied
// a RecordName will result in an error.
func KeySubject[T any](topic string, strategy SubjectNameStrat, fn func(string) error) SerdeOpt[T] {

func keySubject[T any](base_name string, fn func(string) error) SerdeOpt[T] {
return serdeOpt[T]{func(t *idSerde[T]) {
t.ks_err = doSubject(topic, strategy, fn, "key", t.record_name)
t.ks_err = fn(fmt.Sprintf("%s-key", base_name))
}}
}

func KeySubjectTopicName[T any](topic string, fn func(string) error) SerdeOpt[T] {
return keySubject[T](topic, fn)
}

func KeySubjectRecordName[T any](record_name string, fn func(string) error) SerdeOpt[T] {
return keySubject[T](record_name, fn)
}

func KeySubjectTopicRecordName[T any](topic string, record_name string, fn func(string) error) SerdeOpt[T] {
return keySubject[T](fmt.Sprintf("%s-%s", topic, record_name), fn)
}

// ValueSubject tells Serde to construct a subject name based on the supplied topic and name
// strategy and pass the result into the supplied fn, where clients may, for example, create a
// subject with that name and the desired Schema.
Expand All @@ -119,24 +97,29 @@ func KeySubject[T any](topic string, strategy SubjectNameStrat, fn func(string)
//
// Note that applying RecordNameStrategy or TopicRecordNameStrategy without having previously supplied
// a RecordName will result in an error.
func ValueSubject[T any](topic string, strategy SubjectNameStrat, fn func(string) error) SerdeOpt[T] {

func valueSubject[T any](base_name string, fn func(string) error) SerdeOpt[T] {
return serdeOpt[T]{func(t *idSerde[T]) {
t.vs_err = doSubject(topic, strategy, fn, "value", t.record_name)
t.vs_err = fn(fmt.Sprintf("%s-value", base_name))
}}
}

// RecordName provides Serde with a fully qualified record name for the ID. This information
// is optional for general Serde duties, but it is required for subject name strategies
// RecordNameStrategy and TopicRecordNameStrategy.
func RecordName[T any](name string) SerdeOpt[T] {
return serdeOpt[T]{func(t *idSerde[T]) { t.record_name = &name }}
func ValueSubjectTopicName[T any](topic string, fn func(string) error) SerdeOpt[T] {
return valueSubject[T](topic, fn)
}

func ValueSubjectRecordName[T any](record_name string, fn func(string) error) SerdeOpt[T] {
return valueSubject[T](record_name, fn)
}

func ValueSubjectTopicRecordName[T any](topic string, record_name string, fn func(string) error) SerdeOpt[T] {
return valueSubject[T](fmt.Sprintf("%s-%s", topic, record_name), fn)
}

type idSerde[T any] struct {
encode func(T) ([]byte, error)
appendEncode func([]byte, T) ([]byte, error)
decode func([]byte, T) error
record_name *string
ks_err error
vs_err error
}
Expand Down
45 changes: 16 additions & 29 deletions src/transform-sdk/go/transform/sr/serde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sr

import (
"encoding/json"
"errors"
"fmt"
"reflect"
"testing"
Expand Down Expand Up @@ -123,8 +124,6 @@ func TestSubjectNameDerivation(t *testing.T) {
topicName := "demo-topic"
recordName := "example"

s.SetDefaults(RecordName[*example](recordName))

s.Register(
3,
EncodeFn[*example](func(e *example) ([]byte, error) {
Expand All @@ -133,11 +132,11 @@ func TestSubjectNameDerivation(t *testing.T) {
DecodeFn[*example](func(b []byte, e *example) error {
return json.Unmarshal(b, e)
}),
KeySubject[*example](topicName, TopicNameStrategy, func(subj_name string) error {
KeySubjectTopicName[*example](topicName, func(subj_name string) error {
ks = subj_name
return nil
}),
ValueSubject[*example](topicName, RecordNameStrategy, func(subj_name string) error {
ValueSubjectRecordName[*example](recordName, func(subj_name string) error {
vs = subj_name
return nil
}),
Expand Down Expand Up @@ -165,7 +164,7 @@ func TestSubjectNameDerivation(t *testing.T) {
DecodeFn[*example](func(b []byte, e *example) error {
return json.Unmarshal(b, e)
}),
KeySubject[*example](topicName, TopicRecordNameStrategy, func(subj_name string) error {
KeySubjectTopicRecordName[*example](topicName, recordName, func(subj_name string) error {
ks = subj_name
return nil
}),
Expand All @@ -190,13 +189,13 @@ func TestSubjectNameDerivationErrors(t *testing.T) {
C: true,
}

var (
SomeError = errors.New("Oops")
)

// Register again, with the SNS options
val := false
topicName := "demo-topic"

// Clear out the record name default
s.SetDefaults()

s.Register(
5,
EncodeFn[*example](func(e *example) ([]byte, error) {
Expand All @@ -205,20 +204,14 @@ func TestSubjectNameDerivationErrors(t *testing.T) {
DecodeFn[*example](func(b []byte, e *example) error {
return json.Unmarshal(b, e)
}),
KeySubject[*example](topicName, RecordNameStrategy, func(subj_name string) error {
val = true
return nil
KeySubjectTopicName[*example](topicName, func(subj_name string) error {
return SomeError
}),
)

// check that the callback wasn't called
if val {
t.Fatal("unexpected callback", val)
}

// but also that encoding will fail
// Check that encoding fails and reports the error from the user-supplied function
_, err := s.Encode(&e1)
if err != ErrBadSubjectName {
if err != SomeError {
t.Fatal("unexpected error", err)
}

Expand All @@ -230,20 +223,14 @@ func TestSubjectNameDerivationErrors(t *testing.T) {
DecodeFn[*example](func(b []byte, e *example) error {
return json.Unmarshal(b, e)
}),
ValueSubject[*example](topicName, TopicRecordNameStrategy, func(subj_name string) error {
val = false
return nil
ValueSubjectTopicName[*example](topicName, func(subj_name string) error {
return SomeError
}),
)

// check that the callback wasn't called
if val {
t.Fatal("unexpected callback", val)
}

// but also that encoding will fail
// Check that encoding fails and reports the error from the user-supplied function
_, err = s.Encode(&e1)
if err != ErrBadSubjectName {
if err != SomeError {
t.Fatal("unexpected error", err)
}
}

0 comments on commit 06267e2

Please sign in to comment.