From 03c94733365192fca43c56124055f5d135286289 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Thu, 12 Sep 2024 12:00:43 +0530 Subject: [PATCH] fix: NPE when a schema isn't found in a schema registry --- .../model/avro/internal/AvroModelHandler.java | 35 +++++++------ .../internal/AvroWriteConverterHandler.java | 49 ++++++++++--------- 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java index d0dcca7602..5aec7c6007 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java @@ -135,26 +135,29 @@ protected final boolean validate( try { Schema schema = supplySchema(schemaId); - switch (schema.getType()) + if (schema != null) { - case STRING: - status = true; - break; - case RECORD: - GenericRecord record = supplyRecord(schemaId); - in.wrap(buffer, index, length); - GenericDatumReader reader = supplyReader(schemaId); - if (reader != null) + switch (schema.getType()) { - decoderFactory.binaryDecoder(in, decoder); - reader.read(record, decoder); + case STRING: status = true; + break; + case RECORD: + GenericRecord record = supplyRecord(schemaId); + in.wrap(buffer, index, length); + GenericDatumReader reader = supplyReader(schemaId); + if (reader != null) + { + decoderFactory.binaryDecoder(in, decoder); + reader.read(record, decoder); + status = true; + } + progress = index; + extractFields(buffer, index + length, schema); + break; + default: + break; } - progress = index; - extractFields(buffer, index + length, schema); - break; - default: - break; } } catch (IOException | AvroRuntimeException ex) diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java index 68617a1ecb..fc6ff31555 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java @@ -87,34 +87,37 @@ private int serializeJsonRecord( { Schema schema = supplySchema(schemaId); - switch (schema.getType()) + if (schema != null) { - case STRING: - next.accept(buffer, index, length); - valLength = length; - break; - case RECORD: - GenericDatumReader reader = supplyReader(schemaId); - GenericDatumWriter writer = supplyWriter(schemaId); - if (reader != null) + switch (schema.getType()) { - GenericRecord record = supplyRecord(schemaId); - in.wrap(buffer, index, length); - expandable.wrap(expandable.buffer()); - record = reader.read(record, decoderFactory.jsonDecoder(schema, in)); - encoderFactory.binaryEncoder(expandable, encoder); - writer.write(record, encoder); - encoder.flush(); - int position = expandable.position(); - if (position > 0) + case STRING: + next.accept(buffer, index, length); + valLength = length; + break; + case RECORD: + GenericDatumReader reader = supplyReader(schemaId); + GenericDatumWriter writer = supplyWriter(schemaId); + if (reader != null) { - next.accept(expandable.buffer(), 0, position); - valLength = position; + GenericRecord record = supplyRecord(schemaId); + in.wrap(buffer, index, length); + expandable.wrap(expandable.buffer()); + record = reader.read(record, decoderFactory.jsonDecoder(schema, in)); + encoderFactory.binaryEncoder(expandable, encoder); + writer.write(record, encoder); + encoder.flush(); + int position = expandable.position(); + if (position > 0) + { + next.accept(expandable.buffer(), 0, position); + valLength = position; + } } + break; + default: + break; } - break; - default: - break; } } catch (IOException | AvroRuntimeException ex)