Description
I am working on a requirement to develop a generic CDC consumer using Kafka. The records are written to Kafka by the JDBC connector, so the Avro schema can vary depending on the source database. For example, a column idcol that I map to a long in Java can have a different data type in each database:
PostgreSQL:
idcol bigint,
Avro schema:
"name": "idcol",
"type": [
  "null",
  "long"
]
SQL Server:
idcol numeric(19,0)
Avro schema:
"name": "idcol",
"type": [
  "null",
  {
    "type": "bytes",
    "scale": 0,
    "precision": 64,
    "connect.version": 1,
    "connect.parameters": {
      "scale": "0"
    },
    "connect.name": "org.apache.kafka.connect.data.Decimal",
    "logicalType": "decimal"
  }
]
Oracle:
idcol NUMBER(19,0)
Avro schema:
"name": "IDCOL",
"type": {
  "type": "bytes",
  "scale": 0,
  "precision": 64,
  "connect.version": 1,
  "connect.parameters": {
    "scale": "0"
  },
  "connect.name": "org.apache.kafka.connect.data.Decimal",
  "logicalType": "decimal"
}
So whenever I try to deserialize the data using AvroMapper, the code works for PostgreSQL but fails for Oracle and SQL Server with the following exception:
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of long out of VALUE_EMBEDDED_OBJECT token
I can avoid this by adding a custom deserializer to the field in the destination POJO, something like this:
@Id
@Column(name = "id")
@JsonProperty("idcol")
@JsonDeserialize(using = LongDeserializer.class)
private Long id;
But the same problem applies to all the other data types, such as int, boolean, short, etc.
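For reference, here is a minimal sketch of the conversion that any such custom deserializer would need for the decimal-encoded columns. It assumes the bytes follow the Avro decimal logical type (the unscaled value as big-endian two's-complement bytes, with the scale taken from the schema, 0 in the schemas above). The class and method names (DecimalBytes, toBigDecimal, toLong, toInt) are hypothetical and use only the JDK; this is not the LongDeserializer referenced above, and it is not wired into Jackson.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper: decodes Avro "decimal" bytes into Java numeric types.
final class DecimalBytes {
    private DecimalBytes() {}

    // Assumption: 'unscaled' holds the big-endian two's-complement unscaled value,
    // as described by the Avro decimal logical type. An empty array is rejected
    // by BigInteger with a NumberFormatException.
    static BigDecimal toBigDecimal(byte[] unscaled, int scale) {
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    // Throws ArithmeticException if the value has a non-zero fractional part
    // or does not fit into a long, instead of silently truncating.
    static long toLong(byte[] unscaled, int scale) {
        return toBigDecimal(unscaled, scale).longValueExact();
    }

    // Same contract as toLong, narrowed to int.
    static int toInt(byte[] unscaled, int scale) {
        return toBigDecimal(unscaled, scale).intValueExact();
    }

    public static void main(String[] args) {
        // Simulate what a NUMBER(19,0) / numeric(19,0) column would carry.
        byte[] encoded = BigInteger.valueOf(1234567890123L).toByteArray();
        System.out.println(toLong(encoded, 0)); // prints 1234567890123
    }
}
```

The exact-conversion methods are chosen deliberately: a value that does not fit the target type fails loudly rather than being truncated, which seems safer for a generic consumer that cannot know the source column's range in advance.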
I am mapping the POJO using the following snippet:
// Configure the Avro mapper
AvroMapper mapper = new AvroMapper();
mapper.configure(JsonGenerator.Feature.IGNORE_UNKNOWN, true);
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
// Take the record from Kafka and turn it into bytes (convertToBytes is a helper not shown here)
GenericRecord avroRecord = (GenericRecord) consumerRecord.value();
byte[] avroBytes = convertToBytes(avroRecord);
// Use the writer schema carried by the record itself
Schema schema = avroRecord.getSchema();
AvroSchema avroSchema = new AvroSchema(schema);
// pojoType is the destination POJO class
JavaType javaType = TypeFactory.defaultInstance().constructType(pojoType);
return mapper.readerFor(javaType).with(avroSchema)
        .readValue(avroBytes);
Let me know if I am missing something. Any help is appreciated.