Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pulsar IO] Clarify usage of KVRecord on Sinks #10113

Closed
wants to merge 3 commits into from

Conversation

eolivelli
Copy link
Contributor

Motivation

When you develop a Source you can push KVRecords in order to send records with the KeyValue schema and set properly the KeySchema the ValueSchema and the Key encoding type.

On the Sink side you are receiving only Record, it is better to have a consistent behaviour

Modifications

  • set the generic type of KVRecord <K,V> to Record<KeyValue<K,V>>
  • let Sinks receive instances of KVRecord in case of KeyValue payload and schema

Verifying this change

This change added tests

- set the generic type to Record<KeyValue>
- let Sinks receive instances of KVRecord in case of KeyValue payload and schema
@eolivelli eolivelli changed the title Clarify usage of KVRecord [Pulsar IO] Clarify usage of KVRecord on Sinks Apr 1, 2021
@eolivelli eolivelli self-assigned this Apr 1, 2021
@eolivelli
Copy link
Contributor Author

@tuteng can you please take a look? since you are the author of KVRecord interface

@eolivelli eolivelli marked this pull request as ready for review April 1, 2021 14:58
@@ -342,7 +345,14 @@ private void sendOutputMessage(Record srcRecord, Object output) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
this.sink.write(new SinkRecord<>(srcRecord, output));
SinkRecord sinkRecord;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why do you need this change here. What is the value for adding a SinkKVRecord? SinkRecord is a wrapper of the output object. The output object can be a KeyValue object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie
these are the key points:

  1. on the Source side you use KVRecord for records with a KeyValue payload, this lets you specify the keySchema and the valueSchema, for consistency it is good to have the same situation on the Sink side
  2. KeyValueSchema is not part of the public api (it is in the impl package), so in theory you cannot get the keySchema and the valueSchema when you are inside a Sink, if we implement KVRecord then you have those schemas
  3. When we have AutoConsumeSchema that returns a KeyValue object the record.getSchema() is the internal class AutoConsumeSchema, so you cannot still get the keySchema and the valueSchema (this problem is still not present, because we have not merged the Sink<GenericObject patch yet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie the alternative is:

  • make KeyValueSchema part of the "public" API (or add an interface on the "api" package)
  • hide "AutoConsumeSchema" in the Sink side and make Record#getSchema() return the current KeyValueSchema

but this alternative is not good because:

  • we are going to add a new API to represent KeyValueSchema, but we already have KVRecord
  • we are changing the behaviour of the Sink, not returning AutoConsumeSchema anymore

@eolivelli
Copy link
Contributor Author

eolivelli commented Apr 10, 2021

@sijie I am closing this PR. I will send another patch with an alternative approach

cc @codelipenghui

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants