Add `fetch_in_order` field to `schema_registry` input #2898
}

if test.schemaWithReference != "" {
	body, err := json.Marshal(payload{Schema: test.schemaWithReference, References: []map[string]any{{"name": "foo", "subject": subject, "version": 1}}})
A bit janky, but I can clean it up to use the name from `t.schema` if need be...
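One possible cleanup, sketched under assumptions: replace the `[]map[string]any` literal with typed structs. The names `schemaReference`, `registerRequest` and `marshalRequest` are hypothetical and are not taken from the PR; the JSON keys (`schema`, `references`, `name`, `subject`, `version`) follow the Schema Registry request body.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// schemaReference is a hypothetical typed replacement for the
// map[string]any reference entries used in the test.
type schemaReference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

// registerRequest is a hypothetical stand-in for the test's payload type.
type registerRequest struct {
	Schema     string            `json:"schema"`
	References []schemaReference `json:"references,omitempty"`
}

// marshalRequest builds the JSON body for a schema with a single reference.
func marshalRequest(schema, refName, refSubject string, refVersion int) ([]byte, error) {
	return json.Marshal(registerRequest{
		Schema: schema,
		References: []schemaReference{
			{Name: refName, Subject: refSubject, Version: refVersion},
		},
	})
}

func main() {
	body, err := marshalRequest(`{"type": "string"}`, "foo", "some-subject", 1)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

The reference name is passed in as an argument here, so a test could take it from the schema under test instead of hard-coding `"foo"`.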
ee05f61 to 4a21635
if ["topic", "key", "partition", "partitioner", "timestamp"].any(f -> this.redpanda_migrator.keys().contains(f)) {
  root = throw("The topic, key, partition, partitioner and timestamp fields of the redpanda_migrator output must be left empty")
}
let rpMigratorMaxInFlight = this.redpanda_migrator.max_in_flight.or(1)
let redpandaMigrator = this.redpanda_migrator.assign(
  {
    "topic": "${! metadata(\"kafka_topic\").or(throw(\"missing kafka_topic metadata\")) }",
    "key": "${! metadata(\"kafka_key\") }",
    "partition": "${! metadata(\"kafka_partition\").or(throw(\"missing kafka_partition metadata\")) }",
    "partitioner": "manual",
    "timestamp": "${! metadata(\"kafka_timestamp_unix\").or(timestamp_unix()) }"
    "timestamp": "${! metadata(\"kafka_timestamp_unix\").or(timestamp_unix()) }",
    "max_in_flight": $rpMigratorMaxInFlight
  }
)

let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "consumer_group", "client_id", "rack_id", "max_message_bytes", "broker_write_max_bytes", "tls", "sasl")

if this.schema_registry.keys().contains("subject") {
  root = throw("The subject field of the schema_registry output must not be set")
}
let schema_registry = if this.schema_registry.length() > 0 { this.schema_registry.assign({"subject": "${! @schema_registry_subject }"}) }
if ["topic", "key", "partition", "partitioner", "timestamp"].any(f -> this.redpanda_migrator.keys().contains(f)) {
  root = throw("The topic, key, partition, partitioner and timestamp fields of the redpanda_migrator output must be left empty")

let srMaxInFlight = this.schema_registry.max_in_flight.or(1)
let schemaRegistry = if this.schema_registry.length() > 0 {
  this.schema_registry.assign({
    "subject": "${! @schema_registry_subject }",
    "max_in_flight": $srMaxInFlight
  })
}
Just making sure that we set `max_in_flight` to 1 unless users set it to something else explicitly (and hopefully are aware they'll get out-of-order messages in that case).
=== `fetch_all_schemas_on_connect`

Fetch all schemas on connect. Set this to `true` when schema references are used.


*Type*: `bool`

*Default*: `true`
This name is not very intuitive to me.
Can we name it:
`enforce_ordering: bool`
If true, all schemas will be fetched and sorted so that they are inserted in the same order. This is required to correctly resolve schema references.
default: `true`
On another note: do we even need this option? Is this only false if we would OOM reading all the schemas?
Or maybe `fetch_in_order`, since they can still mess up the order on the output with `max_in_flight`
Naming things is hard 😅 I like `fetch_in_order`, thanks!
> Is this only false if we would OOM reading all the schemas?
I'm not super-concerned about OOM unless the schemas are huge, but it could take a long time to fetch them all if there are many of them, so it might be handy to stream them if people wish to stick a cache in the middle. Or maybe I'm overthinking it... The tests cover both flows now, so I'm happy to leave it there.
5217677 to 141ec00
Signed-off-by: Mihai Todor <todormihai@gmail.com>
141ec00 to ff11032
`fetch_all_schemas_on_connect` field to `schema_registry` input → `Version` field to `schema_registry` input
Way cleaner - I did find one issue, but otherwise LGTM
if !tlsEnabled {
	tlsConf = nil
}
if i.client, err = sr.NewClient(srURL.String(), reqSigner, tlsConf, mgr); err != nil {
🥳
Thank you for that suggestion! The code is much cleaner indeed! ❤️
schemas[schema.ID] = schemaInfo{
	SchemaInfo: schema,
	subject:    subject,
	version:    version,
}
This will overwrite if there is a schema stored in multiple different subjects.
For example, schema ID 2 can be registered under 3 different subjects, and this way we would drop the other two. I think we need to collect all of them without deduplication, then post them all in sorted order. The order between entries with the same ID doesn't matter, but we still need to populate the subject -> schema ID relation.
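A minimal sketch of that suggestion, assuming that sorting by schema ID is enough to put referenced schemas ahead of the ones that depend on them. `schemaEntry` and `collectInOrder` are hypothetical names, not the PR's actual types: the point is only that a slice keeps every (subject, version) registration, whereas a map keyed by ID keeps one per ID.

```go
package main

import (
	"fmt"
	"sort"
)

// schemaEntry is a hypothetical stand-in for the PR's schemaInfo type.
type schemaEntry struct {
	ID      int
	Subject string
	Version int
}

// collectInOrder keeps every registration, including those that share a
// schema ID, and returns them sorted by ID.
func collectInOrder(entries []schemaEntry) []schemaEntry {
	out := make([]schemaEntry, len(entries))
	copy(out, entries)
	// A stable sort preserves the fetch order among entries with equal IDs.
	sort.SliceStable(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	return out
}

func main() {
	fetched := []schemaEntry{
		{ID: 2, Subject: "bar", Version: 1},
		{ID: 1, Subject: "foo", Version: 1},
		{ID: 2, Subject: "baz", Version: 1},
	}
	for _, e := range collectInOrder(fetched) {
		fmt.Printf("id=%d subject=%s version=%d\n", e.ID, e.Subject, e.Version)
	}
}
```

Both registrations of ID 2 survive here, so the subject -> schema ID relation can still be populated for `bar` and `baz`.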
Thanks for spotting that! Indeed, this is a corner case that I was trying to get my head around and it looks like you're right and the schemas will get duplicate IDs :( The trouble is that I can't get Redpanda to import schemas with a fixed ID but different subjects:
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"id": 1, "version": 1, "schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/foo/versions
{"id":1}%
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"id": 1, "version": 1, "schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/bar/versions
{"error_code":42207,"message":"Overwrite new schema with id 1 is not permitted."}%
It does work, however, if I drop the `id` field or both the `id` and `version` fields:
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/bar/versions
{"id":1}%
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"version": 1, "schema": "{\"name\": \"Foo\", \"type\": \"record\", \"fields\": [{\"name\": \"data\", \"type\": \"int\"}]}"}' http://localhost:8081/subjects/baz/versions
{"id":1}%
However, if I do this, then it feels kind of brittle because I'd have to make sure I first send the schema which does have the `id` field set...
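For illustration only, the workaround described above could look roughly like the sketch below: the first registration seen for a given ID carries `id`, and later registrations of the same ID omit it. This is not necessarily what the PR ends up doing; `registration` and `buildPayloads` are hypothetical names, and the input is assumed to be already in the order in which it will be posted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// registration is a hypothetical record of one schema under one subject.
type registration struct {
	ID      int
	Subject string
	Version int
	Schema  string
}

// buildPayloads returns one JSON body per registration, in input order.
// Only the first registration seen for a given ID includes "id"; the
// subject is left out of the body because it belongs in the request path.
func buildPayloads(regs []registration) ([][]byte, error) {
	seen := map[int]bool{}
	bodies := make([][]byte, 0, len(regs))
	for _, r := range regs {
		p := map[string]any{"version": r.Version, "schema": r.Schema}
		if !seen[r.ID] {
			p["id"] = r.ID
			seen[r.ID] = true
		}
		b, err := json.Marshal(p)
		if err != nil {
			return nil, err
		}
		bodies = append(bodies, b)
	}
	return bodies, nil
}

func main() {
	regs := []registration{
		{ID: 1, Subject: "foo", Version: 1, Schema: `{"type": "string"}`},
		{ID: 1, Subject: "bar", Version: 1, Schema: `{"type": "string"}`},
	}
	bodies, err := buildPayloads(regs)
	if err != nil {
		panic(err)
	}
	for i, b := range bodies {
		fmt.Printf("POST /subjects/%s/versions %s\n", regs[i].Subject, b)
	}
}
```

The brittleness is visible in the sketch: it is only correct if the body that carries `id` is actually delivered first, which ties it to the ordering guarantees discussed earlier.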
This should be fixed now. I'll defer adding a test until Redpanda lifts the above constraint as part of CORE-7734.
`Version` field to `schema_registry` input → `fetch_in_order` field to `schema_registry` input
Schemas can have the same ID and be registered under different subjects. I can't add a test for now because Redpanda doesn't allow us to POST schemas with the same ID under different subjects for now. This limitation will be resolved as part of a follow-up task.

Signed-off-by: Mihai Todor <todormihai@gmail.com>
// Omit `subject` from the schema since the `/subjects/<subject>/versions` endpoint doesn't allow it as part of
// the payload we pass it along as metadata.
schema := map[string]any{
	"id": si.ID,
Don't we have to drop the ID if there are multiple subjects with the same ID?
Hopefully CORE-7734 gets addressed in Redpanda so we won’t have to make this logic even more convoluted.
LGTM
No description provided.