Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar Functions: allow a Function<GenericObject,?> to access the original Schema of the Message and use it #14847

Conversation

eolivelli
Copy link
Contributor

@eolivelli eolivelli commented Mar 24, 2022

Motivation

Currently a Function cannot access the original Schema of the Message but it only receives AutoConsumeSchema that is a special schema that is not suitable to Producing messages.

This is an example of Identity Function that picks any message, in spite of the Schema and writes it to a output topic.

@Slf4j
public class MyFunctionIdentityTransform implements Function<GenericObject, Void> {

    @Override
    public Void process(GenericObject genericObject, Context context) throws Exception {
        Record<?> currentRecord = context.getCurrentRecord();
        log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
        log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);
        context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
                .value(genericObject.getNativeObject()).send();
        return null;
    }
}

This kind of Functions must also work well with KeyValue<GenericRecord, GenericRecord> input messages, and preserve the schema properties (like KeyValueEncoding.SEPARATED, or the SchemaType of the components).

Please note that Function<GenericObject, GenericObject> cannot work, because GenericObject (or GenericRecord) does not carry full Schema information, so you cannot set a Schema to the output Record just by returning a POJO or a GenericObject. The user has to use newOutputMessage(topic, Schema)

Modifications

Unwrap AutoConsumeSchema in PulsarSource, when we pick a Message from the Pulsar topic, and set on the PulsarRecord the wrapped Schema.

Verifying this change

I will add tests

@eolivelli eolivelli added this to the 2.11.0 milestone Mar 24, 2022
@eolivelli eolivelli self-assigned this Mar 24, 2022
@eolivelli
Copy link
Contributor Author

@congbobo184 This patch is for early preview, I am going to add integration tests that covers the example function.

@eolivelli eolivelli requested a review from congbobo184 March 24, 2022 11:30
@eolivelli
Copy link
Contributor Author

This is another kind of Function that is unblocked by this patch.
The function reads any Object and in case it is a KeyValue<?, AVRO> it removes a field from the AVRO struct and then writes the KeyValue downstream
(this is only an example, we should cache the Schema instances and organise the code better)

@Slf4j
public class MyFunctionRemoveFieldTransform implements Function<GenericObject, Void> {

    private static final String FIELD_TO_REMOVE = "foo";

    @Override
    public Void process(GenericObject genericObject, Context context) throws Exception {
        Record<?> currentRecord = context.getCurrentRecord();
        log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
        log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);
        Object nativeObject = genericObject.getNativeObject();
        Schema<?> schema = currentRecord.getSchema();

        Schema outputSchema = schema;
        Object outputObject = genericObject.getNativeObject();

        if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue)  {
            KeyValueSchema kvSchema = (KeyValueSchema) schema;

            Schema keySchema = kvSchema.getKeySchema();
            Schema valueSchema = kvSchema.getValueSchema();
            // remove a column "foo" from the "valueSchema"
            if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {

                org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) valueSchema.getNativeSchema().get();
                if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
                    org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
                    org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
                    org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
                            originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
                            originalAvroSchema.getFields().
                                stream()
                                .filter(f->!f.name().equals(FIELD_TO_REMOVE))
                                .map(f-> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
                                .collect(Collectors.toList()));

                    Schema newValueSchema = Schema.NATIVE_AVRO(modified);

                    outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());
                    KeyValue originalObject = (KeyValue) nativeObject;

                    GenericRecord value = (GenericRecord) originalObject.getValue();
                    org.apache.avro.generic.GenericRecord genericRecord
                            = (org.apache.avro.generic.GenericRecord) value.getNativeObject();

                    org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
                    for (org.apache.avro.Schema.Field field : modified.getFields()) {
                        newRecord.put(field.name(), genericRecord.get(field.name()));
                    }
                    GenericDatumWriter writer = new GenericDatumWriter(modified);
                    ByteArrayOutputStream oo = new ByteArrayOutputStream();
                    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
                    writer.write(newRecord, encoder);
                    Object newValue = oo.toByteArray();

                    outputObject = new KeyValue(originalObject.getKey(), newValue);
                }


            }
        }
        log.info("output {} schema {}", outputObject, outputSchema);
        context.newOutputMessage(context.getOutputTopic(), outputSchema)
                .value(outputObject).send();
        return null;
    }
}

@github-actions
Copy link

@eolivelli:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@eolivelli eolivelli added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Mar 25, 2022
@eolivelli eolivelli force-pushed the impl/functions-original-schema-genericobject branch 2 times, most recently from 192037b to c025e5a Compare March 30, 2022 07:03
@eolivelli eolivelli marked this pull request as ready for review March 30, 2022 07:03
@eolivelli eolivelli closed this Mar 30, 2022
@eolivelli eolivelli reopened this Mar 30, 2022
@eolivelli eolivelli force-pushed the impl/functions-original-schema-genericobject branch 2 times, most recently from 5ffb1ad to 0ef1cc6 Compare March 30, 2022 15:54
@eolivelli eolivelli closed this Mar 31, 2022
@eolivelli eolivelli reopened this Mar 31, 2022
@eolivelli eolivelli force-pushed the impl/functions-original-schema-genericobject branch from 0ef1cc6 to fcde959 Compare March 31, 2022 07:03
Copy link
Member

@zymap zymap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@hangc0276 hangc0276 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me.

@eolivelli eolivelli force-pushed the impl/functions-original-schema-genericobject branch from 4057ce0 to 01c0b70 Compare April 1, 2022 07:03
@eolivelli eolivelli merged commit 193f5b2 into apache:master Apr 1, 2022
@eolivelli eolivelli deleted the impl/functions-original-schema-genericobject branch April 1, 2022 08:53
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
nicoloboschi pushed a commit that referenced this pull request May 12, 2022
…ginal Schema of the Message and use it (#14847)

(cherry picked from commit 193f5b2)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request May 12, 2022
…ginal Schema of the Message and use it (apache#14847)

(cherry picked from commit 193f5b2)
liangyepianzhou pushed a commit that referenced this pull request Nov 30, 2022
…ginal Schema of the Message and use it (#14847)

(cherry picked from commit 193f5b2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants