Prepare Schema Registry for preview #15001

Merged
merged 21 commits on Sep 10, 2020
Changes from 15 commits
@@ -416,7 +416,7 @@
<!-- Suppress the check on code-gen classes -->
<suppress checks="LineLength" files="com.azure.ai.textanalytics.implementation.TextAnalyticsClientImplBuilder"/>
<suppress checks="LineLength" files="com.azure.ai.textanalytics.implementation.TextAnalyticsClientImpl"/>
<suppress checks="." files="com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService"/>
<suppress checks="." files="AzureSchemaRegistryRestService"/>

<!-- Suppress the check on code-gen classes -->
<suppress checks="LineLength" files="com.azure.ai.formrecognizer.implementation.FormRecognizerClientImplBuilder"/>
@@ -432,6 +432,7 @@

<!-- TODO: Fix with https://github.com/Azure/azure-sdk-for-java/issues#6716 Method name should follow a common vocabulary. -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files=".*[/\\]textanalytics[/\\].*"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files=".*[/\\]schemaregistry[/\\].*"/>

<!-- Don't enforce non-static ClientLogger instances in azure-core-mgmt PollerFactory and PollingState types-->
<suppress checks="com\.azure\.tools\.checkstyle\.checks\.(ThrowFromClientLoggerCheck|GoodLoggingCheck)"
5 changes: 5 additions & 0 deletions sdk/schemaregistry/azure-data-schemaregistry-avro/pom.xml
@@ -48,6 +48,11 @@
<artifactId>azure-data-schemaregistry</artifactId>
<version>1.0.0-beta.3</version> <!-- {x-version-update;com.azure:azure-data-schemaregistry;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-experimental</artifactId>
<version>1.0.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-experimental;dependency} -->
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>

This file was deleted.

This file was deleted.

This file was deleted.

@@ -0,0 +1,153 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.schemaregistry.avro;

import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.models.SerializationType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Utility class providing the Avro encoder and decoder implementations.
 */
class AvroSchemaRegistryUtils {
    private final ClientLogger logger = new ClientLogger(AvroSchemaRegistryUtils.class);
    private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
    private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
    private static final Boolean AVRO_SPECIFIC_READER_DEFAULT = false;

    private final Boolean avroSpecificReader;

    /**
     * Instantiates an AvroSchemaRegistryUtils instance.
     *
     * @param avroSpecificReader flag indicating if decoder should decode records as SpecificRecords
     */
    AvroSchemaRegistryUtils(Boolean avroSpecificReader) {
        if (avroSpecificReader == null) {
            this.avroSpecificReader = AvroSchemaRegistryUtils.AVRO_SPECIFIC_READER_DEFAULT;
        } else {
            this.avroSpecificReader = avroSpecificReader;
        }
    }

    SerializationType getSerializationType() {
        return SerializationType.AVRO;
    }

    /**
     * @param schemaString string representation of schema
     * @return avro schema
     */
    Schema parseSchemaString(String schemaString) {
        return (new Schema.Parser()).parse(schemaString);
    }

    /**
     * @param object Schema object used to generate schema string
     * @see AvroSchemaUtils for distinction between primitive and Avro schema generation
     * @return string representation of schema
     */
    String getSchemaString(Object object) {
        Schema schema = AvroSchemaUtils.getSchema(object);
        return schema.toString();
    }

    /**
     * Returns schema name for storing schemas in schema registry store.
     *
     * @param object Schema object used to generate schema path
     * @return schema name as string
     */
    String getSchemaName(Object object) {
        return AvroSchemaUtils.getSchema(object).getFullName();
    }

    String getSchemaGroup() {
        return "$Default";
    }

    /**
     * Returns a byte array containing the Avro encoding of the object parameter.
     *
     * @param object Object to be encoded into byte stream
     * @return byte array containing the encoded object
     */
    byte[] encode(Object object) {
        Schema schema = AvroSchemaUtils.getSchema(object);

        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            if (object instanceof byte[]) {
                out.write((byte[]) object); // todo: real avro byte arrays require writing array size to buffer
            } else {
                BinaryEncoder encoder = ENCODER_FACTORY.directBinaryEncoder(out, null);
                DatumWriter<Object> writer;
                if (object instanceof SpecificRecord) {
                    writer = new SpecificDatumWriter<>(schema);
                } else {
                    writer = new GenericDatumWriter<>(schema);
                }
                writer.write(object, encoder);
                encoder.flush();
            }
            return out.toByteArray();
        } catch (IOException | RuntimeException e) {
            // Avro serialization can throw AvroRuntimeException, NullPointerException, ClassCastException, etc
            throw logger.logExceptionAsError(
                new IllegalStateException("Error serializing Avro message", e));
        }
    }

    /**
     * Decodes the Avro-encoded payload using the schema fetched from Azure Schema Registry.
     *
     * @param b byte array containing encoded bytes
     * @param schemaBytes schema content used by the Avro reader, fetched from Azure Schema Registry
     * @return deserialized object
     */
    <T> T decode(byte[] b, byte[] schemaBytes) {
        Objects.requireNonNull(schemaBytes, "Schema must not be null.");

        String schemaString = new String(schemaBytes, StandardCharsets.UTF_8);
        Schema schemaObject = parseSchemaString(schemaString);

        DatumReader<T> reader = getDatumReader(schemaObject);

        try {
            T result = reader.read(null, DECODER_FACTORY.binaryDecoder(b, null));
            return result;
        } catch (IOException | RuntimeException e) {
            throw logger.logExceptionAsError(new IllegalStateException("Error deserializing Avro message.", e));
        }
    }

    /**
     * Returns correct reader for decoding payload.
     *
     * @param writerSchema Avro schema fetched from schema registry store
     * @return correct Avro DatumReader object given encoder configuration
     */
    private <T> DatumReader<T> getDatumReader(Schema writerSchema) {
        boolean writerSchemaIsPrimitive = AvroSchemaUtils.getPrimitiveSchemas().values().contains(writerSchema);
        // do not use SpecificDatumReader if writerSchema is a primitive
        if (avroSpecificReader && !writerSchemaIsPrimitive) {
            return new SpecificDatumReader<>(writerSchema);
        } else {
            return new GenericDatumReader<>(writerSchema);
        }
    }
}
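
For context, a minimal round-trip sketch of how the class above could be exercised. It is illustrative only and not part of this PR: it assumes a sample class placed in the same com.azure.data.schemaregistry.avro package (the class is package-private), and that AvroSchemaUtils.getSchema(Object) resolves a GenericRecord to its own schema; the Person schema is a made-up example.

// Illustrative usage sketch, not part of this PR. Assumes it compiles in the
// com.azure.data.schemaregistry.avro package and that AvroSchemaUtils.getSchema(Object)
// returns a GenericRecord's own schema.
package com.azure.data.schemaregistry.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.nio.charset.StandardCharsets;

public final class AvroSchemaRegistryUtilsSample {
    public static void main(String[] args) {
        // false: decode into GenericRecord rather than generated SpecificRecord classes.
        AvroSchemaRegistryUtils utils = new AvroSchemaRegistryUtils(false);

        // Hypothetical record schema used only for this example.
        String schemaString = "{\"type\":\"record\",\"name\":\"Person\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}";
        Schema schema = utils.parseSchemaString(schemaString);

        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "Ada");

        // Encode with the writer schema derived from the object, then decode with the same
        // schema bytes that a Schema Registry lookup would supply.
        byte[] payload = utils.encode(record);
        GenericRecord decoded = utils.decode(payload, schemaString.getBytes(StandardCharsets.UTF_8));

        System.out.println(decoded.get("name")); // prints Ada
    }
}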