Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate sync stack in schema registry via AutoRest. #32576

Merged
merged 11 commits into from
Feb 22, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

Expand Down Expand Up @@ -152,7 +150,6 @@ public Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupN

Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, String name, String schemaDefinition,
SchemaFormat format, Context context) {

if (Objects.isNull(groupName)) {
return monoError(logger, new NullPointerException("'groupName' should not be null."));
} else if (Objects.isNull(name)) {
Expand All @@ -169,11 +166,10 @@ Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, St
final BinaryData binaryData = BinaryData.fromString(schemaDefinition);
final SchemaFormatImpl contentType = SchemaRegistryHelper.getContentType(format);

return restService.getSchemas().registerWithResponseAsync(groupName, name, contentType, binaryData,
return restService.getSchemas().registerWithResponseAsync(groupName, name, contentType.toString(), binaryData,
binaryData.getLength(), context)
.map(response -> {
final SchemaProperties registered = SchemaRegistryHelper.getSchemaProperties(response, format);

final SchemaProperties registered = SchemaRegistryHelper.getSchemaProperties(response.getDeserializedHeaders(), response.getHeaders(), format);
return new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), registered);
Expand Down Expand Up @@ -263,21 +259,12 @@ Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String schemaId, Cont

return this.restService.getSchemas().getByIdWithResponseAsync(schemaId, context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.handle((response, sink) -> {
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaProperties(response);
final String schema;

try {
schema = convertToString(response.getValue());
} catch (UncheckedIOException e) {
sink.error(e);
return;
}

sink.next(new SimpleResponse<>(
.flatMap(response -> {
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaProperties(response.getDeserializedHeaders(), response.getHeaders());
return convertToString(response.getValue())
.map(schema -> new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schema)));
sink.complete();
});
}

Expand All @@ -291,30 +278,19 @@ Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String groupName, Str
return this.restService.getSchemas().getSchemaVersionWithResponseAsync(groupName, schemaName, schemaVersion,
context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.handle((response, sink) -> {
final InputStream schemaInputStream = response.getValue();
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaProperties(response);
final String schema;
.flatMap(response -> {
final Flux<ByteBuffer> schemaFlux = response.getValue();
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaProperties(response.getDeserializedHeaders(), response.getHeaders());

if (schemaInputStream == null) {
sink.error(new IllegalArgumentException(String.format(
if (schemaFlux == null) {
return Mono.error(new IllegalArgumentException(String.format(
"Schema definition should not be null. Group Name: %s. Schema Name: %s. Version: %d",
groupName, schemaName, schemaVersion)));

return;
}

try {
schema = convertToString(schemaInputStream);
} catch (UncheckedIOException e) {
sink.error(e);
return;
}

sink.next(new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schema)));
sink.complete();
return convertToString(schemaFlux)
.map(schema -> new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schema)));
});
}

Expand Down Expand Up @@ -402,11 +378,12 @@ Mono<Response<SchemaProperties>> getSchemaPropertiesWithResponse(String groupNam
final SchemaFormatImpl contentType = SchemaRegistryHelper.getContentType(format);

return restService.getSchemas()
.queryIdByContentWithResponseAsync(groupName, name, contentType, binaryData, binaryData.getLength(),
.queryIdByContentWithResponseAsync(groupName, name, com.azure.data.schemaregistry.implementation.models.SchemaFormat.fromString(contentType.toString()),
binaryData, binaryData.getLength(),
context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.map(response -> {
final SchemaProperties properties = SchemaRegistryHelper.getSchemaProperties(response, format);
final SchemaProperties properties = SchemaRegistryHelper.getSchemaProperties(response.getDeserializedHeaders(), response.getHeaders(), format);

return new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
Expand All @@ -421,45 +398,34 @@ Mono<Response<SchemaProperties>> getSchemaPropertiesWithResponse(String groupNam
*
* @return The remapped error.
*/
private static Throwable remapError(ErrorException error) {
static HttpResponseException remapError(ErrorException error) {
if (error.getResponse().getStatusCode() == 404) {
final String message;
if (error.getValue() != null && error.getValue().getError() != null) {
message = error.getValue().getError().getMessage();
} else {
message = error.getMessage();
}

return new ResourceNotFoundException(message, error.getResponse(), error);
}

return error;
}

/**
* Converts an input stream into its string representation.
* Converts a Flux of Byte Buffer into its string representation.
*
* @param inputStream Input stream.
* @param byteBufferFlux the Byte Buffer Flux input.
*
* @return A string representation.
*
* @throws UncheckedIOException if an {@link IOException} is thrown when creating the readers.
*/
private static String convertToString(InputStream inputStream) {
static Mono<String> convertToString(Flux<ByteBuffer> byteBufferFlux) {
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
final StringBuilder builder = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {

String str;

while ((str = reader.readLine()) != null) {
builder.append(str);
}

} catch (IOException exception) {
throw new UncheckedIOException("Error occurred while deserializing schemaContent.", exception);
}

return builder.toString();
return byteBufferFlux
.map(byteBuffer -> {
builder.append(new String(byteBuffer.array(), StandardCharsets.UTF_8));
return Mono.empty();
}).then(Mono.defer(() -> Mono.just(builder.toString())));
g2vinay marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading