Skip to content

Commit

Permalink
Add getSchema using group name, schema name, and version (#31167)
Browse files Browse the repository at this point in the history
* Regenerating swagger classes.

* Update README.md version.

* Regeneration added empty spaces.

* Add GetSchemaVersion after generation.

* Adding --add-exports and add-opens for tests.

* Adding public surface

* Referencing unreleased azure-core

* Deserializing response for schema call.

* Adding test recording for async.

* Adding sync test case.

* Adding CHANGELOG entry.

* Fixing return documentation.
  • Loading branch information
conniey authored Sep 29, 2022
1 parent 74617c7 commit 12ace8e
Show file tree
Hide file tree
Showing 26 changed files with 1,019 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.32.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.33.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
8 changes: 8 additions & 0 deletions sdk/schemaregistry/azure-data-schemaregistry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@

### Features Added

- Added `getVersion` to `SchemaProperties`.
- Added the following methods in `SchemaRegistryAsyncClient`:
- `Mono<SchemaRegistrySchema> getSchema(String groupName, String schemaName, int schemaVersion)`
- `Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String groupName, String schemaName, int schemaVersion)`
- Added the following methods in `SchemaRegistryClient`:
- `SchemaRegistrySchema getSchema(String groupName, String schemaName, int schemaVersion)`
- `Response<SchemaRegistrySchema> getSchemaWithResponse(String groupName, String schemaName, int schemaVersion, Context context)`

### Breaking Changes

### Bugs Fixed
Expand Down
5 changes: 4 additions & 1 deletion sdk/schemaregistry/azure-data-schemaregistry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
<properties>
<!-- Configures the Java 9+ run to perform the required module exports, opens, and reads that are necessary for testing but shouldn't be part of the module-info. -->
<javaModulesSurefireArgLine>
--add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-opens com.azure.data.schemaregistry/com.azure.data.schemaregistry=ALL-UNNAMED

--add-exports com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-exports com.azure.data.schemaregistry/com.azure.data.schemaregistry.implementation.models=ALL-UNNAMED
</javaModulesSurefireArgLine>
<checkstyle.excludes>**/implementation/**/*.java</checkstyle.excludes>
Expand All @@ -49,7 +52,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.32.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.33.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@
import com.azure.core.exception.ResourceNotFoundException;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryImpl;
import com.azure.data.schemaregistry.implementation.SchemaRegistryHelper;
import com.azure.data.schemaregistry.implementation.models.ErrorException;
import com.azure.data.schemaregistry.implementation.models.SchemasQueryIdByContentHeaders;
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;
import reactor.core.publisher.Mono;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

Expand Down Expand Up @@ -160,9 +165,10 @@ Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, St
logger.verbose("Registering schema. Group: '{}', name: '{}', serialization type: '{}', payload: '{}'",
groupName, name, format, schemaDefinition);

final String contentType = getContentType(format);
final BinaryData binaryData = BinaryData.fromString(schemaDefinition);

return restService.getSchemas().registerWithResponseAsync(groupName, name, schemaDefinition, contentType, context)
return restService.getSchemas().registerWithResponseAsync(groupName, name, binaryData, binaryData.getLength(),
context)
.map(response -> {
final SchemaProperties registered = SchemaRegistryHelper.getSchemaProperties(response);

Expand All @@ -177,47 +183,136 @@ Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, St
*
* @param schemaId The unique identifier of the schema.
*
* @return The {@link SchemaProperties} associated with the given {@code schemaId}.
* @return The {@link SchemaRegistrySchema} associated with the given {@code schemaId}.
*
* @throws NullPointerException if {@code schemaId} is null.
* @throws ResourceNotFoundException if a schema with the matching {@code schemaId} could not be found.
* @throws HttpResponseException if an issue was encountered while fetching the schema.
* @throws UncheckedIOException if an error occurred while deserializing response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaRegistrySchema> getSchema(String schemaId) {
return getSchemaWithResponse(schemaId).map(Response::getValue);
}

/**
* Gets the schema properties of the schema associated with the group name, schema name, and schema version.
*
* @param groupName Group name for the schema
* @param schemaName Name of the schema
* @param schemaVersion Version of schema
*
* @return The {@link SchemaRegistrySchema} matching the parameters.
*
* @throws NullPointerException if {@code groupName} or {@code schemaName} is null.
* @throws ResourceNotFoundException if a schema with the matching {@code groupName} or {@code schemaName} could
* not be found.
* @throws HttpResponseException if an issue was encountered while fetching the schema.
* @throws UncheckedIOException if an error occurred while deserializing response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaRegistrySchema> getSchema(String groupName, String schemaName, int schemaVersion) {
return getSchemaWithResponse(groupName, schemaName, schemaVersion).map(Response::getValue);
}

/**
* Gets the schema properties of the schema associated with the unique schema id.
*
* @param schemaId The unique identifier of the schema.
*
* @return The {@link SchemaProperties} associated with the given {@code schemaId} along with the HTTP response.
* @return The {@link SchemaRegistrySchema} associated with the given {@code schemaId} along with the HTTP response.
*
* @throws NullPointerException if {@code schemaId} is null.
* @throws ResourceNotFoundException if a schema with the matching {@code schemaId} could not be found.
* @throws HttpResponseException if an issue was encountered while fetching the schema.
* @throws UncheckedIOException if an error occurred while deserializing response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String schemaId) {
return FluxUtil.withContext(context -> getSchemaWithResponse(schemaId, context));
}

/**
* Gets the schema properties of the schema associated with the group name, schema name, and schema version.
*
* @param groupName Group name for the schema
* @param schemaName Name of the schema
* @param schemaVersion Version of schema
*
* @return The {@link SchemaRegistrySchema} matching the parameters.
*
* @throws NullPointerException if {@code groupName} or {@code schemaName} is null.
* @throws ResourceNotFoundException if a schema with the matching {@code groupName} or {@code schemaName} could
* not be found.
* @throws HttpResponseException if an issue was encountered while fetching the schema.
* @throws UncheckedIOException if an error occurred while deserializing response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String groupName, String schemaName,
int schemaVersion) {

return FluxUtil.withContext(context -> getSchemaWithResponse(groupName, schemaName, schemaVersion, context));
}

Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String schemaId, Context context) {
if (Objects.isNull(schemaId)) {
return monoError(logger, new NullPointerException("'schemaId' should not be null."));
}

return this.restService.getSchemas().getByIdWithResponseAsync(schemaId, context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.map(response -> {
.handle((response, sink) -> {
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaProperties(response);
final String schema = new String(response.getValue(), StandardCharsets.UTF_8);
final String schema;

return new SimpleResponse<>(
try {
schema = convertToString(response.getValue());
} catch (UncheckedIOException e) {
sink.error(e);
return;
}

sink.next(new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schema)));
sink.complete();
});
}

Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String groupName, String schemaName, int schemaVersion,
Context context) {

if (Objects.isNull(groupName)) {
return monoError(logger, new NullPointerException("'groupName' should not be null."));
}

return this.restService.getSchemas().getSchemaVersionWithResponseAsync(groupName, schemaName, schemaVersion,
context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.handle((response, sink) -> {
final InputStream schemaInputStream = response.getValue();
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaProperties(response);
final String schema;

if (schemaInputStream == null) {
sink.error(new IllegalArgumentException(String.format(
"Schema definition should not be null. Group Name: %s. Schema Name: %s. Version: %d",
groupName, schemaName, schemaVersion)));

return;
}

try {
schema = convertToString(schemaInputStream);
} catch (UncheckedIOException e) {
sink.error(e);
return;
}

sink.next(new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schema));
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schema)));
sink.complete();
});
}

Expand All @@ -232,8 +327,8 @@ Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String schemaId, Cont
*
* @return A mono that completes with the properties for a matching schema.
*
* @throws NullPointerException if {@code groupName}, {@code name}, {@code schemaDefinition}, or {@code format} is
* null.
* @throws NullPointerException if {@code groupName}, {@code name}, {@code schemaDefinition}, or {@code format}
* is null.
* @throws ResourceNotFoundException if a schema with matching parameters could not be located.
* @throws HttpResponseException if an issue was encountered while finding a matching schema.
*/
Expand All @@ -255,8 +350,8 @@ public Mono<SchemaProperties> getSchemaProperties(String groupName, String name,
*
* @return A mono that completes with the properties for a matching schema.
*
* @throws NullPointerException if {@code groupName}, {@code name}, {@code schemaDefinition}, or {@code format} is
* null.
* @throws NullPointerException if {@code groupName}, {@code name}, {@code schemaDefinition}, or {@code format}
* is null.
* @throws ResourceNotFoundException if a schema with matching parameters could not be located.
* @throws HttpResponseException if an issue was encountered while finding a matching schema.
*/
Expand All @@ -279,8 +374,8 @@ public Mono<Response<SchemaProperties>> getSchemaPropertiesWithResponse(String g
*
* @return A mono that completes with the properties for a matching schema.
*
* @throws NullPointerException if {@code groupName}, {@code name}, {@code schemaDefinition}, or {@code format} is
* null.
* @throws NullPointerException if {@code groupName}, {@code name}, {@code schemaDefinition}, or {@code format}
* is null.
* @throws ResourceNotFoundException if a schema with matching parameters could not be located.
* @throws HttpResponseException if an issue was encountered while finding a matching schema.
*/
Expand All @@ -301,13 +396,12 @@ Mono<Response<SchemaProperties>> getSchemaPropertiesWithResponse(String groupNam
context = Context.NONE;
}

final String contentType = getContentType(format);
final BinaryData binaryData = BinaryData.fromString(schemaDefinition);

return restService.getSchemas()
.queryIdByContentWithResponseAsync(groupName, name, schemaDefinition, contentType, context)
.queryIdByContentWithResponseAsync(groupName, name, binaryData, binaryData.getLength(), context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.map(response -> {
final SchemasQueryIdByContentHeaders deserializedHeaders = response.getDeserializedHeaders();
final SchemaProperties properties = SchemaRegistryHelper.getSchemaProperties(response);

return new SimpleResponse<>(
Expand Down Expand Up @@ -338,7 +432,30 @@ private static Throwable remapError(ErrorException error) {
return error;
}

private static String getContentType(SchemaFormat schemaFormat) {
return "application/json; serialization=" + schemaFormat;
/**
* Converts an input stream into its string representation.
*
* @param inputStream Input stream.
*
* @return A string representation.
*
* @throws UncheckedIOException if an {@link IOException} is thrown when creating the readers.
*/
private static String convertToString(InputStream inputStream) {
final StringBuilder builder = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {

String str;

while ((str = reader.readLine()) != null) {
builder.append(str);
}

} catch (IOException exception) {
throw new UncheckedIOException("Error occurred while deserializing schemaContent.", exception);
}

return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;

import java.io.UncheckedIOException;

/**
* HTTP-based client that interacts with Azure Schema Registry service to store and retrieve schemas on demand.
*
Expand Down Expand Up @@ -129,34 +131,78 @@ public Response<SchemaProperties> registerSchemaWithResponse(String groupName, S
*
* @param schemaId The unique identifier of the schema.
*
* @return The {@link SchemaProperties} associated with the given {@code schemaId}.
* @return The {@link SchemaRegistrySchema} associated with the given {@code schemaId}.
*
* @throws NullPointerException if {@code schemaId} is null.
* @throws ResourceNotFoundException if a schema with the matching {@code schemaId} could not be found.
* @throws HttpResponseException if an issue was encountered while fetching the schema.
* @throws UncheckedIOException if an error occurred while deserializing response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaRegistrySchema getSchema(String schemaId) {
return this.asyncClient.getSchema(schemaId).block();
}

/**
* Gets the schema properties of the schema associated with the group name, schema name, and schema version.
*
* @param groupName Group name for the schema
* @param schemaName Name of the schema
* @param schemaVersion Version of schema
*
* @return The {@link SchemaRegistrySchema} matching the parameters.
*
* @throws NullPointerException if {@code groupName} or {@code schemaName} is null.
* @throws ResourceNotFoundException if a schema with the matching {@code groupName} or {@code schemaName} could
* not be found.
* @throws HttpResponseException if an issue was encountered while fetching the schema.
* @throws UncheckedIOException if an error occurred while deserializing response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaRegistrySchema getSchema(String groupName, String schemaName, int schemaVersion) {
return this.asyncClient.getSchema(groupName, schemaName, schemaVersion).block();
}

/**
* Gets the schema properties of the schema associated with the unique schema id.
*
* @param schemaId The unique identifier of the schema.
* @param context The context to pass to the Http pipeline.
*
* @return The {@link SchemaProperties} associated with the given {@code schemaId} and its HTTP response.
* @return The {@link SchemaRegistrySchema} associated with the given {@code schemaId} and its HTTP response.
*
* @throws NullPointerException if {@code schemaId} is null.
* @throws ResourceNotFoundException if a schema with the matching {@code schemaId} could not be found.
* @throws HttpResponseException if an issue was encountered while fetching the schema.
* @throws UncheckedIOException if an error occurred while deserializing response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaRegistrySchema> getSchemaWithResponse(String schemaId, Context context) {
return this.asyncClient.getSchemaWithResponse(schemaId, context).block();
}

/**
* Gets the schema properties of the schema associated with the group name, schema name, and schema version.
*
* @param groupName Group name for the schema
* @param schemaName Name of the schema
* @param schemaVersion Version of schema
* @param context The context to pass to the Http pipeline.
*
* @return The {@link SchemaRegistrySchema} matching the parameters.
*
* @throws NullPointerException if {@code groupName} or {@code schemaName} is null.
* @throws ResourceNotFoundException if a schema with the matching {@code groupName} or {@code schemaName} could
* not be found.
* @throws HttpResponseException if an issue was encountered while fetching the schema.
* @throws UncheckedIOException if an error occurred while deserializing response.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaRegistrySchema> getSchemaWithResponse(String groupName, String schemaName,
int schemaVersion, Context context) {
return this.asyncClient.getSchemaWithResponse(groupName, schemaName, schemaVersion, context).block();
}

/**
* Gets schema properties for a schema with matching {@code groupName}, {@code name}, {@code schemaDefinition}, and
* {@code format}.
Expand Down
Loading

0 comments on commit 12ace8e

Please sign in to comment.