feat: implement ALTER SOURCE xx FORMAT xx ENCODE xx (...)
#14057
Shall we mark this PR as a user-facing change?
license-eye has totally checked 4701 files.
| Valid | Invalid | Ignored | Fixed |
|---|---|---|---|
| 2054 | 1 | 2646 | 0 |
Click to see the invalid file list
- src/frontend/src/handler/alter_source_with_sr.rs
It looks like this PR only contains the alter source part. Shall we also add support for altering a table with connection (and schema registry)?
Approving for the alter source with schema registry part.
Planning to implement schema registry refresh for tables with connectors in a future PR.
Generally LGTM
let request = AlterSourceRequest {
    source: Some(source),
};
let resp = self.inner.alter_source(request).await?;
Wow, so the logic on the Meta node side was already completed before?
Yes indeed. Since alter source sr with table is not implemented in this PR, we only need to replace PbSource in the Meta. The replacement is done by the alter_source handler and thus can be reused.
)
FORMAT PLAIN ENCODE PROTOBUF(
    schema.registry = 'http://message_queue:8081',
    message = 'test.User'
IIUC, the Confluent Schema Registry supports schema versioning, that is, one schema (topic) may have multiple proto definitions with different versions. For example: https://docs.confluent.io/platform/current/schema-registry/develop/using.html#fetch-version-1-of-the-schema-registered-under-subject-kafka-value. Shall we support specifying an optional version here?
I think this feature is not hard to implement. I will try it in the next pr.
I think we can always stick to the latest schema version in the source.
Here we assume the upstream workers always use the latest schema version (the writer schema). This is not a strong assumption, because if no producer uses a new schema, the schema version on the Schema Registry will not be updated.
The Schema Registry guarantees backward compatibility, so RW can get everything it needs as long as reader schema version <= writer schema version, i.e. there must be some producers generating data with an equal or higher schema version.
That makes sense to me. We may defer it until being requested by some users.
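For reference, the two lookups discussed in this thread (latest version versus a pinned version) correspond to two paths of the Schema Registry REST API. The sketch below only builds those URLs; `SchemaVersion` and `schema_version_url` are hypothetical names that do not come from this PR, and the subject is assumed to need no URL escaping.

```rust
/// Which schema version to request for a subject (hypothetical helper type).
enum SchemaVersion {
    /// Always follow the newest registered version, as this thread settles on.
    Latest,
    /// Pin a specific version number, as the optional feature would allow.
    Exact(u32),
}

/// Build the Schema Registry REST URL for a subject and version.
/// Assumes `subject` contains no characters that need URL escaping.
fn schema_version_url(registry: &str, subject: &str, version: &SchemaVersion) -> String {
    // Tolerate a trailing slash on the registry address.
    let base = registry.trim_end_matches('/');
    match version {
        SchemaVersion::Latest => {
            format!("{}/subjects/{}/versions/latest", base, subject)
        }
        SchemaVersion::Exact(n) => {
            format!("{}/subjects/{}/versions/{}", base, subject, n)
        }
    }
}
```

With the registry address used in this PR's tests, `schema_version_url("http://message_queue:8081", "Kafka-value", &SchemaVersion::Exact(1))` yields the "fetch version 1" request from the linked Confluent page.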
Co-authored-by: Eric Fu <eric@singularity-data.com>
None => {
    Err(RwError::from(ProtocolError("The proto payload is empty".to_owned())))
}
Some(i) => Ok(&remained[(*i as usize)..]),
This is skipping i bytes rather than skipping i variable-length zig-zag encoded integers. For i within 1..=63, the encoding outputs a single byte so this happens to work.
> prefixed by the length of the array (which is also variable length, Zigzag encoded)
i itself is also encoded so the decoded length shall be i / 2.
Got it, so the correct implementation is
Some(i) => Ok(&remained[(1 + *i / 2) as usize..]),
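The one-line fix above still relies on every index fitting in a single byte. As a hedged sketch of the more general approach raised in the review, the helpers below decode the array length and each index as zig-zag varints and skip however many bytes they actually occupy. `read_zigzag_varint` and `skip_message_indexes` are hypothetical names and not code from this PR, and `payload` is assumed to start at the message-index array (after the magic byte and schema id).

```rust
/// Decode one zig-zag encoded varint from the front of `buf`.
/// Returns the decoded value and the number of bytes it occupied,
/// or `None` if the varint is truncated or longer than 10 bytes.
fn read_zigzag_varint(buf: &[u8]) -> Option<(i64, usize)> {
    let mut raw: u64 = 0;
    for (idx, &byte) in buf.iter().enumerate().take(10) {
        // Each byte carries 7 payload bits, least significant group first.
        raw |= u64::from(byte & 0x7f) << (7 * idx);
        if byte & 0x80 == 0 {
            // Undo zig-zag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
            let value = ((raw >> 1) as i64) ^ -((raw & 1) as i64);
            return Some((value, idx + 1));
        }
    }
    None
}

/// Skip the message-index array (a zig-zag varint length followed by
/// that many zig-zag varint indexes) and return the remaining bytes.
fn skip_message_indexes(payload: &[u8]) -> Option<&[u8]> {
    let (len, mut offset) = read_zigzag_varint(payload)?;
    if len < 0 {
        return None; // a negative array length is malformed
    }
    for _ in 0..len {
        // `get` returns None instead of panicking on a short buffer.
        let (_, used) = read_zigzag_varint(payload.get(offset..)?)?;
        offset += used;
    }
    payload.get(offset..)
}
```

For single-byte inputs this agrees with `1 + i / 2`: the bytes `[0x04, 0x02, 0x00]` decode to a length of 2 followed by two one-byte indexes, so three bytes are skipped. It differs once an index needs more than one byte, for example index 100, which is encoded as `[0xC8, 0x01]`.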
function register_schema_registry() {
    curl -X POST http://message_queue:8081/subjects/$1/versions \
        -H ‘Content-Type: application/vnd.schemaregistry.v1+json’ \
        --data-binary @<(jq -n --arg schema “$(cat $2)” ‘{schemaType: “PROTOBUF”, schema: $schema}’)
}
The quotes here are Chinese quotes.. Does it really work???
Oh, it seems this function is unused
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Support refreshing the schema registry by refilling format_encode_options. Note that altering data_format and data_encode is not supported for now, and altering source with table is to be implemented in the next PR.
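The restriction described above can be pictured with a toy model. This is only an illustrative sketch and not the PR's actual handler: the `FormatEncode` struct and `refill_options` function are hypothetical, with field names borrowed from the description, and the real catalog types may look quite different.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a source's format/encode settings.
#[derive(Debug, Clone, PartialEq)]
struct FormatEncode {
    data_format: String,
    data_encode: String,
    format_encode_options: BTreeMap<String, String>,
}

/// Accept the requested settings only when FORMAT and ENCODE are unchanged,
/// so that the only effective change is the refilled option map.
fn refill_options(current: &FormatEncode, requested: FormatEncode) -> Result<FormatEncode, String> {
    if current.data_format != requested.data_format
        || current.data_encode != requested.data_encode
    {
        return Err(format!(
            "altering FORMAT {} ENCODE {} to FORMAT {} ENCODE {} is not supported",
            current.data_format,
            current.data_encode,
            requested.data_format,
            requested.data_encode
        ));
    }
    Ok(requested)
}
```

In this model, a request that keeps `PLAIN` and `PROTOBUF` but supplies new `schema.registry` and `message` options succeeds, while one that switches the encode to something else is rejected.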
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.