schema registry support #110

Closed
Tracked by #164
brewkode opened this issue Nov 19, 2021 · 9 comments

Comments

@brewkode

I started looking into franz-go and I'm quite excited by what I'm seeing. I wanted to check whether there's a plan to support the schema registry as part of this project. I checked the existing issues and did not find anything relevant.

@twmb
Owner

twmb commented Nov 19, 2021

Supporting a schema registry itself is easy enough (it's just HTTP requests); the problem is encoding and decoding. Produce expects records whose data is already serialized, and PollFetches gives you raw bytes.

Some registry flow would have to go in front of Produce and at the end of PollFetches. I'm not quite sure what that API would look like, or if I should even provide an API to produce structs, vs. just having a separate registry package.

I think the clearest API would be for an end user to say "encode/decode type T with schema S". This sidesteps problems of different languages encoding schemas differently based on types (nullable stuff in Java, non-nullable in Go), but it means that you have to generate schemas ahead of time, as well as register types to schemas ahead of time, before encoding/decoding.
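A minimal sketch of the "encode/decode type T with schema S" idea using Go generics (Go 1.18+), assuming the schema was generated and registered beforehand and its ID is already known. `Codec`, `JSONCodec`, and `User` are hypothetical names, and JSON stands in for whatever format the registered schema actually describes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec binds one Go type T to one registry schema ID. The ID and the
// encode/decode functions are fixed up front, before any record is
// produced or consumed.
type Codec[T any] struct {
	SchemaID int
	Encode   func(T) ([]byte, error)
	Decode   func([]byte) (T, error)
}

// JSONCodec is a hypothetical helper that uses encoding/json in both
// directions; a real codec would follow the registered Avro/Protobuf/JSON schema.
func JSONCodec[T any](schemaID int) Codec[T] {
	return Codec[T]{
		SchemaID: schemaID,
		Encode:   func(v T) ([]byte, error) { return json.Marshal(v) },
		Decode: func(b []byte) (T, error) {
			var v T
			err := json.Unmarshal(b, &v)
			return v, err
		},
	}
}

// User is an example payload type; its schema is assumed to be registered as ID 1.
type User struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

func main() {
	users := JSONCodec[User](1)
	b, err := users.Encode(User{Name: "ada", Age: 36})
	if err != nil {
		panic(err)
	}
	u, err := users.Decode(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(users.SchemaID, string(b), u.Name) // 1 {"name":"ada","age":36} ada
}
```

Because T is fixed when the codec is built, no `interface{}` appears on either side; the cost is the one mentioned above, namely that every type/schema pairing must be declared ahead of time.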

What do you think?

Also, I don't have time for this right now, but perhaps in a month or two, unless somebody else gets to it or I find some random time.

@brewkode
Author

I agree: the schema registry API support itself is just a bunch of HTTP requests. Tying a schema into the produce/consume workflow means we need a serde layer.

> I think the clearest API would be for an end user to say "encode/decode type T with schema S". This sidesteps problems of different languages encoding schemas differently based on types (nullable stuff in Java, non-nullable in Go), but it means that you have to generate schemas ahead of time, as well as register types to schemas ahead of time, before encoding/decoding.

The fundamental premise of a schema registry is to provide some level of type guarantees for the events/messages in a topic. So, for most workflows, a schema is already generated and registered ahead of time. Here's the documentation on the serialization formats supported by the Confluent Schema Registry: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html
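For the Confluent serializers described in that documentation, each serialized payload is framed with a small header: a zero magic byte, then the 4-byte big-endian schema ID, then the encoded data. Below is a stdlib-only sketch of that framing; `AppendHeader` and `SplitHeader` are illustrative names, not any library's API, and the Protobuf serializer additionally writes message indexes after the ID, which this sketch ignores.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte is the first byte of every payload in the Confluent wire format.
const magicByte = 0

// AppendHeader prepends the 5-byte header (magic byte + big-endian uint32
// schema ID) to an already-serialized payload.
func AppendHeader(schemaID uint32, payload []byte) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = magicByte
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	return append(out, payload...)
}

// SplitHeader validates the header and returns the schema ID and the
// remaining serialized payload.
func SplitHeader(b []byte) (uint32, []byte, error) {
	if len(b) < 5 {
		return 0, nil, errors.New("payload too short for header")
	}
	if b[0] != magicByte {
		return 0, nil, fmt.Errorf("unknown magic byte %d", b[0])
	}
	return binary.BigEndian.Uint32(b[1:5]), b[5:], nil
}

func main() {
	framed := AppendHeader(42, []byte("hello"))
	fmt.Println(framed[:5]) // [0 0 0 0 42]
	id, payload, err := SplitHeader(framed)
	fmt.Println(id, string(payload), err) // 42 hello <nil>
}
```

The consumer only needs the ID from the header to look up (and cache) the writer's schema in the registry before decoding the rest.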

I do think a separate API to encode/decode T with schema S is cleaner, since it decouples the serde layer more neatly. Some thoughts around this:

  • Should the schema registry serde be passed in as a kgo.Opt? If we want the entire schema registry support in a separate Go package, I need to look at the codebase to see whether we would run into dependency cycles with the way it's structured today. I'm not saying we will, just that I don't know.
  • Can we leverage the Hooks infrastructure to transparently serialize/deserialize based on the serde opt? Or is that too much magic?

@twmb
Owner

twmb commented Nov 29, 2021

AFAIK, this cannot be done with generics for the kgo.Client now because it would be a breaking change. Go does not allow generic methods, so the entire kgo.Client type would need to be generic.

Hooks are also not an option for essentially the same reason. The only way to do it with hooks would be to pass an interface{}.

I think a wrapper type may be possible, or some top level package functions (which would be a bit ugly).
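A small sketch of the constraint and of both workarounds, runnable without franz-go: `Client` is a hypothetical stand-in for kgo.Client that only ever sees bytes, and `ProduceAs`, `TypedClient`, and `jsonEnc` are illustrative names, not franz-go APIs (Go 1.18+).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Client is a hypothetical stand-in for an untyped client: it only sees bytes.
type Client struct{ produced [][]byte }

func (c *Client) Produce(value []byte) { c.produced = append(c.produced, value) }

// A generic method such as `func (c *Client) ProduceAs[T any](v T)` does not
// compile: Go methods cannot declare their own type parameters.

// Option 1: a top-level generic function that takes the client as an argument.
func ProduceAs[T any](c *Client, v T, enc func(T) ([]byte, error)) error {
	b, err := enc(v)
	if err != nil {
		return err
	}
	c.Produce(b)
	return nil
}

// Option 2: a generic wrapper type; its methods reuse the type parameter
// declared on the type, which Go does allow.
type TypedClient[T any] struct {
	c   *Client
	enc func(T) ([]byte, error)
}

func (t TypedClient[T]) Produce(v T) error { return ProduceAs(t.c, v, t.enc) }

// jsonEnc is an example encoder; any func(T) ([]byte, error) works.
func jsonEnc[T any](v T) ([]byte, error) { return json.Marshal(v) }

func main() {
	cl := &Client{}
	_ = ProduceAs(cl, map[string]int{"a": 1}, jsonEnc[map[string]int])
	ints := TypedClient[[]int]{c: cl, enc: jsonEnc[[]int]}
	_ = ints.Produce([]int{1, 2})
	for _, b := range cl.produced {
		fmt.Println(string(b))
	}
	// {"a":1}
	// [1,2]
}
```

The wrapper keeps the underlying client untyped, so one client can back several `TypedClient` values, one per payload type.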

@peterbourgon

> if I should even provide an API to produce structs, vs. just having a separate registry package

In Go, even with generics, it's awkward to the point of infeasibility to generate instances of types dynamically. I think you'd want to offload most of the responsibility to your callers, i.e. to deserialize you'd provide a schema in one of the supported formats along with some bytes, and callers would hydrate a struct which they themselves have defined and which would be opaque to you; to serialize you'd provide a schema and they'd return you some bytes.
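One way to read that suggestion, sketched under assumptions: the library resolves the schema and hands it, together with the raw bytes, to an interface implemented by the caller's own type. `Schema`, `SchemaCodec`, `DecodeInto`, and `Order` are hypothetical, and the example only handles JSON so that it runs with the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Schema is a hypothetical description of a registered schema: its type
// (for example "AVRO", "PROTOBUF" or "JSON") and its definition text.
type Schema struct {
	Type       string
	Definition string
}

// SchemaCodec is implemented by the caller's own types; the library never
// needs to know the concrete Go type.
type SchemaCodec interface {
	MarshalWithSchema(s Schema) ([]byte, error)
	UnmarshalWithSchema(s Schema, data []byte) error
}

// DecodeInto is what the library side might do once it has resolved the
// schema for a record: pass the schema and bytes to the caller's value.
func DecodeInto(s Schema, data []byte, dst SchemaCodec) error {
	return dst.UnmarshalWithSchema(s, data)
}

// Order is defined by the caller and is opaque to the library.
type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// This sketch only supports JSON; a real type would decode according to
// s.Definition for Avro or Protobuf.
func (o *Order) MarshalWithSchema(s Schema) ([]byte, error) {
	if s.Type != "JSON" {
		return nil, fmt.Errorf("unsupported schema type %q", s.Type)
	}
	return json.Marshal(o)
}

func (o *Order) UnmarshalWithSchema(s Schema, data []byte) error {
	if s.Type != "JSON" {
		return fmt.Errorf("unsupported schema type %q", s.Type)
	}
	return json.Unmarshal(data, o)
}

func main() {
	var o Order
	err := DecodeInto(Schema{Type: "JSON"}, []byte(`{"id":"o-1","total":9.5}`), &o)
	fmt.Println(o.ID, o.Total, err) // o-1 9.5 <nil>
}
```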

@vtolstov
Contributor

vtolstov commented Dec 6, 2021

Maybe we could have a function that registers a schema description together with a Go struct, like
RegisterSchema(schema string, iface interface{})
Then, when decoding, we could create a new copy of the passed type and unmarshal into it?
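A sketch of that proposal, assuming the schema string is used as a lookup key and JSON as the encoding. `RegisterSchema` follows the signature above; `Decode`, `registry`, and `Click` are hypothetical. `reflect.New` allocates a fresh zero value of the registered type for each decode.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// registry maps a schema key to the Go type registered for it.
var registry = map[string]reflect.Type{}

// RegisterSchema records the dynamic type of iface (pass a value, not a pointer).
func RegisterSchema(schema string, iface interface{}) {
	registry[schema] = reflect.TypeOf(iface)
}

// Decode allocates a new value of the registered type, unmarshals into it,
// and returns a pointer to it.
func Decode(schema string, data []byte) (interface{}, error) {
	t, ok := registry[schema]
	if !ok {
		return nil, fmt.Errorf("no type registered for schema %q", schema)
	}
	ptr := reflect.New(t) // *T pointing at a zero value
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Interface(), nil
}

type Click struct {
	URL string `json:"url"`
}

func main() {
	RegisterSchema("click-v1", Click{})
	v, err := Decode("click-v1", []byte(`{"url":"/home"}`))
	if err != nil {
		panic(err)
	}
	c := v.(*Click) // the caller still needs a type assertion
	fmt.Println(c.URL) // /home
}
```

`Decode` still returns `interface{}`, so callers need a type assertion; this is the same limitation raised earlier for hooks.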

@owenhaynes
Contributor

Yeah, I feel this is best left to the calling code or a wrapper around the client.

I have something mocked up locally:

// Decoder turns a fetched record into a decoded Go value.
type Decoder interface {
	Decode(ctx context.Context, record *kgo.Record) (interface{}, error)
}

This gets called by a wrapper on each record, and the caller passes a consume function that looks like this:

func(ctx context.Context, record *kgo.Record, value interface{})

With Go generics, a middleware-style helper can then do the type assertion for you, something like:

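// ConsumeAs adapts a consume function for a concrete type T to the untyped
// callback above; the type assertion panics if value is not a T.
func ConsumeAs[T any](consume func(ctx context.Context, record *kgo.Record, value T)) func(ctx context.Context, record *kgo.Record, value interface{}) {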
	return func(ctx context.Context, record *kgo.Record, value interface{}) {
		c := value.(T)
		consume(ctx, record, c)
	}
}

@twmb twmb mentioned this issue May 9, 2022
@twmb
Owner

twmb commented May 23, 2022

I've added some support for the schema registry in these two commits:

This currently exists as a separate, unstable module github.com/twmb/franz-go/pkg/sr. I've tested most of the HTTP API, and I aim to test the Serde client soon (tm) to see if it's an alright API for actual usage in code.

I've punted on the entire issue of accepting arbitrary types because there really isn't a way to do it properly in Go. The Serde option is actually close to what @vtolstov proposed above: to encode or decode a type, you must register the ID and the type, along with its encode or decode functions, ahead of time. I'm going to close this for now, since this is fully implemented in a separate module, but if anybody in this thread wants to play with the unstable API, that'd be great. I'm probably going to leave this as a separate module for now because I don't trust the HTTP API not to change underfoot, which could prompt major version bumps of the Go API, and I'd like to keep that separate from franz-go itself.

I'm mostly OK with the Client API (although query parameters get odd fast), and I think the Serde API is OK, though I have yet to use it. I'll probably 1.0 this separate module with franz-go v1.6 (and I might make it part of franz-go proper, but am leaning no for the reasons just above).

@twmb twmb closed this as completed May 23, 2022
@twmb
Owner

twmb commented May 23, 2022

I'll also add an example or two of how to use this new package, and mention it in the readme, so that the usage is a lot more obvious. This will be done before 1.6.

@twmb
Owner

twmb commented May 26, 2022

An example has been added here, and I've tested the sr.Serde type as well: efc48f0

5 participants