-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
transform-sdk: Introduce sr package #13049
Conversation
978f052
to
fc67782
Compare
if ok { | ||
return cached, nil | ||
} | ||
s, err = sr.underlying.LookupSchemaById(id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hottt.
CI Failure: #12120 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c++ bits look good to me. since it's SR stuff maybe @BenPope @NyaliaLui @oleiman also wanna take a peak
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've had a glance at this. In general it looks ok to me.
Without documentation it's hard to know what limitations are expected.
For example, I see no [en|de]coding of the protobuf message offsets in the wire format.
If possible, it would be nice to break commits into slightly smaller than 1200 line chunks so that discrete functionality is easier to review.
As far as I know Tinygo doesn't support protobuf right now: tinygo-org/tinygo#2667 I do have some tasks to look into if that's fixed in the most recent version, so I'm going to give that a go then I may have to have a followup PR to add the message offsets, in a similar manner to Tinygo. Honestly I'd love to have a much better API here, but that needs a lot of time to flesh out the right API, which will have to come later.
Apologies, I had meant to break this up before I submitted this for review. I've gone ahead and broken this up now |
src/go/transform-sdk/sr/client.go
Outdated
package sr | ||
|
||
// schemaId is an ID of a schema registered with schema registry | ||
type schemaId uint32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Redpanda, this is signed, is the difference intentional?
using schema_version = named_type<int32_t, struct schema_version_tag>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch - it was not.
src/go/transform-sdk/sr/client.go
Outdated
|
||
type ( | ||
clientOpts struct { | ||
disableCaches bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think specifying a cache size might give some more flexibility here, otherwise the cache is unbounded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, my thought process was:
- I expect people to likely call
LookupSchemaById
for every record initially, so we should cache by default - schemas change infrequently
- changes in schema will likely also mean code will be deployed soon (because you'll want your transform to use the new data your schema just added right?), resetting the cache.
- if you want to do something custom we should just disable caches and let you write whatever fancy caching logic you want.
I've documented the unbounded cache default more and added a simple entry count based map. My hope is that is simple enough for basic usage and more complex use cases can just disable this level of caching then use whatever custom logic they want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any useful go-packages with cache implementations? I wonder if LRU or LFU evictions would be better, and it's mostly a solved problem.
Most of them seem to be "thread safe", which I assume is undesirable?
value V | ||
insertionNumber int | ||
} | ||
// A cache that evicts based on number of entries |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears to use FIFO eviction, correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, updated the documentation.
|
||
// Put adds an entry into the cache | ||
func (c *Cache[K, V]) Put(k K, v V) { | ||
c.underlying[k] = entry[V]{value: v, insertionNumber: c.latestEntryInsertionNumber} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This temporarily inserts one more than the limit, which means if you're really unlucky, the number of buckets could be twice what's required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch - fixed.
return len(c.underlying) | ||
} | ||
|
||
func (c *Cache[K, V]) prune() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The values (overall) are quite large, so I wonder if it's worth choosing a different data structure that would trade off some memory for bookkeeping, but reduce insertion time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
All of the major ones that I've seen use goroutines and currently are not supported in the way we use tinygo + wasm, or don't have friendly licenses. I've written a small LRU cache that hopefully works good enough for 99% of users, if folks want to do something more complex they can disable the cache and put something on top of our client. |
These functions where already defined in redpanda-data#12666 and their encoding. We also add a stub version of the ABI for IDEs that don't use the tinygo build tags (and we can build/test this package in standard go). Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
The corresponding side of this contract on the broker lies in src/v/wasm/schema_registry_module.cc Also add tests that this can be roundtripped. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This wraps the ABI exposed in previous commits in a user friendly fashion. By default all data from schema registry is cached forever in the client, since we expect schemas to change infrequently this seems to be the right tradeoff. This also is the default in most kafka clients. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
The API is mostly the same as the one in franz-go, but it uses generics for some additional typesafety. We currently only support avro schemas here. At the time of writing, Redpanda does not support JSON schema, and tinygo does not support protocol buffers. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
We'll use this transform to test the schema registry code in Redpanda. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This will allow us to test the schema registry functionality in wasm without needing to pull in or spin up all of Redpanda. It's very simple and certainly wrong, but will be enough functionality to write some simple tests that use schema registry. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This ensures our ABI contract works with the SDK and we can perform basic operations using the fake schema registry. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
client To give more control than all or nothing. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Force push: fixed adding in the client then removing it in the first two commits. |
Introduce a
sr
(schema registry) client within our tinygo SDK.Exposes the ability to read/write from schema registry in a limited way.
We currently only support registring, and looking up schemas.
Additionally add a test case that shows the ability to convert from avro
-> json using a schema registry encoded schema.
NOTE: The host side of the schema-registry was checked in as part of #12666
Backports Required
Release Notes