Merge pull request #6 from rtdi/master
update to support all unions
wernerdaehn authored Jan 13, 2025
2 parents 7aeae9b + e5b154d commit ac0ec3f
Showing 16 changed files with 440 additions and 111 deletions.
2 changes: 1 addition & 1 deletion .settings/org.eclipse.jdt.core.prefs
@@ -12,5 +12,5 @@ org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=warning
- org.eclipse.jdt.core.compiler.release=enabled
+ org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=11
2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@
<artifactId>kafkaavro</artifactId>
<name>${project.groupId}:${project.artifactId}</name>
<description>Convenience methods for using Avro with Kafka.</description>
- <version>0.10.14</version>
+ <version>0.10.15</version>
<packaging>jar</packaging>
<url>https://www.rtdi.io/</url>
<scm>
13 changes: 6 additions & 7 deletions src/main/java/io/rtdi/bigdata/kafka/avro/AvroDeserializer.java
@@ -4,7 +4,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;

- import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -22,22 +21,22 @@
public class AvroDeserializer {

private static final DecoderFactory decoderFactory = DecoderFactory.get();

static {
LogicalDataTypesRegistry.registerAll();
}

/**
* Takes the Kafka message payload and extracts the schema id from it. Based on that, the schema can be read from the schema registry.
*
*
* @param data Kafka message payload in binary form
* @return schemaid associated with that message
* @throws IOException in case this is not a valid Avro Kafka message
*/
public static int getSchemaId(byte[] data) throws IOException {
if (data != null) {
if (data[0] != AvroUtils.MAGIC_BYTE) {
- throw new AvroRuntimeException("Not a valid Kafka Avro message frame");
+ throw new IOException("Not a valid Kafka Avro message frame");
} else {
ByteBuffer bb = ByteBuffer.wrap(data, 1, Integer.BYTES);
return bb.getInt();
@@ -50,7 +49,7 @@ public static int getSchemaId(byte[] data) throws IOException {
/**
* Converts a byte[] into an Avro GenericRecord using the supplied schema.
* The schema must be read from the schema registry using the message's schema id, see {@link #getSchemaId(byte[])}
*
*
* @param data with the binary Avro representation
* @param schema used for the deserialization
* @return the deserialized GenericRecord
@@ -61,7 +60,7 @@ public static GenericRecord deserialize(byte[] data, Schema schema) throws IOExc
try (ByteArrayInputStream in = new ByteArrayInputStream(data); ) {
int b = in.read();
if (b != AvroUtils.MAGIC_BYTE) {
- throw new AvroRuntimeException("Not a valid Kafka Avro message frame");
+ throw new IOException("Not a valid Kafka Avro message frame");
} else {
in.skip(Integer.BYTES);
BinaryDecoder decoder = decoderFactory.directBinaryDecoder(in, null);
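For orientation, a minimal consumer-side sketch of how the two methods above fit together; fetchSchemaFromRegistry is a hypothetical placeholder, since the registry client itself is not part of this diff:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import io.rtdi.bigdata.kafka.avro.AvroDeserializer;

public class DeserializeSketch {

    public static GenericRecord readValue(byte[] kafkavalue) throws IOException {
        // Frame layout per the code above: magic byte, 4-byte schema id, Avro binary payload
        int schemaid = AvroDeserializer.getSchemaId(kafkavalue);
        // Hypothetical lookup - replace with the schema registry client of your choice
        Schema writerschema = fetchSchemaFromRegistry(schemaid);
        // An invalid frame now surfaces as an IOException instead of an AvroRuntimeException
        return AvroDeserializer.deserialize(kafkavalue, writerschema);
    }

    private static Schema fetchSchemaFromRegistry(int schemaid) {
        throw new UnsupportedOperationException("schema registry lookup not shown here");
    }
}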
19 changes: 9 additions & 10 deletions src/main/java/io/rtdi/bigdata/kafka/avro/AvroUtils.java
@@ -1,22 +1,16 @@
package io.rtdi.bigdata.kafka.avro;

import java.io.IOException;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.text.StringEscapeUtils;

public class AvroUtils {

public static String convertRecordToJson(GenericRecord record) throws IOException {
return record.toString();
}


/**
* Converts a text into a string that can be used as the value of a Json field.
*
*
* @param text input text to be escaped
* @return properly escaped string so it does not break the Json format
*/
@@ -37,7 +31,7 @@ public static String encodeJson(String text) {

/**
* In case this schema is a union of null and something else, it returns the _something else_
*
*
* @param schema of the input
* @return the schema without the null branch, in case the union is just null plus one other type; otherwise it can still return a union.
*/
@@ -46,14 +40,19 @@ public static Schema getBaseSchema(Schema schema) {
return null;
} else if (schema.getType() == Type.UNION) {
List<Schema> types = schema.getTypes();
+ /*
+ * The first union branch must match the data type of the field's default value, hence null can appear in either position
+ */
if (types.size() == 2 && types.get(0).getType() == Type.NULL) {
return types.get(1);
+ } else if (types.size() == 2 && types.get(1).getType() == Type.NULL) {
+ return types.get(0);
} else {
return schema;
}
} else {
return schema;
}

}
}
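A small sketch of what the widened getBaseSchema now accepts, which is what the commit title refers to; it only uses the plain Avro Schema factory methods, and the expected results in the comments follow the branches shown above:

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;

import io.rtdi.bigdata.kafka.avro.AvroUtils;

public class GetBaseSchemaSketch {

    public static void main(String[] args) {
        Schema nullfirst = Schema.createUnion(Arrays.asList(
                Schema.create(Type.NULL), Schema.create(Type.STRING)));
        Schema nulllast = Schema.createUnion(Arrays.asList(
                Schema.create(Type.STRING), Schema.create(Type.NULL)));
        Schema threeway = Schema.createUnion(Arrays.asList(
                Schema.create(Type.NULL), Schema.create(Type.STRING), Schema.create(Type.INT)));

        System.out.println(AvroUtils.getBaseSchema(nullfirst)); // string
        System.out.println(AvroUtils.getBaseSchema(nulllast));  // string - newly handled by this change
        System.out.println(AvroUtils.getBaseSchema(threeway));  // the union itself, returned unchanged
    }
}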
19 changes: 17 additions & 2 deletions src/main/java/io/rtdi/bigdata/kafka/avro/datatypes/AvroArray.java
@@ -37,6 +37,13 @@ private AvroArray(Schema valueschema) {
this.schema = addToSchema(Schema.createArray(valueschema));
}

+ /**
+ * Constructor for this static instance
+ */
+ private AvroArray() {
+ super(NAME);
+ }

/**
* Create an instance of that type.
* @param valueschema specifies the datatype of the array
@@ -46,6 +53,14 @@ public static AvroArray create(Schema valueschema) {
return new AvroArray(valueschema);
}

+ /**
+ * Create an instance of that type.
+ * @return the instance
+ */
+ public static AvroArray create() {
+ return new AvroArray();
+ }

/**
* @param primitive contains the data type of the array items
* @return AvroArray using the primitive type as list data type
@@ -60,7 +75,7 @@ public static AvroArray create(IAvroPrimitive primitive) {
public Schema getSchema() {
return schema;
}

@Override
public Schema addToSchema(Schema schema) {
return super.addToSchema(schema);
@@ -104,7 +119,7 @@ public List<?> convertToJava(Object value) throws AvroDataTypeException {
}

public static class Factory implements LogicalTypeFactory {

public Factory() {
}

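A hedged sketch of the two factory variants visible above: create(Schema) builds a concrete array schema carrying the logical type, while the new no-argument create() only yields the bare logical type, e.g. for registration through its Factory:

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;

import io.rtdi.bigdata.kafka.avro.datatypes.AvroArray;

public class AvroArraySketch {

    public static void main(String[] args) {
        // Array of plain Avro ints; the ARRAY logical type is attached to the array schema
        AvroArray arrayofints = AvroArray.create(Schema.create(Type.INT));
        System.out.println(arrayofints.getSchema());

        // Bare logical type without an item schema, added by this commit
        AvroArray bare = AvroArray.create();
        System.out.println(bare.getName());
    }
}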
18 changes: 12 additions & 6 deletions src/main/java/io/rtdi/bigdata/kafka/avro/datatypes/AvroDouble.java
@@ -27,7 +27,7 @@ public class AvroDouble extends LogicalType implements IAvroPrimitive {
public static Schema getSchema() {
return schema;
}

private AvroDouble() {
super(NAME);
}
@@ -56,16 +56,20 @@ public void validate(Schema schema) {

@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
return true;
}

@Override
public int hashCode() {
return 1;
}

@Override
public String toString() {
return NAME;
@@ -84,7 +88,7 @@ public Double convertToInternal(Object value) throws AvroDataTypeException {
throw new AvroDataTypeException("Cannot convert the string \"" + value + "\" into a Double");
}
} else if (value instanceof Number) {
- return Double.valueOf(((Number) value).toString()); // going via Strings to avoid representation errors
+ return Double.valueOf(value.toString()); // going via Strings to avoid representation errors
}
throw new AvroDataTypeException("Cannot convert a value of type \"" + value.getClass().getSimpleName() + "\" into a Double");
}
@@ -95,12 +99,14 @@ public Double convertToJava(Object value) throws AvroDataTypeException {
return null;
} else if (value instanceof Double) {
return (Double) value;
+ } else if (value instanceof Number) {
+ return ((Number) value).doubleValue();
}
throw new AvroDataTypeException("Cannot convert a value of type \"" + value.getClass().getSimpleName() + "\" into a Double");
}

public static class Factory implements LogicalTypeFactory {

public Factory() {
}

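The same instanceof Number fallback is added to AvroFloat, AvroInt and AvroLong below. A sketch of what it enables, assuming AvroDouble exposes a static create() factory for its singleton like the other logical types in this changeset (that factory is not visible in this diff):

import io.rtdi.bigdata.kafka.avro.datatypes.AvroDouble;

public class NumberWideningSketch {

    public static void main(String[] args) throws Exception {
        // Assumed factory method; obtain the singleton however the library actually exposes it
        AvroDouble avrodouble = AvroDouble.create();

        // convertToJava now widens any Number instead of rejecting everything but Double
        Double fromint = avrodouble.convertToJava(Integer.valueOf(42));   // 42.0
        Double fromfloat = avrodouble.convertToJava(Float.valueOf(1.5f)); // 1.5
        System.out.println(fromint + " / " + fromfloat);
    }
}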
16 changes: 11 additions & 5 deletions src/main/java/io/rtdi/bigdata/kafka/avro/datatypes/AvroFloat.java
@@ -27,7 +27,7 @@ public class AvroFloat extends LogicalType implements IAvroPrimitive {
public static Schema getSchema() {
return schema;
}

/**
* Constructor for this static instance
*/
@@ -59,16 +59,20 @@ public void validate(Schema schema) {

@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
return true;
}

@Override
public int hashCode() {
return 1;
}

@Override
public String toString() {
return NAME;
@@ -98,12 +102,14 @@ public Float convertToJava(Object value) throws AvroDataTypeException {
return null;
} else if (value instanceof Float) {
return (Float) value;
+ } else if (value instanceof Number) {
+ return ((Number) value).floatValue();
}
throw new AvroDataTypeException("Cannot convert a value of type \"" + value.getClass().getSimpleName() + "\" into a Float");
}

public static class Factory implements LogicalTypeFactory {

public Factory() {
}

16 changes: 11 additions & 5 deletions src/main/java/io/rtdi/bigdata/kafka/avro/datatypes/AvroInt.java
@@ -27,7 +27,7 @@ public class AvroInt extends LogicalType implements IAvroPrimitive {
public static Schema getSchema() {
return schema;
}

/**
* Constructor for this static instance
*/
@@ -59,16 +59,20 @@ public void validate(Schema schema) {

@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
return true;
}

@Override
public int hashCode() {
return 1;
}

@Override
public String toString() {
return NAME;
@@ -99,12 +103,14 @@ public Integer convertToJava(Object value) throws AvroDataTypeException {
return null;
} else if (value instanceof Integer) {
return (Integer) value;
+ } else if (value instanceof Number) {
+ return ((Number) value).intValue();
}
throw new AvroDataTypeException("Cannot convert a value of type \"" + value.getClass().getSimpleName() + "\" into a Integer");
}

public static class Factory implements LogicalTypeFactory {

public Factory() {
}

14 changes: 10 additions & 4 deletions src/main/java/io/rtdi/bigdata/kafka/avro/datatypes/AvroLong.java
@@ -59,16 +59,20 @@ public void validate(Schema schema) {

@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
return true;
}

@Override
public int hashCode() {
return 1;
}

@Override
public String toString() {
return NAME;
@@ -98,12 +102,14 @@ public Long convertToJava(Object value) throws AvroDataTypeException {
return null;
} else if (value instanceof Long) {
return (Long) value;
+ } else if (value instanceof Number) {
+ return ((Number) value).longValue();
}
throw new AvroDataTypeException("Cannot convert a value of type \"" + value.getClass().getSimpleName() + "\" into a Long");
}

public static class Factory implements LogicalTypeFactory {

public Factory() {
}

(7 more changed files not shown)
