From e373368060559ca76e54c417fd8547db961c947a Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Thu, 13 Jan 2022 12:36:51 -0600 Subject: [PATCH] Throw parse exceptions on schema get errors for SchemaRegistryBasedAvroBytesDecoder (#12080) * Add option to throw parse exceptions on schema get errors for SchemaRegistryBasedAvroBytesDecoder * Remove option (cherry picked from commit 74c876e57804617ccb3af2b5af026cacd0320ff4) --- docs/ingestion/data-formats.md | 6 +++++ .../SchemaRegistryBasedAvroBytesDecoder.java | 24 +++++++------------ ...hemaRegistryBasedAvroBytesDecoderTest.java | 5 ++-- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index fc56018c0154..15872734082e 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -488,6 +488,12 @@ Multiple Instances: ... ``` +###### Parse exceptions + +The following errors when reading records will be considered parse exceptions, which can be limited and logged with ingestion task configurations such as `maxParseExceptions` and `maxSavedParseExceptions`: +- Failure to retrieve a schema due to misconfiguration or corrupt records (invalid schema IDs) +- Failure to decode an Avro message + ### Avro OCF To load the Avro OCF input format, load the Druid Avro extension ([`druid-avro-extensions`](../development/extensions-core/avro.md)). diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 41847d562d84..f7006cb02c21 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -35,7 +35,6 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; import org.apache.druid.guice.annotations.Json; -import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.utils.DynamicConfigProviderUtils; @@ -141,10 +140,10 @@ public GenericRecord parse(ByteBuffer bytes) schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null; } catch (IOException | RestClientException ex) { - throw new RE(ex, "Failed to get Avro schema: %s", id); + throw new ParseException(null, "Failed to get Avro schema: %s", id); } if (schema == null) { - throw new RE("Failed to find Avro schema: %s", id); + throw new ParseException(null, "Failed to find Avro schema: %s", id); } DatumReader reader = new GenericDatumReader<>(schema); try { @@ -164,24 +163,17 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; - - return Objects.equals(url, that.url) && - Objects.equals(capacity, that.capacity) && - Objects.equals(urls, that.urls) && - Objects.equals(config, that.config) && - Objects.equals(headers, that.headers); + return capacity == that.capacity + && Objects.equals(url, that.url) + && Objects.equals(urls, that.urls) + && Objects.equals(config, that.config) + && Objects.equals(headers, that.headers); } @Override public int hashCode() { - int result = url != null ? url.hashCode() : 0; - result = 31 * result + capacity; - result = 31 * result + (urls != null ? urls.hashCode() : 0); - result = 31 * result + (config != null ? config.hashCode() : 0); - result = 31 * result + (headers != null ? headers.hashCode() : 0); - return result; + return Objects.hash(registry, url, capacity, urls, config, headers, jsonMapper); } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index ec073c92750b..9348e9486aef 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -33,7 +33,6 @@ import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.SomeAvroDatum; import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.utils.DynamicConfigProviderUtils; import org.junit.Assert; @@ -148,7 +147,7 @@ public void testParseCorruptedPartial() throws Exception new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); } - @Test(expected = RE.class) + @Test(expected = ParseException.class) public void testParseWrongSchemaType() throws Exception { // Given @@ -159,7 +158,7 @@ public void testParseWrongSchemaType() throws Exception new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); } - @Test(expected = RE.class) + @Test(expected = ParseException.class) public void testParseWrongId() throws Exception { // Given