Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Mar 30, 2022
1 parent d8ef1a5 commit 192037b
Showing 1 changed file with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Void process(GenericObject genericObject, Context context) throws Excepti

Schema outputSchema = schema;
Object outputObject = genericObject.getNativeObject();

boolean someThingDone = false;
if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;

Expand Down Expand Up @@ -96,6 +96,7 @@ public Void process(GenericObject genericObject, Context context) throws Excepti
Schema newValueSchema = Schema.NATIVE_AVRO(modified);
outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());
outputObject = new KeyValue(originalObject.getKey(), newValue);
someThingDone = true;
}
}
} else if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
Expand Down Expand Up @@ -125,6 +126,29 @@ public Void process(GenericObject genericObject, Context context) throws Excepti
Schema newValueSchema = Schema.NATIVE_AVRO(modified);
outputSchema = newValueSchema;
outputObject = oo.toByteArray();
someThingDone = true;
}
}

if (!someThingDone) {
// do some processing...
final boolean isStruct;
switch (currentRecord.getSchema().getSchemaInfo().getType()) {
case AVRO:
case JSON:
case PROTOBUF_NATIVE:
isStruct = true;
break;
default:
isStruct = false;
break;
}
if (isStruct) {
// GenericRecord must stay wrapped
outputObject = currentRecord.getValue();
} else {
// primitives and KeyValue must be unwrapped
outputObject = nativeObject;
}
}
log.info("output {} schema {}", outputObject, outputSchema);
Expand Down

0 comments on commit 192037b

Please sign in to comment.