-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement GenericObject - Allow GenericRecord to wrap any Java Object #10057
Merged
Merged
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
8d97efe
Implement Schema#OBJECT prototype
eolivelli 3cb55a7
Reduce code duplication
eolivelli 55e337b
cleanup
eolivelli c0433fb
Fix imports
eolivelli ae23109
Merge branch 'impl/schema-object' into impl/primitive-record
eolivelli 6294e12
Add PrimitiveRecord in order to support primitive types in Schema.AUT…
eolivelli 9e1a0bf
Merge branch 'master' into impl/primitive-record
eolivelli 77ec727
Implement PulsarObject
eolivelli a58c723
Implement PulsarObject
eolivelli 3364edd
Rename PulsarObject to GenericObject
eolivelli bd0fbb1
restore
eolivelli 9515771
clean up
eolivelli a9efa02
restore fields
eolivelli 032b824
Rename PrimiveRecord to GenericObjectWrapper
eolivelli 6b56a4b
Address Sijie's comments
eolivelli 29e28eb
Merge remote-tracking branch 'origin/master' into impl/pulsar-object
eolivelli d8f02f3
Merge branch 'master' into impl/pulsar-object
eolivelli 9e2ba53
fix merge conflicts
eolivelli c4c1d21
Merge branch 'master' into impl/pulsar-object
eolivelli 915cecd
Merge branch 'master' into impl/pulsar-object
eolivelli ca86cf1
Merge branch 'master' into impl/pulsar-object
eolivelli File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
50 changes: 50 additions & 0 deletions
50
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericObject.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.client.api.schema; | ||
|
||
import org.apache.pulsar.common.schema.SchemaType; | ||
|
||
/** | ||
* This is an abstraction over the logical value that is store into a Message. | ||
* Pulsar decodes the payload of the Message using the Schema that is configured for the topic. | ||
*/ | ||
public interface GenericObject { | ||
|
||
/** | ||
* Return the schema tyoe. | ||
* | ||
* @return the schema type | ||
* @throws UnsupportedOperationException if this feature is not implemented | ||
* @see SchemaType#BYTES when the topic has no schema information | ||
* @see SchemaType#STRING | ||
* @see SchemaType#AVRO | ||
* @see SchemaType#PROTOBUF_NATIVE | ||
* @see SchemaType#JSON | ||
*/ | ||
SchemaType getSchemaType(); | ||
|
||
/** | ||
* Return the internal native representation of the Object, | ||
* like a AVRO GenericRecord, a String or a byte[]. | ||
* | ||
* @return the decoded object | ||
* @throws UnsupportedOperationException if the operation is not supported | ||
*/ | ||
Object getNativeObject(); | ||
eolivelli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
90 changes: 90 additions & 0 deletions
90
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/PrimitiveRecord.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.client.api.schema; | ||
|
||
import org.apache.pulsar.common.classification.InterfaceAudience; | ||
import org.apache.pulsar.common.classification.InterfaceStability; | ||
import org.apache.pulsar.common.schema.SchemaType; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
/** | ||
* An interface represents a message with schema for non Struct types. | ||
*/ | ||
@InterfaceAudience.Private | ||
@InterfaceStability.Evolving | ||
public class PrimitiveRecord implements GenericRecord { | ||
jerrypeng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private final Object nativeObject; | ||
private final SchemaType schemaType; | ||
|
||
public static PrimitiveRecord of(Object nativeRecord, SchemaType schemaType) { | ||
return new PrimitiveRecord(nativeRecord, schemaType); | ||
} | ||
|
||
private PrimitiveRecord(Object nativeObject, SchemaType schemaType) { | ||
this.nativeObject = nativeObject; | ||
this.schemaType = schemaType; | ||
} | ||
|
||
@Override | ||
public byte[] getSchemaVersion() { | ||
return null; | ||
} | ||
|
||
@Override | ||
public List<Field> getFields() { | ||
return Collections.emptyList(); | ||
} | ||
|
||
@Override | ||
public Object getField(String fieldName) { | ||
return null; | ||
} | ||
|
||
@Override | ||
public SchemaType getSchemaType() { | ||
return schemaType; | ||
} | ||
|
||
@Override | ||
public Object getNativeObject() { | ||
return nativeObject; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return Objects.toString(nativeObject); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hashCode(nativeObject); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object other) { | ||
if (! (other instanceof PrimitiveRecord)) { | ||
return false; | ||
} | ||
return Objects.equals(nativeObject, ((PrimitiveRecord) other).nativeObject); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this interface suppose to be used as a schema type for producers or consumers, .e.g. Consumer ? If so, can you write some tests that has consumers / producers that uses this new type of Schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we need this interface as all. Shouldn't having these additional methods in
GenericRecord
suffice?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerrypeng
the idea is to let users user "GenericObject" instead of GenericRecord while dealing with "Any object", on the Consumer/Sink side, because GenericRecord for many users is related to a Struct type (like AVRO GenericRecord), and it carries "getFields()"
If so, can you write some tests that has consumers / producers that uses this new type of Schema?
I added tests about GenericObjectWrapper.
The main goal is to have Sink and having the ability to write Sinks that are not bound to a specific Schema at compile time.