diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java index fcd629ad305f24..cc2a81bde645bf 100644 --- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java @@ -56,7 +56,7 @@ public Void process(GenericObject genericObject, Context context) throws Excepti Schema outputSchema = schema; Object outputObject = genericObject.getNativeObject(); - + boolean someThingDone = false; if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue) { KeyValueSchema kvSchema = (KeyValueSchema) schema; @@ -96,6 +96,7 @@ public Void process(GenericObject genericObject, Context context) throws Excepti Schema newValueSchema = Schema.NATIVE_AVRO(modified); outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType()); outputObject = new KeyValue(originalObject.getKey(), newValue); + someThingDone = true; } } } else if (schema.getSchemaInfo().getType() == SchemaType.AVRO) { @@ -125,6 +126,29 @@ public Void process(GenericObject genericObject, Context context) throws Excepti Schema newValueSchema = Schema.NATIVE_AVRO(modified); outputSchema = newValueSchema; outputObject = oo.toByteArray(); + someThingDone = true; + } + } + + if (!someThingDone) { + // do some processing... + final boolean isStruct; + switch (currentRecord.getSchema().getSchemaInfo().getType()) { + case AVRO: + case JSON: + case PROTOBUF_NATIVE: + isStruct = true; + break; + default: + isStruct = false; + break; + } + if (isStruct) { + // GenericRecord must stay wrapped + outputObject = currentRecord.getValue(); + } else { + // primitives and KeyValue must be unwrapped + outputObject = nativeObject; } } log.info("output {} schema {}", outputObject, outputSchema);