Skip to content

Commit

Permalink
Throw parse exceptions on schema get errors for SchemaRegistryBasedAvroBytesDecoder (apache#12080)
Browse files Browse the repository at this point in the history

* Add option to throw parse exceptions on schema get errors for SchemaRegistryBasedAvroBytesDecoder

* Remove option

(cherry picked from commit 74c876e)
  • Loading branch information
jon-wei authored and sachinsagare committed Nov 3, 2022
1 parent 0b7889a commit e373368
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 19 deletions.
6 changes: 6 additions & 0 deletions docs/ingestion/data-formats.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,12 @@ Multiple Instances:
...
```

###### Parse exceptions

The following errors that occur while reading records are treated as parse exceptions. They can be limited and logged using ingestion task configuration properties such as `maxParseExceptions` and `maxSavedParseExceptions`:
- Failure to retrieve a schema due to misconfiguration or corrupt records (invalid schema IDs)
- Failure to decode an Avro message

### Avro OCF

To load the Avro OCF input format, load the Druid Avro extension ([`druid-avro-extensions`](../development/extensions-core/avro.md)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils;

Expand Down Expand Up @@ -141,10 +140,10 @@ public GenericRecord parse(ByteBuffer bytes)
schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
}
catch (IOException | RestClientException ex) {
throw new RE(ex, "Failed to get Avro schema: %s", id);
throw new ParseException(null, "Failed to get Avro schema: %s", id);
}
if (schema == null) {
throw new RE("Failed to find Avro schema: %s", id);
throw new ParseException(null, "Failed to find Avro schema: %s", id);
}
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
try {
Expand All @@ -164,24 +163,17 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}

SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;

return Objects.equals(url, that.url) &&
Objects.equals(capacity, that.capacity) &&
Objects.equals(urls, that.urls) &&
Objects.equals(config, that.config) &&
Objects.equals(headers, that.headers);
return capacity == that.capacity
&& Objects.equals(url, that.url)
&& Objects.equals(urls, that.urls)
&& Objects.equals(config, that.config)
&& Objects.equals(headers, that.headers);
}

/**
 * Returns the hash code for this decoder.
 *
 * NOTE(review): this span appears to be a rendered diff with the +/- markers stripped, so two
 * implementations are shown back to back: a hand-rolled 31-multiplier hash that ends at
 * {@code return result;}, followed by a single {@code Objects.hash(...)} return. Presumably the
 * first is the removed version and the second is its replacement -- confirm against the real file.
 */
@Override
public int hashCode()
{
// Hand-rolled variant: folds url, capacity, urls, config and headers, treating null as 0.
int result = url != null ? url.hashCode() : 0;
result = 31 * result + capacity;
result = 31 * result + (urls != null ? urls.hashCode() : 0);
result = 31 * result + (config != null ? config.hashCode() : 0);
result = 31 * result + (headers != null ? headers.hashCode() : 0);
return result;
// Objects.hash variant: additionally hashes registry and jsonMapper.
// NOTE(review): the equals() comparison shown earlier in this diff checks only capacity, url,
// urls, config and headers. If registry or jsonMapper can differ between two instances that
// compare equal, their hash codes will differ too -- verify this keeps the equals/hashCode
// contract.
return Objects.hash(registry, url, capacity, urls, config, headers, jsonMapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.junit.Assert;
Expand Down Expand Up @@ -148,7 +147,7 @@ public void testParseCorruptedPartial() throws Exception
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
}

@Test(expected = RE.class)
@Test(expected = ParseException.class)
public void testParseWrongSchemaType() throws Exception
{
// Given
Expand All @@ -159,7 +158,7 @@ public void testParseWrongSchemaType() throws Exception
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
}

@Test(expected = RE.class)
@Test(expected = ParseException.class)
public void testParseWrongId() throws Exception
{
// Given
Expand Down

0 comments on commit e373368

Please sign in to comment.