Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate sync stack in schema registry via AutoRest. #32576

Merged
merged 11 commits into from
Feb 22, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, St
return restService.getSchemas().registerWithResponseAsync(groupName, name, binaryData, binaryData.getLength(),
context)
.map(response -> {
final SchemaProperties registered = SchemaRegistryHelper.getSchemaProperties(response);
final SchemaProperties registered = SchemaRegistryHelper.getSchemaPropertiesFromSchemaRegisterHeaders(response);

return new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
Expand Down Expand Up @@ -262,11 +262,11 @@ Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String schemaId, Cont
return this.restService.getSchemas().getByIdWithResponseAsync(schemaId, context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.handle((response, sink) -> {
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaProperties(response);
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaPropertiesFromSchemasGetByIdHeaders(response);
final String schema;

try {
schema = convertToString(response.getValue());
schema = convertToString(response.getValue().toStream());
} catch (UncheckedIOException e) {
sink.error(e);
return;
Expand All @@ -290,8 +290,8 @@ Mono<Response<SchemaRegistrySchema>> getSchemaWithResponse(String groupName, Str
context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.handle((response, sink) -> {
final InputStream schemaInputStream = response.getValue();
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaProperties(response);
final InputStream schemaInputStream = response.getValue().toStream();
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaPropertiesFromSchemasGetSchemaVersionHeaders(response);
final String schema;

if (schemaInputStream == null) {
Expand Down Expand Up @@ -402,7 +402,7 @@ Mono<Response<SchemaProperties>> getSchemaPropertiesWithResponse(String groupNam
.queryIdByContentWithResponseAsync(groupName, name, binaryData, binaryData.getLength(), context)
.onErrorMap(ErrorException.class, SchemaRegistryAsyncClient::remapError)
.map(response -> {
final SchemaProperties properties = SchemaRegistryHelper.getSchemaProperties(response);
final SchemaProperties properties = SchemaRegistryHelper.getSchemaPropertiesFromSchemasQueryIdByContentHeaders(response);

return new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
Expand All @@ -417,15 +417,14 @@ Mono<Response<SchemaProperties>> getSchemaPropertiesWithResponse(String groupNam
*
* @return The remapped error.
*/
private static Throwable remapError(ErrorException error) {
static HttpResponseException remapError(ErrorException error) {
if (error.getResponse().getStatusCode() == 404) {
final String message;
if (error.getValue() != null && error.getValue().getError() != null) {
message = error.getValue().getError().getMessage();
} else {
message = error.getMessage();
}

return new ResourceNotFoundException(message, error.getResponse(), error);
}

Expand All @@ -441,7 +440,7 @@ private static Throwable remapError(ErrorException error) {
*
* @throws UncheckedIOException if an {@link IOException} is thrown when creating the readers.
*/
private static String convertToString(InputStream inputStream) {
static String convertToString(InputStream inputStream) {
final StringBuilder builder = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,27 @@
import com.azure.core.exception.HttpResponseException;
import com.azure.core.exception.ResourceNotFoundException;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.ResponseBase;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryImpl;
import com.azure.data.schemaregistry.implementation.SchemaRegistryHelper;
import com.azure.data.schemaregistry.implementation.models.ErrorException;
import com.azure.data.schemaregistry.implementation.models.SchemasGetByIdHeaders;
import com.azure.data.schemaregistry.implementation.models.SchemasGetSchemaVersionHeaders;
import com.azure.data.schemaregistry.implementation.models.SchemasRegisterHeaders;
import com.azure.data.schemaregistry.implementation.models.SchemasQueryIdByContentHeaders;
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;

import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Objects;

import static com.azure.data.schemaregistry.SchemaRegistryAsyncClient.convertToString;

/**
* HTTP-based client that interacts with Azure Schema Registry service to store and retrieve schemas on demand.
Expand Down Expand Up @@ -57,10 +72,16 @@
*/
@ServiceClient(builder = SchemaRegistryClientBuilder.class)
public final class SchemaRegistryClient {
private final SchemaRegistryAsyncClient asyncClient;
private static final String HTTP_REST_PROXY_SYNC_PROXY_ENABLE = "com.azure.core.http.restproxy.syncproxy.enable";
private final ClientLogger logger = new ClientLogger(SchemaRegistryClient.class);
private final AzureSchemaRegistryImpl restService;


SchemaRegistryClient(SchemaRegistryAsyncClient asyncClient) {
this.asyncClient = asyncClient;
SchemaRegistryClient(AzureSchemaRegistryImpl restService) {
this.restService = restService;

// So the accessor is initialised because there were NullPointerExceptions before.
new SchemaProperties("", SchemaFormat.AVRO);
}

/**
Expand All @@ -69,7 +90,7 @@ public final class SchemaRegistryClient {
* @return The fully qualified namespace of the Schema Registry instance.
*/
public String getFullyQualifiedNamespace() {
return asyncClient.getFullyQualifiedNamespace();
return this.restService.getEndpoint();
}

/**
Expand All @@ -95,7 +116,7 @@ public String getFullyQualifiedNamespace() {
@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaProperties registerSchema(String groupName, String name, String schemaDefinition,
SchemaFormat format) {
return this.asyncClient.registerSchema(groupName, name, schemaDefinition, format).block();
return registerSchemaWithResponse(groupName, name, schemaDefinition, format, Context.NONE).getValue();
}

/**
Expand All @@ -122,8 +143,27 @@ public SchemaProperties registerSchema(String groupName, String name, String sch
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaProperties> registerSchemaWithResponse(String groupName, String name, String schemaDefinition,
SchemaFormat format, Context context) {
return this.asyncClient.registerSchemaWithResponse(groupName, name, schemaDefinition, format,
context).block();
if (Objects.isNull(groupName)) {
throw logger.logExceptionAsError(new NullPointerException("'groupName' should not be null."));
} else if (Objects.isNull(name)) {
throw logger.logExceptionAsError(new NullPointerException("'name' should not be null."));
} else if (Objects.isNull(schemaDefinition)) {
throw logger.logExceptionAsError(new NullPointerException("'schemaDefinition' should not be null."));
} else if (Objects.isNull(format)) {
throw logger.logExceptionAsError(new NullPointerException("'format' should not be null."));
}

logger.verbose("Registering schema. Group: '{}', name: '{}', serialization type: '{}', payload: '{}'",
groupName, name, format, schemaDefinition);

context = enableSyncRestProxy(context);
final BinaryData binaryData = BinaryData.fromString(schemaDefinition);

ResponseBase<SchemasRegisterHeaders, Void> response = restService.getSchemas().registerWithResponse(groupName, name, binaryData, binaryData.getLength(), context);
final SchemaProperties registered = SchemaRegistryHelper.getSchemaPropertiesFromSchemaRegisterHeaders(response);
return new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), registered);
}

/**
Expand All @@ -140,7 +180,7 @@ public Response<SchemaProperties> registerSchemaWithResponse(String groupName, S
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaRegistrySchema getSchema(String schemaId) {
return this.asyncClient.getSchema(schemaId).block();
return getSchemaWithResponse(schemaId, Context.NONE).getValue();
}

/**
Expand All @@ -160,7 +200,7 @@ public SchemaRegistrySchema getSchema(String schemaId) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaRegistrySchema getSchema(String groupName, String schemaName, int schemaVersion) {
return this.asyncClient.getSchema(groupName, schemaName, schemaVersion).block();
return getSchemaWithResponse(groupName, schemaName, schemaVersion, Context.NONE).getValue();
}

/**
Expand All @@ -178,7 +218,19 @@ public SchemaRegistrySchema getSchema(String groupName, String schemaName, int s
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaRegistrySchema> getSchemaWithResponse(String schemaId, Context context) {
return this.asyncClient.getSchemaWithResponse(schemaId, context).block();
if (Objects.isNull(schemaId)) {
throw logger.logExceptionAsError(new NullPointerException("'schemaId' should not be null."));
}
context = enableSyncRestProxy(context);
try {
ResponseBase<SchemasGetByIdHeaders, BinaryData> response = this.restService.getSchemas().getByIdWithResponse(schemaId, context);
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaPropertiesFromSchemasGetByIdHeaders(response);
final String schema = convertToString(response.getValue().toStream());
return new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schema));
} catch (ErrorException ex) {
throw logger.logExceptionAsError(SchemaRegistryAsyncClient.remapError(ex));
}
}

/**
Expand All @@ -200,7 +252,26 @@ public Response<SchemaRegistrySchema> getSchemaWithResponse(String schemaId, Con
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaRegistrySchema> getSchemaWithResponse(String groupName, String schemaName,
int schemaVersion, Context context) {
return this.asyncClient.getSchemaWithResponse(groupName, schemaName, schemaVersion, context).block();
if (Objects.isNull(groupName)) {
throw logger.logExceptionAsError(new NullPointerException("'groupName' should not be null."));
}
context = enableSyncRestProxy(context);

ResponseBase<SchemasGetSchemaVersionHeaders, BinaryData> response = this.restService.getSchemas().getSchemaVersionWithResponse(groupName, schemaName, schemaVersion,
context);
final InputStream schemaInputStream = response.getValue().toStream();
final SchemaProperties schemaObject = SchemaRegistryHelper.getSchemaPropertiesFromSchemasGetSchemaVersionHeaders(response);
final String schema;

if (schemaInputStream == null) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
"Schema definition should not be null. Group Name: %s. Schema Name: %s. Version: %d",
groupName, schemaName, schemaVersion)));
}
schema = convertToString(schemaInputStream);
return new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), new SchemaRegistrySchema(schemaObject, schema));
}

/**
Expand All @@ -222,7 +293,7 @@ public Response<SchemaRegistrySchema> getSchemaWithResponse(String groupName, St
@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaProperties getSchemaProperties(String groupName, String name, String schemaDefinition,
SchemaFormat format) {
return this.asyncClient.getSchemaProperties(groupName, name, schemaDefinition, format).block();
return getSchemaPropertiesWithResponse(groupName, name, schemaDefinition, format, Context.NONE).getValue();
}

/**
Expand All @@ -245,7 +316,36 @@ public SchemaProperties getSchemaProperties(String groupName, String name, Strin
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaProperties> getSchemaPropertiesWithResponse(String groupName, String name,
String schemaDefinition, SchemaFormat format, Context context) {
return this.asyncClient.getSchemaPropertiesWithResponse(groupName, name, schemaDefinition, format, context)
.block();
if (Objects.isNull(groupName)) {
throw logger.logExceptionAsError(new NullPointerException("'groupName' cannot be null."));
} else if (Objects.isNull(name)) {
throw logger.logExceptionAsError(new NullPointerException("'name' cannot be null."));
} else if (Objects.isNull(schemaDefinition)) {
throw logger.logExceptionAsError(new NullPointerException("'schemaDefinition' cannot be null."));
} else if (Objects.isNull(format)) {
throw logger.logExceptionAsError(new NullPointerException("'format' cannot be null."));
}

if (context == null) {
context = Context.NONE;
}
context = enableSyncRestProxy(context);

final BinaryData binaryData = BinaryData.fromString(schemaDefinition);

try {
ResponseBase<SchemasQueryIdByContentHeaders, Void> response = restService.getSchemas()
.queryIdByContentWithResponse(groupName, name, binaryData, binaryData.getLength(), context);
final SchemaProperties properties = SchemaRegistryHelper.getSchemaPropertiesFromSchemasQueryIdByContentHeaders(response);
return new SimpleResponse<>(
response.getRequest(), response.getStatusCode(),
response.getHeaders(), properties);
} catch (ErrorException ex) {
throw logger.logExceptionAsError(SchemaRegistryAsyncClient.remapError(ex));
}
}

private Context enableSyncRestProxy(Context context) {
return context.addData(HTTP_REST_PROXY_SYNC_PROXY_ENABLE, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ public SchemaRegistryClientBuilder addPolicy(HttpPipelinePolicy policy) {
* and {@link #retryPolicy(RetryPolicy)} have been set.
*/
public SchemaRegistryAsyncClient buildAsyncClient() {
AzureSchemaRegistryImpl restService = getAzureSchemaREgistryImplService();

return new SchemaRegistryAsyncClient(restService);
}

private AzureSchemaRegistryImpl getAzureSchemaREgistryImplService() {
Objects.requireNonNull(credential,
"'credential' cannot be null and must be set via builder.credential(TokenCredential)");
Objects.requireNonNull(fullyQualifiedNamespace,
Expand Down Expand Up @@ -435,8 +441,7 @@ public SchemaRegistryAsyncClient buildAsyncClient() {
.apiVersion(version.getVersion())
.pipeline(buildPipeline)
.buildClient();

return new SchemaRegistryAsyncClient(restService);
return restService;
}

/**
Expand All @@ -449,6 +454,7 @@ public SchemaRegistryAsyncClient buildAsyncClient() {
* and {@link #retryPolicy(RetryPolicy)} have been set.
*/
public SchemaRegistryClient buildClient() {
return new SchemaRegistryClient(this.buildAsyncClient());
AzureSchemaRegistryImpl restService = getAzureSchemaREgistryImplService();
return new SchemaRegistryClient(restService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/** A builder for creating a new instance of the AzureSchemaRegistry type. */
Expand All @@ -48,7 +49,7 @@ public final class AzureSchemaRegistryImplBuilder

@Generated private static final String SDK_VERSION = "version";

@Generated private final Map<String, String> properties = new HashMap<>();
@Generated private static final Map<String, String> PROPERTIES = new HashMap<>();

@Generated private final List<HttpPipelinePolicy> pipelinePolicies;

Expand Down Expand Up @@ -127,6 +128,7 @@ public AzureSchemaRegistryImplBuilder retryOptions(RetryOptions retryOptions) {
@Generated
@Override
public AzureSchemaRegistryImplBuilder addPolicy(HttpPipelinePolicy customPolicy) {
Objects.requireNonNull(customPolicy, "'customPolicy' cannot be null.");
pipelinePolicies.add(customPolicy);
return this;
}
Expand Down Expand Up @@ -228,21 +230,17 @@ public AzureSchemaRegistryImpl buildClient() {
private HttpPipeline createHttpPipeline() {
Configuration buildConfiguration =
(configuration == null) ? Configuration.getGlobalConfiguration() : configuration;
if (httpLogOptions == null) {
httpLogOptions = new HttpLogOptions();
}
if (clientOptions == null) {
clientOptions = new ClientOptions();
}
HttpLogOptions localHttpLogOptions = this.httpLogOptions == null ? new HttpLogOptions() : this.httpLogOptions;
ClientOptions localClientOptions = this.clientOptions == null ? new ClientOptions() : this.clientOptions;
List<HttpPipelinePolicy> policies = new ArrayList<>();
String clientName = properties.getOrDefault(SDK_NAME, "UnknownName");
String clientVersion = properties.getOrDefault(SDK_VERSION, "UnknownVersion");
String applicationId = CoreUtils.getApplicationId(clientOptions, httpLogOptions);
String clientName = PROPERTIES.getOrDefault(SDK_NAME, "UnknownName");
String clientVersion = PROPERTIES.getOrDefault(SDK_VERSION, "UnknownVersion");
String applicationId = CoreUtils.getApplicationId(localClientOptions, localHttpLogOptions);
policies.add(new UserAgentPolicy(applicationId, clientName, clientVersion, buildConfiguration));
policies.add(new RequestIdPolicy());
policies.add(new AddHeadersFromContextPolicy());
HttpHeaders headers = new HttpHeaders();
clientOptions.getHeaders().forEach(header -> headers.set(header.getName(), header.getValue()));
localClientOptions.getHeaders().forEach(header -> headers.set(header.getName(), header.getValue()));
if (headers.getSize() > 0) {
policies.add(new AddHeadersPolicy(headers));
}
Expand All @@ -264,7 +262,7 @@ private HttpPipeline createHttpPipeline() {
new HttpPipelineBuilder()
.policies(policies.toArray(new HttpPipelinePolicy[0]))
.httpClient(httpClient)
.clientOptions(clientOptions)
.clientOptions(localClientOptions)
.build();
return httpPipeline;
}
Expand Down
Loading