clazz) {
+ return this.deserializeImpl(stream)
+ .map(o -> {
+ if (clazz.isInstance(o)) {
+ return clazz.cast(o);
+ }
+ throw logger.logExceptionAsError(new SerializationException("Deserialized object not of class %s"));
+ });
}
}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroDeserializer.java b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroDeserializer.java
deleted file mode 100644
index c683ebc2ec9d2..0000000000000
--- a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroDeserializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.data.schemaregistry.avro;
-
-import com.azure.data.schemaregistry.AbstractDataDeserializer;
-import com.azure.data.schemaregistry.SerializationException;
-import com.azure.data.schemaregistry.client.CachedSchemaRegistryClient;
-
-/**
- * A deserializer implementation capable of automatedly deserializing encoded byte array payloads into Java objects by
- * fetching payload-specified schemas from the Azure Schema Registry store.
- *
- * SchemaRegistryAvroDeserializer instances should be built using the static Builder class.
- *
- * Pluggable with the core Azure SDK Deserializer interface.
- *
- * @see AbstractDataDeserializer See AbstractDataDeserializer for internal deserialization implementation
- */
-public class SchemaRegistryAvroDeserializer extends AbstractDataDeserializer {
- SchemaRegistryAvroDeserializer(CachedSchemaRegistryClient registryClient, boolean avroSpecificReader) {
- super(registryClient);
-
- loadByteDecoder(new AvroByteDecoder(avroSpecificReader));
- }
-
- /**
- * Deserializes byte array into Java object using payload-specified schema.
- *
- * @param data Byte array containing serialized bytes
- * @return decoded Java object
- *
- * @throws SerializationException Throws on deserialization failure.
- * Exception may contain inner exceptions detailing failure condition.
- */
- public Object deserialize(byte[] data) throws SerializationException {
- return super.deserialize(data);
- }
-}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroDeserializerBuilder.java b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroDeserializerBuilder.java
deleted file mode 100644
index a385d9bbfa748..0000000000000
--- a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroDeserializerBuilder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.data.schemaregistry.avro;
-
-import com.azure.core.credential.TokenCredential;
-import com.azure.data.schemaregistry.client.CachedSchemaRegistryClient;
-import com.azure.data.schemaregistry.client.CachedSchemaRegistryClientBuilder;
-
-import java.util.Objects;
-
-/**
- * Builder class for constructing {@link SchemaRegistryAvroDeserializer} and {@link SchemaRegistryAvroAsyncDeserializer}
- */
-public class SchemaRegistryAvroDeserializerBuilder {
-
- private String registryUrl;
- private TokenCredential credential;
- private boolean avroSpecificReader;
- private Integer maxSchemaMapSize;
-
- /**
- * Instantiates instance of Builder class.
- * Supplies default avro.specific.reader value.
- *
- */
- public SchemaRegistryAvroDeserializerBuilder() {
- this.registryUrl = null;
- this.credential = null;
- this.avroSpecificReader = false;
- this.maxSchemaMapSize = null;
- }
-
- /**
- * Sets the service endpoint for the Azure Schema Registry instance.
- *
- * @return The updated {@link SchemaRegistryAvroDeserializerBuilder} object.
- * @param schemaRegistryUrl The URL of the Azure Schema Registry instance
- * @throws NullPointerException if {@code schemaRegistryUrl} is null
- */
- public SchemaRegistryAvroDeserializerBuilder schemaRegistryUrl(String schemaRegistryUrl) {
- Objects.requireNonNull(schemaRegistryUrl, "'schemaRegistryUrl' cannot be null.");
- this.registryUrl = schemaRegistryUrl;
- return this;
- }
-
- /**
- *
- * @param credential TokenCredential to be used for authenticating with Azure Schema Registry Service
- * @return updated {@link SchemaRegistryAvroDeserializerBuilder} instance
- */
- public SchemaRegistryAvroDeserializerBuilder credential(TokenCredential credential) {
- this.credential = credential;
- return this;
- }
-
- /**
- * Specifies if objects should be deserialized into Avro SpecificRecords via Avro SpecificDatumReader
- * @param avroSpecificReader specific reader flag
- * @return updated {@link SchemaRegistryAvroDeserializerBuilder} instance
- */
- public SchemaRegistryAvroDeserializerBuilder avroSpecificReader(boolean avroSpecificReader) {
- this.avroSpecificReader = avroSpecificReader;
- return this;
- }
-
- /**
- * Specifies maximum schema object cache size for underlying CachedSchemaRegistryClient. If specified cache
- * size is exceeded, all caches are recycled.
- *
- * @param maxSchemaMapSize maximum number of schemas per cache
- * @return updated {@link SchemaRegistryAvroDeserializerBuilder} instance
- */
- public SchemaRegistryAvroDeserializerBuilder maxSchemaMapSize(int maxSchemaMapSize) {
- this.maxSchemaMapSize = maxSchemaMapSize;
- return this;
- }
-
- /**
- * Construct instance of {@link SchemaRegistryAvroAsyncDeserializer}
- *
- * @return {@link SchemaRegistryAvroAsyncDeserializer} instance
- *
- * @throws NullPointerException if parameters are incorrectly set.
- * @throws IllegalArgumentException if credential is not set.
- */
- public SchemaRegistryAvroAsyncDeserializer buildAsyncClient() {
- return new SchemaRegistryAvroAsyncDeserializer(this.buildSyncClient());
- }
-
- /**
- * Construct instance of {@link SchemaRegistryAvroDeserializer}
- *
- * @return {@link SchemaRegistryAvroDeserializer} instance
- *
- * @throws NullPointerException if parameters are incorrectly set.
- * @throws IllegalArgumentException if credential is not set.
- */
- public SchemaRegistryAvroDeserializer buildSyncClient() {
- CachedSchemaRegistryClientBuilder builder = new CachedSchemaRegistryClientBuilder()
- .endpoint(registryUrl)
- .credential(credential);
-
- if (maxSchemaMapSize != null) {
- builder.maxSchemaMapSize(maxSchemaMapSize);
- }
-
- CachedSchemaRegistryClient client = builder.buildClient();
-
- return new SchemaRegistryAvroDeserializer(client, this.avroSpecificReader);
- }
-}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializer.java b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializer.java
index 03791b6ba2646..942b36eb9fa32 100644
--- a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializer.java
+++ b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializer.java
@@ -3,9 +3,11 @@
package com.azure.data.schemaregistry.avro;
-import com.azure.data.schemaregistry.AbstractDataSerializer;
+import com.azure.data.schemaregistry.AbstractSchemaRegistrySerializer;
import com.azure.data.schemaregistry.SerializationException;
-import com.azure.data.schemaregistry.client.CachedSchemaRegistryClient;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
/**
* A serializer implementation capable of serializing objects and automatedly storing serialization schemas
@@ -15,31 +17,44 @@
*
* Pluggable with the core Azure SDK Serializer interface.
*
- * @see AbstractDataSerializer See AbstractDataSerializer for internal serialization implementation
+ * @see AbstractSchemaRegistrySerializer See AbstractSchemaRegistrySerializer for internal serialization implementation
*/
-public class SchemaRegistryAvroSerializer extends AbstractDataSerializer {
- SchemaRegistryAvroSerializer(CachedSchemaRegistryClient registryClient,
- String schemaGroup,
- boolean autoRegisterSchemas) {
- super(registryClient);
-
- setByteEncoder(new AvroByteEncoder());
-
- this.autoRegisterSchemas = autoRegisterSchemas;
- this.schemaGroup = schemaGroup;
+public class SchemaRegistryAvroSerializer {
+ private final SchemaRegistryAvroAsyncSerializer serializer;
+ SchemaRegistryAvroSerializer(SchemaRegistryAvroAsyncSerializer serializer) {
+ this.serializer = serializer;
}
/**
* Serializes object into byte array payload using the configured byte encoder.
* @param object target of serialization
- * @return byte array containing GUID reference to schema, then the object serialized into bytes
+ * @return byte array containing unique ID reference to schema, then the object serialized into bytes
* @throws SerializationException Throws on serialization failure.
*/
public byte[] serialize(Object object) throws SerializationException {
if (object == null) {
return null;
}
- return serializeImpl(object);
+
+ ByteArrayOutputStream s = serializer.serialize(new ByteArrayOutputStream(), object).block();
+ if (s != null){
+ s.toByteArray();
+ }
+
+ throw new SerializationException("Serialization failed, null output stream returned.");
+ }
+
+ /**
+ * Deserializes byte array into Java object using payload-specified schema.
+ *
+ * @param data Byte array containing serialized bytes
+ * @return decoded Java object
+ *
+ * @throws SerializationException Throws on deserialization failure.
+ * Exception may contain inner exceptions detailing failure condition.
+ */
+ public Object deserialize(byte[] data) throws SerializationException {
+ return serializer.deserialize(new ByteArrayInputStream(data), Object.class);
}
}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializerBuilder.java b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializerBuilder.java
index 2cd693190016c..83136b90893bb 100644
--- a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializerBuilder.java
+++ b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/main/java/com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializerBuilder.java
@@ -4,8 +4,8 @@
package com.azure.data.schemaregistry.avro;
import com.azure.core.credential.TokenCredential;
-import com.azure.data.schemaregistry.AbstractDataSerializer;
-import com.azure.data.schemaregistry.client.CachedSchemaRegistryClient;
+import com.azure.data.schemaregistry.AbstractSchemaRegistrySerializer;
+import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryClientBuilder;
import java.util.Objects;
@@ -19,6 +19,7 @@ public final class SchemaRegistryAvroSerializerBuilder {
private boolean autoRegisterSchemas;
private String schemaGroup;
private Integer maxSchemaMapSize;
+ private boolean avroSpecificReader;
/**
* Instantiates instance of Builder class.
@@ -27,9 +28,10 @@ public final class SchemaRegistryAvroSerializerBuilder {
public SchemaRegistryAvroSerializerBuilder() {
this.registryUrl = null;
this.credential = null;
- this.autoRegisterSchemas = AbstractDataSerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
- this.schemaGroup = AbstractDataSerializer.SCHEMA_GROUP_DEFAULT;
+ this.autoRegisterSchemas = AbstractSchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
+ this.schemaGroup = AbstractSchemaRegistrySerializer.SCHEMA_GROUP_DEFAULT;
this.maxSchemaMapSize = null;
+ this.avroSpecificReader = false;
}
/**
@@ -87,7 +89,17 @@ public SchemaRegistryAvroSerializerBuilder autoRegisterSchema(boolean autoRegist
}
/**
- * Specifies maximum schema object cache size for underlying CachedSchemaRegistryClient. If specified cache
+ * Specifies if objects should be deserialized into Avro SpecificRecords via Avro SpecificDatumReader
+ * @param avroSpecificReader specific reader flag
+ * @return updated {@link SchemaRegistryAvroSerializerBuilder} instance
+ */
+ public SchemaRegistryAvroSerializerBuilder avroSpecificReader(boolean avroSpecificReader) {
+ this.avroSpecificReader = avroSpecificReader;
+ return this;
+ }
+
+ /**
+ * Specifies maximum schema object cache size for underlying CachedSchemaRegistryAsyncClient. If specified cache
* size is exceeded, all caches are recycled.
*
* @param maxSchemaMapSize maximum number of schemas per cache
@@ -99,24 +111,24 @@ public SchemaRegistryAvroSerializerBuilder maxSchemaMapSize(int maxSchemaMapSize
}
/**
- * Instantiates SchemaRegistry
- * @return {@link SchemaRegistryAvroAsyncSerializer} instance
+ * Instantiates {@link SchemaRegistryAvroSerializer}
+ * @return {@link SchemaRegistryAvroSerializer} instance
*
* @throws NullPointerException if parameters are incorrectly set.
* @throws IllegalArgumentException if credential is not set.
*/
- public SchemaRegistryAvroAsyncSerializer buildAsyncClient() {
- return new SchemaRegistryAvroAsyncSerializer(this.buildSyncClient());
+ public SchemaRegistryAvroSerializer buildClient() {
+ return new SchemaRegistryAvroSerializer(this.buildAsyncClient());
}
/**
- * Instantiates {@link SchemaRegistryAvroSerializer}
- * @return {@link SchemaRegistryAvroSerializer} instance
+ * Instantiates SchemaRegistry
+ * @return {@link SchemaRegistryAvroAsyncSerializer} instance
*
* @throws NullPointerException if parameters are incorrectly set.
* @throws IllegalArgumentException if credential is not set.
*/
- public SchemaRegistryAvroSerializer buildSyncClient() {
+ public SchemaRegistryAvroAsyncSerializer buildAsyncClient() {
CachedSchemaRegistryClientBuilder builder = new CachedSchemaRegistryClientBuilder()
.endpoint(registryUrl)
.credential(credential);
@@ -125,8 +137,13 @@ public SchemaRegistryAvroSerializer buildSyncClient() {
builder.maxSchemaMapSize(maxSchemaMapSize);
}
- CachedSchemaRegistryClient client = builder.buildClient();
+ AvroCodec codec = new AvroCodec(this.avroSpecificReader);
+
+ CachedSchemaRegistryAsyncClient client = builder
+ .addSchemaParser(codec)
+ .buildAsyncClient();
- return new SchemaRegistryAvroSerializer(client, this.schemaGroup, this.autoRegisterSchemas);
+ return new SchemaRegistryAvroAsyncSerializer(client, codec, this.schemaGroup,
+ this.autoRegisterSchemas);
}
}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/test/java/com/azure/data/schemaregistry/avro/AvroByteDecoderTest.java b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/test/java/com/azure/data/schemaregistry/avro/AvroByteDecoderTest.java
deleted file mode 100644
index 6732085e8911d..0000000000000
--- a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/test/java/com/azure/data/schemaregistry/avro/AvroByteDecoderTest.java
+++ /dev/null
@@ -1,15 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.data.schemaregistry.avro;
-
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class AvroByteDecoderTest {
- @Test
- public void testShouldAnswerWithTrue() {
- assertTrue(true);
- }
-}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/test/java/com/azure/data/schemaregistry/avro/AvroByteEncoderTest.java b/sdk/schemaregistry/azure-data-schemaregistry-avro/src/test/java/com/azure/data/schemaregistry/avro/AvroByteEncoderTest.java
deleted file mode 100644
index 2f31889899175..0000000000000
--- a/sdk/schemaregistry/azure-data-schemaregistry-avro/src/test/java/com/azure/data/schemaregistry/avro/AvroByteEncoderTest.java
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.data.schemaregistry.avro;
-
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class AvroByteEncoderTest {
- private static final String MOCK_AVRO_SCHEMA_STRING = "{\"namespace\":\"example2.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\": [\"int\", \"null\"]}]}";
-
- @Test
- public void testPlaceholder() {
- getEncoder();
- assertTrue(true);
- }
-
- private AvroByteEncoder getEncoder() {
- return new AvroByteEncoder();
- }
-}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataDeserializer.java b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataDeserializer.java
deleted file mode 100644
index d757e3876ab38..0000000000000
--- a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataDeserializer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.data.schemaregistry;
-
-import com.azure.core.util.logging.ClientLogger;
-import com.azure.data.schemaregistry.client.SchemaRegistryObject;
-import com.azure.data.schemaregistry.client.SchemaRegistryClient;
-import com.azure.data.schemaregistry.client.SchemaRegistryClientException;
-
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-/**
- * Common implementation for all registry-based deserializers.
- */
-public abstract class AbstractDataDeserializer extends AbstractDataSerDe {
- private final ClientLogger logger = new ClientLogger(AbstractDataDeserializer.class);
-
- private final Map byteDecoderMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
-
- /**
- * Constructor called by all concrete implementation constructors.
- * Should only call parent constructor.
- * @param schemaRegistryClient client to be used for fetching schemas by ID
- */
- protected AbstractDataDeserializer(SchemaRegistryClient schemaRegistryClient) {
- super(schemaRegistryClient);
- }
-
- /**
- * Special case constructor for Kafka deserializer's empty constructors.
- */
- protected AbstractDataDeserializer() { }
-
- /**
- * Fetches schema referenced by prefixed ID and deserializes the subsequent payload into Java object.
- *
- * @param payload byte payload, produced by an Azure Schema Registry client producer
- * @return object, deserialized with the prefixed schema
- * @throws SerializationException if deserialization of registry schema or message payload fails.
- */
- protected Object deserialize(byte[] payload) throws SerializationException {
- if (payload == null) {
- return null;
- }
-
- ByteBuffer buffer = ByteBuffer.wrap(payload);
- String schemaGuid = getSchemaGuidFromPayload(buffer);
- SchemaRegistryObject registryObject;
- Object payloadSchema;
-
- try {
- registryObject = this.schemaRegistryClient.getSchemaByGuid(schemaGuid);
- payloadSchema = registryObject.deserialize();
- } catch (SchemaRegistryClientException e) {
- throw logger.logExceptionAsError(
- new SerializationException(String.format("Failed to retrieve schema for id %s", schemaGuid), e));
- }
-
- if (payloadSchema == null) {
- throw logger.logExceptionAsError(
- new SerializationException(
- String.format("Payload schema returned as null. Schema type: %s, Schema ID: %s",
- registryObject.getSchemaType(), registryObject.getSchemaId())));
- }
-
- int start = buffer.position() + buffer.arrayOffset();
- int length = buffer.limit() - AbstractDataSerDe.SCHEMA_ID_SIZE;
- byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);
-
- ByteDecoder byteDecoder = getByteDecoder(registryObject);
- return byteDecoder.decodeBytes(b, payloadSchema);
- }
-
-
- /**
- * Fetches the correct ByteDecoder based on schema type of the message.
- *
- * @param registryObject object returned from SchemaRegistryClient, contains schema type
- * @return ByteDecoder to be used to deserialize encoded payload bytes
- * @throws SerializationException if decoder for the required schema type has not been loaded
- */
- private ByteDecoder getByteDecoder(SchemaRegistryObject registryObject) throws SerializationException {
- ByteDecoder decoder = byteDecoderMap.get(registryObject.getSchemaType());
- if (decoder == null) {
- throw logger.logExceptionAsError(
- new SerializationException(
- String.format("No decoder class found for schema type '%s'.", registryObject.getSchemaType())
- ));
- }
- return decoder;
- }
-
- /**
- * @param buffer full payload bytes
- * @return String representation of schema ID
- * @throws SerializationException if schema ID could not be extracted from payload
- */
- private String getSchemaGuidFromPayload(ByteBuffer buffer) throws SerializationException {
- byte[] schemaGuidByteArray = new byte[AbstractDataSerDe.SCHEMA_ID_SIZE];
- try {
- buffer.get(schemaGuidByteArray);
- } catch (BufferUnderflowException e) {
- throw logger.logExceptionAsError(new SerializationException("Payload too short, no readable guid.", e));
- }
-
- return new String(schemaGuidByteArray, schemaRegistryClient.getEncoding());
- }
-
- /**
- * Loads a ByteDecoder to be used for decoding message payloads of specified schema type.
- * @param decoder ByteDecoder class instance to be loaded
- */
- protected void loadByteDecoder(ByteDecoder decoder) {
- if (decoder == null) {
- throw logger.logExceptionAsError(new SerializationException("ByteDecoder cannot be null"));
- }
-
- this.byteDecoderMap.put(decoder.schemaType(), decoder);
- this.schemaRegistryClient.addSchemaParser(decoder);
- }
-}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataSerDe.java b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataSerDe.java
deleted file mode 100644
index b2bc450371b63..0000000000000
--- a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataSerDe.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.data.schemaregistry;
-
-import com.azure.core.util.logging.ClientLogger;
-import com.azure.data.schemaregistry.client.SchemaRegistryClient;
-
-/**
- * Common fields and helper methods for both the serializer and the deserializer.
- */
-public abstract class AbstractDataSerDe {
- private final ClientLogger logger = new ClientLogger(AbstractDataSerDe.class);
-
- public static final int SCHEMA_ID_SIZE = 32;
-
- protected SchemaRegistryClient schemaRegistryClient;
-
- /**
- * Base constructor for all SerDe implementations.
- * @param schemaRegistryClient client to be used for sending or fetching schemas.
- * @throws IllegalArgumentException schemaRegistryClient parameter cannot be null
- */
- protected AbstractDataSerDe(SchemaRegistryClient schemaRegistryClient) {
- if (schemaRegistryClient == null) {
- throw logger.logExceptionAsError(
- new IllegalArgumentException("Schema registry client must be initialized and passed into builder."));
- }
- this.schemaRegistryClient = schemaRegistryClient;
- }
-
- /**
- * Special case for Kafka serializer/deserializer implementations.
- */
- // special case for Kafka serializer/deserializer
- public AbstractDataSerDe() {
-
- }
-}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataSerializer.java b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataSerializer.java
deleted file mode 100644
index 74784e54e37d6..0000000000000
--- a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractDataSerializer.java
+++ /dev/null
@@ -1,130 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.data.schemaregistry;
-
-import com.azure.core.util.logging.ClientLogger;
-import com.azure.data.schemaregistry.client.SchemaRegistryClient;
-import com.azure.data.schemaregistry.client.SchemaRegistryClientException;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Common implementation for all registry-based serializers.
- */
-public abstract class AbstractDataSerializer extends AbstractDataSerDe {
- private final ClientLogger logger = new ClientLogger(AbstractDataSerializer.class);
-
- public static final Boolean AUTO_REGISTER_SCHEMAS_DEFAULT = false;
- public static final String SCHEMA_GROUP_DEFAULT = "$default";
-
- protected ByteEncoder byteEncoder = null;
- protected String schemaType;
- protected Boolean autoRegisterSchemas = AbstractDataSerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
- protected String schemaGroup = AbstractDataSerializer.SCHEMA_GROUP_DEFAULT;
-
- /**
- * @param schemaRegistryClient registry client to be used for storing schemas. Not null.
- */
- public AbstractDataSerializer(SchemaRegistryClient schemaRegistryClient) {
- super(schemaRegistryClient);
- }
-
- /**
- * Special case constructor for Kafka serializer.
- */
- public AbstractDataSerializer() {
- }
-
- /**
- * Set ByteEncoder class to be used for serialized objects into bytes
- * @param byteEncoder ByteEncoder instance
- */
- protected void setByteEncoder(ByteEncoder byteEncoder) {
- if (this.byteEncoder != null) {
- throw logger.logExceptionAsError(
- new IllegalArgumentException("Setting multiple encoders on serializer not permitted"));
- }
- this.byteEncoder = byteEncoder;
- this.schemaType = byteEncoder.schemaType();
- this.schemaRegistryClient.addSchemaParser(byteEncoder);
- }
-
- /**
- * Core implementation of registry-based serialization.
- * ID for data schema is fetched from the registry and prefixed to the encoded byte array
- * representation of the object param.
- *
- * @param object object to be serialized
- * @return byte array containing encoded bytes with prefixed schema ID
- * @throws SerializationException if serialization operation fails during runtime.
- */
- protected byte[] serializeImpl(Object object) {
- if (object == null) {
- throw logger.logExceptionAsError(new SerializationException(
- "Null object, behavior should be defined in concrete serializer implementation."));
- }
-
- if (byteEncoder == null) {
- throw logger.logExceptionAsError(
- new SerializationException("Byte encoder null, serializer must be initialized with a byte encoder."));
- }
-
- if (schemaType == null) {
- schemaType = byteEncoder.schemaType();
- }
-
- String schemaString = byteEncoder.getSchemaString(object);
- String schemaName = byteEncoder.getSchemaName(object);
-
- try {
- String schemaGuid = maybeRegisterSchema(
- this.schemaGroup, schemaName, schemaString, this.schemaType);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteBuffer guidBuffer = ByteBuffer.allocate(AbstractDataSerDe.SCHEMA_ID_SIZE)
- .put(schemaGuid.getBytes(StandardCharsets.UTF_8));
- out.write(guidBuffer.array());
- byteEncoder.encode(object).writeTo(out);
- return out.toByteArray();
- } catch (SchemaRegistryClientException | IOException e) {
- if (this.autoRegisterSchemas) {
- throw logger.logExceptionAsError(
- new SerializationException(
- String.format("Error registering Avro schema. Group: %s, name: %s", schemaGroup, schemaName),
- e));
- } else {
- throw logger.logExceptionAsError(
- new SerializationException(
- String.format("Error retrieving Avro schema. Group: %s, name: %s", schemaGroup, schemaName),
- e));
- }
- }
- }
-
- /**
- * If auto-registering is enabled, register schema against SchemaRegistryClient.
- * If auto-registering is disabled, fetch schema ID for provided schema. Requires pre-registering of schema
- * against registry.
- *
- * @param schemaGroup Schema group where schema should be registered.
- * @param schemaName name of schema
- * @param schemaString string representation of schema being stored - must match group schema type
- * @param schemaType type of schema being stored, e.g. avro
- * @return string representation of schema ID
- * @throws SchemaRegistryClientException upon registry client operation failure
- */
- private String maybeRegisterSchema(
- String schemaGroup, String schemaName, String schemaString, String schemaType)
- throws SchemaRegistryClientException {
- if (this.autoRegisterSchemas) {
- return this.schemaRegistryClient.register(schemaGroup, schemaName, schemaString, schemaType)
- .getSchemaId();
- } else {
- return this.schemaRegistryClient.getSchemaId(
- schemaGroup, schemaName, schemaString, schemaType);
- }
- }
-}
diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractSchemaRegistrySerializer.java b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractSchemaRegistrySerializer.java
new file mode 100644
index 0000000000000..dcf9abf2d9306
--- /dev/null
+++ b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/AbstractSchemaRegistrySerializer.java
@@ -0,0 +1,285 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.data.schemaregistry;
+
+import com.azure.core.exception.HttpResponseException;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
+import com.azure.data.schemaregistry.client.SchemaRegistryClientException;
+import com.azure.data.schemaregistry.client.SchemaRegistryObject;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import static com.azure.core.util.FluxUtil.monoError;
+
+/**
+ * Common implementation for all registry-based serializers.
+ */
+public abstract class AbstractSchemaRegistrySerializer {
+ private final ClientLogger logger = new ClientLogger(AbstractSchemaRegistrySerializer.class);
+
+ public static final Boolean AUTO_REGISTER_SCHEMAS_DEFAULT = false;
+ public static final String SCHEMA_GROUP_DEFAULT = "$default";
+ public static final int SCHEMA_ID_SIZE = 32;
+
+ protected CachedSchemaRegistryAsyncClient schemaRegistryClient;
+
+ protected Codec serializerCodec = null;
+ private final Map deserializerCodecMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
+ protected String schemaType;
+ protected Boolean autoRegisterSchemas = AbstractSchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
+ protected String schemaGroup = AbstractSchemaRegistrySerializer.SCHEMA_GROUP_DEFAULT;
+
+ /**
+ * @param schemaRegistryClient registry client to be used for storing schemas. Not null.
+ */
+ public AbstractSchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
+ Codec serializerCodec, Map deserializerCodecMap) {
+ if (schemaRegistryClient == null) {
+ throw logger.logExceptionAsError(
+ new IllegalArgumentException("Schema registry client must be initialized and passed into builder."));
+ }
+ this.schemaRegistryClient = schemaRegistryClient;
+ this.serializerCodec = serializerCodec;
+ this.deserializerCodecMap.putAll(deserializerCodecMap);
+ }
+
+ /**
+ * Set Codec class to be used for serialized objects into bytes
+ *
+ * @param codec Codec instance
+ */
+ protected void setSerializerCodec(Codec codec) {
+ if (this.serializerCodec != null) {
+ throw logger.logExceptionAsError(
+ new IllegalArgumentException("Setting multiple encoders on serializer not permitted"));
+ }
+ this.serializerCodec = codec;
+ this.schemaType = codec.getSchemaType();
+ }
+
+ /**
+ * Core implementation of registry-based serialization.
+ * ID for data schema is fetched from the registry and prefixed to the encoded byte array
+ * representation of the object param.
+ *
+ * @param s Output stream destination for encoded bytes
+ * @param object object to be serialized
+ * @param Type of the output stream parameter.
+ * @return byte array containing encoded bytes with prefixed schema ID
+ * @throws SerializationException if serialization operation fails during runtime.
+ */
+ protected Mono serializeImpl(T s, Object object) {
+ if (object == null) {
+ return monoError(logger, new SerializationException(
+ "Null object, behavior should be defined in concrete serializer implementation."));
+ }
+
+ if (serializerCodec == null) {
+ return monoError(logger, new SerializationException(
+ "Byte encoder null, serializer must be initialized with a byte encoder."));
+ }
+
+ if (schemaType == null) {
+ schemaType = serializerCodec.getSchemaType();
+ }
+
+ String schemaString = serializerCodec.getSchemaString(object);
+ String schemaName = serializerCodec.getSchemaName(object);
+
+ return this.maybeRegisterSchema(this.schemaGroup, schemaName, schemaString, this.schemaType)
+ .onErrorMap(e -> {
+ if (e instanceof SchemaRegistryClientException) {
+ StringBuilder builder = new StringBuilder();
+ if (this.autoRegisterSchemas) {
+ builder.append(String.format("Error registering Avro schema. Group: %s, name: %s. ",
+ schemaGroup, schemaName));
+ } else {
+ builder.append(String.format("Error retrieving Avro schema. Group: %s, name: %s. ",
+ schemaGroup, schemaName));
+ }
+
+ if (e.getCause() instanceof HttpResponseException) {
+ HttpResponseException httpException = (HttpResponseException) e.getCause();
+ builder.append("HTTP ")
+ .append(httpException.getResponse().getStatusCode())
+ .append(" ")
+ .append(httpException.getResponse().getBodyAsString());
+ } else {
+ builder.append(e.getCause().getMessage());
+ }
+
+ return logger.logExceptionAsError(new SerializationException(builder.toString(), e));
+ } else {
+ return logger.logExceptionAsError(new SerializationException(e.getMessage(), e));
+ }
+ })
+ .handle((id, sink) -> {
+ ByteBuffer idBuffer = ByteBuffer.allocate(AbstractSchemaRegistrySerializer.SCHEMA_ID_SIZE)
+ .put(id.getBytes(StandardCharsets.UTF_8));
+ try {
+ s.write(idBuffer.array());
+ serializerCodec.encode(object).writeTo(s);
+ } catch (IOException e) {
+ sink.error(new SerializationException(e.getMessage(), e));
+ }
+ sink.next(s);
+ });
+ }
+
+
+ /**
+ * Core implementation for registry-based deserialization.
+ * Fetches schema referenced by prefixed ID and deserializes the subsequent payload into Java object.
+ *
+ * @param s InputStream containing bytes encoded by an Azure Schema Registry producer
+ * @return object, deserialized with the prefixed schema
+ * @throws SerializationException if deserialization of registry schema or message payload fails.
+ */
+ protected Mono