Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

golang SDK: Introduce serde opts for subject auto(ish)-creation #19823

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"bytes"
"os"

"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform/sr"
Expand All @@ -30,14 +31,6 @@ var (

func main() {
c = sr.NewClient()
e := avro.Example{}
_, err := c.CreateSchema(topicName+"-value", sr.Schema{
Type: sr.TypeAvro,
Schema: e.Schema(),
})
if err != nil {
println("unable to registry schema: ", err)
}
transform.OnRecordWritten(avroToJsonTransform)
}

Expand All @@ -53,9 +46,9 @@ func avroToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error
if err != nil {
return err
}
schema, err := c.LookupSchemaById(id)
if err != nil {
return err
schema := sr.Schema{
Type: sr.TypeAvro,
Schema: ex.Schema(),
}
// Register the new schema
s.Register(id, sr.DecodeFn[*avro.Example](func(b []byte, e *avro.Example) error {
Expand All @@ -65,6 +58,9 @@ func avroToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error
)
*e = ex
return err
}), sr.ValueSubjectTopicName[*avro.Example](os.Getenv("REDPANDA_INPUT_TOPIC"), func(sn string) error {
_, err = c.CreateSchema(sn, schema)
return err
}))
// Now try and decode the value now that we've looked it up.
if err = s.Decode(v, &ex); err != nil {
Expand Down
69 changes: 69 additions & 0 deletions src/transform-sdk/go/transform/sr/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sr
import (
"encoding/binary"
"errors"
"fmt"
)

var (
Expand Down Expand Up @@ -55,10 +56,72 @@ func DecodeFn[T any](fn func([]byte, T) error) SerdeOpt[T] {
return serdeOpt[T]{func(t *idSerde[T]) { t.decode = fn }}
}

func keySubject[T any](base_name string, fn func(string) error) SerdeOpt[T] {
return serdeOpt[T]{func(t *idSerde[T]) {
t.ks_err = fn(fmt.Sprintf("%s-key", base_name))
}}
}

// KeySubjectTopicName tells Serde to construct a subject name using topic name strategy and
// pass the result to a user-supplied function, where, for example, the subject might be pushed
// out to schema registry. If the user supplied function returns an error, subsequent call to
// Serde.Encode will short circuit, returning that error.
func KeySubjectTopicName[T any](topic string, fn func(string) error) SerdeOpt[T] {
return keySubject[T](topic, fn)
}

// KeySubjectRecordName tells Serde to construct a subject name using record name strategy and
// pass the result to a user-supplied function, where, for example, the subject might be pushed
// out to schema registry. If the user supplied function returns an error, subsequent call to
// Serde.Encode will short circuit, returning that error.
func KeySubjectRecordName[T any](record_name string, fn func(string) error) SerdeOpt[T] {
return keySubject[T](record_name, fn)
}

// KeySubjectTopicRecordName tells Serde to construct a subject name using topic record name
// strategy and pass the result to a user-supplied function, where, for example, the subject
// might be pushed out to schema registry. If the user supplied function returns an error,
// subsequent call to Serde.Encode will short circuit, returning that error.
func KeySubjectTopicRecordName[T any](topic string, record_name string, fn func(string) error) SerdeOpt[T] {
return keySubject[T](fmt.Sprintf("%s-%s", topic, record_name), fn)
}

func valueSubject[T any](base_name string, fn func(string) error) SerdeOpt[T] {
return serdeOpt[T]{func(t *idSerde[T]) {
t.vs_err = fn(fmt.Sprintf("%s-value", base_name))
}}
}

// ValueSubjectTopicName tells Serde to construct a subject name using topic name strategy and
// pass the result to a user-supplied function, where, for example, the subject might be pushed
// out to schema registry. If the user supplied function returns an error, subsequent call to
// Serde.Encode will short circuit, returning that error.
func ValueSubjectTopicName[T any](topic string, fn func(string) error) SerdeOpt[T] {
return valueSubject[T](topic, fn)
}

// ValueSubjectRecordName tells Serde to construct a subject name using record name strategy and
// pass the result to a user-supplied function, where, for example, the subject might be pushed
// out to schema registry. If the user supplied function returns an error, subsequent call to
// Serde.Encode will short circuit, returning that error.
func ValueSubjectRecordName[T any](record_name string, fn func(string) error) SerdeOpt[T] {
return valueSubject[T](record_name, fn)
}

// ValueSubjectTopicRecordName tells Serde to construct a subject name using topic record name
// strategy and pass the result to a user-supplied function, where, for example, the subject
// might be pushed out to schema registry. If the user supplied function returns an error,
// subsequent call to Serde.Encode will short circuit, returning that error.
func ValueSubjectTopicRecordName[T any](topic string, record_name string, fn func(string) error) SerdeOpt[T] {
return valueSubject[T](fmt.Sprintf("%s-%s", topic, record_name), fn)
}

type idSerde[T any] struct {
encode func(T) ([]byte, error)
appendEncode func([]byte, T) ([]byte, error)
decode func([]byte, T) error
ks_err error
vs_err error
}

func (s *idSerde[T]) isEncoder() bool {
Expand Down Expand Up @@ -143,6 +206,12 @@ func (s *Serde[T]) AppendEncode(b []byte, v T) ([]byte, error) {
return b, ErrNotRegistered
}

if idserde.ks_err != nil {
return b, idserde.ks_err
} else if idserde.vs_err != nil {
return b, idserde.vs_err
}

// write the magic leading byte, then the id in big endian
b = append(b, 0, 0, 0, 0, 0)
binary.BigEndian.PutUint32(b[1:5], uint32(s.encodingVersion))
Expand Down
125 changes: 125 additions & 0 deletions src/transform-sdk/go/transform/sr/serde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package sr

import (
"encoding/json"
"errors"
"fmt"
"reflect"
"testing"
)
Expand Down Expand Up @@ -109,3 +111,126 @@ func TestUnregistered(t *testing.T) {
t.Fatal("unexpected error", err)
}
}

func TestSubjectNameDerivation(t *testing.T) {
e1 := example{
A: "foo",
B: 42,
C: true,
}
// Register again, with the SNS options
ks := ""
vs := ""
topicName := "demo-topic"
recordName := "example"

s.Register(
3,
EncodeFn[*example](func(e *example) ([]byte, error) {
return json.Marshal(e)
}),
DecodeFn[*example](func(b []byte, e *example) error {
return json.Unmarshal(b, e)
}),
KeySubjectTopicName[*example](topicName, func(subj_name string) error {
ks = subj_name
return nil
}),
ValueSubjectRecordName[*example](recordName, func(subj_name string) error {
vs = subj_name
return nil
}),
)

if ks != fmt.Sprintf("%s-key", topicName) {
t.Fatal("Incorrect key subject name TopicNameStrategy", ks)
}

if vs != fmt.Sprintf("%s-value", recordName) {
t.Fatal("Incorrect value subject name for RecordNameStrategy", vs)
}

b := s.MustEncode(&e1)
e2 := example{}
if err := s.Decode(b, &e2); err != nil {
t.Fatal(err)
}

s.Register(
4,
EncodeFn[*example](func(e *example) ([]byte, error) {
return json.Marshal(e)
}),
DecodeFn[*example](func(b []byte, e *example) error {
return json.Unmarshal(b, e)
}),
KeySubjectTopicRecordName[*example](topicName, recordName, func(subj_name string) error {
ks = subj_name
return nil
}),
)

if ks != fmt.Sprintf("%s-%s-key", topicName, recordName) {
t.Fatal("Incorrect key subject name for TopicRecordNameStrategy", ks)
}

b = s.MustEncode(&e1)
e2 = example{}
if err := s.Decode(b, &e2); err != nil {
t.Fatal(err)
}

}

func TestSubjectNameDerivationErrors(t *testing.T) {
e1 := example{
A: "foo",
B: 42,
C: true,
}

var (
SomeError = errors.New("Oops")
)

// Register again, with the SNS options
topicName := "demo-topic"

s.Register(
5,
EncodeFn[*example](func(e *example) ([]byte, error) {
return json.Marshal(e)
}),
DecodeFn[*example](func(b []byte, e *example) error {
return json.Unmarshal(b, e)
}),
KeySubjectTopicName[*example](topicName, func(subj_name string) error {
return SomeError
}),
)

// Check that encoding fails and reports the error from the user-supplied function
_, err := s.Encode(&e1)
if err != SomeError {
t.Fatal("unexpected error", err)
}

s.Register(
6,
EncodeFn[*example](func(e *example) ([]byte, error) {
return json.Marshal(e)
}),
DecodeFn[*example](func(b []byte, e *example) error {
return json.Unmarshal(b, e)
}),
ValueSubjectTopicName[*example](topicName, func(subj_name string) error {
return SomeError
}),
)

// Check that encoding fails and reports the error from the user-supplied function
_, err = s.Encode(&e1)
if err != SomeError {
t.Fatal("unexpected error", err)
oleiman marked this conversation as resolved.
Show resolved Hide resolved
}
}
2 changes: 1 addition & 1 deletion src/transform-sdk/rust/examples/schema_registry/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn main() {
let mut client = SchemaRegistryClient::new();
let s: apache_avro::Schema = Example::get_schema();
match client.create_schema(
"demo-topic-value",
"avro-value",
Schema::new_avro(s.canonical_form(), Vec::new()),
) {
Ok(_) => {}
Expand Down
7 changes: 7 additions & 0 deletions src/transform-sdk/tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ func TestSchemaRegistry(t *testing.T) {
// Make sure the schema was created by the transform
schema, err := srClient.SchemaByID(ctx, 1)
require.NoError(t, err)

// Quick check that the transform created an additional subject
// by applying topic name strategy to the input topic
subj_schema, err := srClient.SchemaByVersion(ctx, "avro-value", -1)
require.NoError(t, err)
require.Equal(t, schema, subj_schema.Schema, "Schemas should be the same")

// Ensure the canonicalized schema is what we expect.
require.Equal(t, avro.MustParse(schema.Schema).String(), RecordV1Schema.String())
v2 := RecordV2{
Expand Down
Loading