From c04596fd56934f76991ef30140c15bed6b882e13 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 27 Sep 2021 12:25:42 -0700 Subject: [PATCH 1/4] Remove builder caching references. --- .../SchemaRegistryClientBuilder.java | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java index 24bd02e72cbaf..714df6b96a8f1 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java +++ b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java @@ -37,8 +37,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.function.Function; /** * Fluent builder for interacting with the Schema Registry service via {@link SchemaRegistryAsyncClient} and @@ -56,9 +54,6 @@ */ @ServiceClientBuilder(serviceClients = SchemaRegistryAsyncClient.class) public class SchemaRegistryClientBuilder { - static final int MAX_SCHEMA_MAP_SIZE_DEFAULT = 1000; - static final int MAX_SCHEMA_MAP_SIZE_MINIMUM = 10; - private final ClientLogger logger = new ClientLogger(SchemaRegistryClientBuilder.class); private static final String DEFAULT_SCOPE = "https://eventhubs.azure.net/.default"; @@ -69,8 +64,6 @@ public class SchemaRegistryClientBuilder { private static final AddHeadersPolicy API_HEADER_POLICY = new AddHeadersPolicy(new HttpHeaders() .set("api-version", "2020-09-01-preview")); - private final ConcurrentSkipListMap> typeParserMap; - private final List perCallPolicies = new ArrayList<>(); private final List perRetryPolicies = new ArrayList<>(); @@ -80,7 +73,6 @@ public class SchemaRegistryClientBuilder { private String endpoint; private String host; private HttpClient httpClient; - private Integer maxSchemaMapSize; private TokenCredential credential; private ClientOptions clientOptions; private HttpLogOptions httpLogOptions; @@ -93,8 +85,6 @@ public class SchemaRegistryClientBuilder { */ public SchemaRegistryClientBuilder() { this.httpLogOptions = new HttpLogOptions(); - this.maxSchemaMapSize = null; - this.typeParserMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER); this.httpClient = null; this.credential = null; this.retryPolicy = new RetryPolicy("retry-after-ms", ChronoUnit.MILLIS); @@ -132,24 +122,6 @@ public SchemaRegistryClientBuilder endpoint(String endpoint) { return this; } - /** - * Sets schema cache size limit. If limit is exceeded on any cache, all caches are recycled. - * - * @param maxCacheSize max size for internal schema caches in {@link SchemaRegistryAsyncClient} - * @return The updated {@link SchemaRegistryClientBuilder} object. - * @throws IllegalArgumentException on invalid maxCacheSize value - */ - SchemaRegistryClientBuilder maxCacheSize(int maxCacheSize) { - if (maxCacheSize < MAX_SCHEMA_MAP_SIZE_MINIMUM) { - throw logger.logExceptionAsError(new IllegalArgumentException( - String.format("Schema map size must be greater than %s entries", - MAX_SCHEMA_MAP_SIZE_MINIMUM))); - } - - this.maxSchemaMapSize = maxCacheSize; - return this; - } - /** * Sets the HTTP client to use for sending and receiving requests to and from the service. * @@ -333,11 +305,7 @@ public SchemaRegistryAsyncClient buildAsyncClient() { .pipeline(buildPipeline) .buildClient(); - int buildMaxSchemaMapSize = (maxSchemaMapSize == null) - ? MAX_SCHEMA_MAP_SIZE_DEFAULT - : maxSchemaMapSize; - - return new SchemaRegistryAsyncClient(restService, buildMaxSchemaMapSize, typeParserMap); + return new SchemaRegistryAsyncClient(restService); } /** From 5da18c9d39dc7716f084c2ac44ab6be57a017de2 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 27 Sep 2021 12:32:36 -0700 Subject: [PATCH 2/4] Remove caching from SchemaRegistryAsyncClient. Make methods public for Response. --- .../SchemaRegistryAsyncClient.java | 84 ++++++------------- 1 file changed, 24 insertions(+), 60 deletions(-) diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClient.java b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClient.java index 7c2225f859356..300134b8d7ae2 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClient.java +++ b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClient.java @@ -20,11 +20,7 @@ import java.net.URI; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -53,20 +49,9 @@ public final class SchemaRegistryAsyncClient { private static final Pattern SCHEMA_PATTERN = Pattern.compile("/\\$schemagroups/(?.+)/schemas/(?.+?)/"); private final ClientLogger logger = new ClientLogger(SchemaRegistryAsyncClient.class); private final AzureSchemaRegistry restService; - private final Integer maxSchemaMapSize; - private final ConcurrentSkipListMap> typeParserMap; - private final Map idCache; - private final Map schemaStringCache; - SchemaRegistryAsyncClient( - AzureSchemaRegistry restService, - int maxSchemaMapSize, - ConcurrentSkipListMap> typeParserMap) { + SchemaRegistryAsyncClient(AzureSchemaRegistry restService) { this.restService = restService; - this.maxSchemaMapSize = maxSchemaMapSize; - this.typeParserMap = typeParserMap; - this.idCache = new ConcurrentHashMap<>(); - this.schemaStringCache = new ConcurrentHashMap<>(); } /** @@ -119,11 +104,6 @@ Mono> registerSchemaWithResponse(String groupName, St name, content.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING)); - schemaStringCache.putIfAbsent(getSchemaStringCacheKey(groupName, name, content), - registered); - idCache.putIfAbsent(schemaId.getId(), registered); - - logger.verbose("Cached schema string. Group: '{}', name: '{}'", groupName, name); SimpleResponse schemaRegistryObjectSimpleResponse = new SimpleResponse<>( response.getRequest(), response.getStatusCode(), response.getHeaders(), registered); @@ -140,10 +120,6 @@ Mono> registerSchemaWithResponse(String groupName, St */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono getSchema(String id) { - if (idCache.containsKey(id)) { - logger.verbose("Cache hit for schema id '{}'", id); - return Mono.fromCallable(() -> idCache.get(id)); - } return getSchemaWithResponse(id).map(Response::getValue); } @@ -154,7 +130,8 @@ public Mono getSchema(String id) { * * @return The {@link SchemaProperties} associated with the given {@code id} along with the HTTP response. */ - Mono> getSchemaWithResponse(String id) { + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono> getSchemaWithResponse(String id) { return FluxUtil.withContext(context -> getSchemaWithResponse(id, context)); } @@ -174,21 +151,12 @@ Mono> getSchemaWithResponse(String id, Context contex return; } - final String schemaGroup = matcher.group("schemaGroup"); final String schemaName = matcher.group("schemaName"); final SchemaProperties schemaObject = new SchemaProperties(id, serializationType, schemaName, response.getValue()); - final String schemaCacheKey = getSchemaStringCacheKey(schemaGroup, schemaName, - new String(response.getValue(), SCHEMA_REGISTRY_SERVICE_ENCODING)); - - schemaStringCache.putIfAbsent(schemaCacheKey, schemaObject); - idCache.putIfAbsent(id, schemaObject); - - logger.verbose("Cached schema object. Path: '{}'", id); - - SimpleResponse schemaResponse = new SimpleResponse<>( + final SimpleResponse schemaResponse = new SimpleResponse<>( response.getRequest(), response.getStatusCode(), response.getHeaders(), schemaObject); @@ -210,16 +178,6 @@ Mono> getSchemaWithResponse(String id, Context contex @ServiceMethod(returns = ReturnType.SINGLE) public Mono getSchemaId(String groupName, String name, String content, SerializationType serializationType) { - - String schemaStringCacheKey = getSchemaStringCacheKey(groupName, name, content); - - if (schemaStringCache.containsKey(schemaStringCacheKey)) { - return Mono.fromCallable(() -> { - logger.verbose("Cache hit schema string. Group: '{}', name: '{}'", groupName, name); - return schemaStringCache.get(schemaStringCacheKey).getSchemaId(); - }); - } - return getSchemaIdWithResponse(groupName, name, content, serializationType) .map(response -> response.getValue()); } @@ -234,7 +192,8 @@ public Mono getSchemaId(String groupName, String name, String content, * * @return The unique identifier for this schema. */ - Mono> getSchemaIdWithResponse(String groupName, String name, String content, + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono> getSchemaIdWithResponse(String groupName, String name, String content, SerializationType serializationType) { return FluxUtil.withContext(context -> @@ -256,27 +215,32 @@ Mono> getSchemaIdWithResponse(String groupName, String name, St SerializationType serializationType, Context context) { return this.restService.getSchemas() - .queryIdByContentWithResponseAsync(groupName, name, - com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, content) + .queryIdByContentWithResponseAsync(groupName, name, getSerialization(serializationType), content) .handle((response, sink) -> { SchemaId schemaId = response.getValue(); - SchemaProperties properties = new SchemaProperties(schemaId.getId(), serializationType, name, - content.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING)); - - schemaStringCache.putIfAbsent( - getSchemaStringCacheKey(groupName, name, content), properties); - idCache.putIfAbsent(schemaId.getId(), properties); - - logger.verbose("Cached schema string. Group: '{}', name: '{}'", groupName, name); - SimpleResponse schemaIdResponse = new SimpleResponse<>( response.getRequest(), response.getStatusCode(), response.getHeaders(), schemaId.getId()); + sink.next(schemaIdResponse); }); } - private static String getSchemaStringCacheKey(String groupName, String name, String content) { - return groupName + name + content; + /** + * Gets the matching implementation class serialization type. + * + * @param serializationType Model serialization type. + * + * @return Implementation serialization type. + * + * @throws UnsupportedOperationException if the serialization type is not supported. + */ + private static com.azure.data.schemaregistry.implementation.models.SerializationType getSerialization( + SerializationType serializationType) { + if (serializationType == SerializationType.AVRO) { + return com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO; + } else { + throw new UnsupportedOperationException("Serialization type is not supported: " + serializationType); + } } } From db777a0eee5ea34c8db361c20b1f6a93428428bc Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 27 Sep 2021 12:35:43 -0700 Subject: [PATCH 3/4] Remove cached tests. --- .../SchemaRegistryAsyncClientTests.java | 29 ------------------- .../SchemaRegistryClientTests.java | 23 --------------- ...lientTests.registerAndGetCachedSchema.json | 29 ------------------- ...lientTests.registerAndGetCachedSchema.json | 29 ------------------- 4 files changed, 110 deletions(-) delete mode 100644 sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryAsyncClientTests.registerAndGetCachedSchema.json delete mode 100644 sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryClientTests.registerAndGetCachedSchema.json diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClientTests.java b/sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClientTests.java index ddb6023996cdc..09b96bf395a3d 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClientTests.java +++ b/sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClientTests.java @@ -222,35 +222,6 @@ public void registerBadRequest() { }).verify(); } - /** - * Verifies that we can register a schema and then get it by its schemaId. - */ - @Test - public void registerAndGetCachedSchema() { - // Arrange - final String schemaName = testResourceNamer.randomName("sch", RESOURCE_LENGTH); - final SchemaRegistryAsyncClient client1 = builder.buildAsyncClient(); - - final AtomicReference schemaId = new AtomicReference<>(); - - // Act & Assert - StepVerifier.create(client1.registerSchema(schemaGroup, schemaName, SCHEMA_CONTENT, SerializationType.AVRO)) - .assertNext(response -> { - assertSchemaProperties(response, null, schemaName, SCHEMA_CONTENT); - schemaId.set(response.getSchemaId()); - }).verifyComplete(); - - // Assert that we can get a schema based on its id. We registered a schema with client1 and its response is - // cached, so it won't make a network call when getting the schema. client2 will not have this information. - final String schemaIdToGet = schemaId.get(); - assertNotNull(schemaIdToGet); - - // Act & Assert - StepVerifier.create(client1.getSchema(schemaIdToGet)) - .assertNext(schema -> assertSchemaProperties(schema, schemaIdToGet, schemaName, SCHEMA_CONTENT)) - .verifyComplete(); - } - /** * Verifies that we get 404 when non-existent schema returned. */ diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryClientTests.java b/sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryClientTests.java index 13f34288aa7f5..b72a023662ff4 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryClientTests.java +++ b/sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryClientTests.java @@ -184,29 +184,6 @@ public void registerBadRequest() { assertEquals(400, exception.getResponse().getStatusCode()); } - /** - * Verifies that we can register a schema and then get it by its schemaId. - */ - @Test - public void registerAndGetCachedSchema() { - // Arrange - final String schemaName = testResourceNamer.randomName("sch", RESOURCE_LENGTH); - final SchemaRegistryClient client1 = builder.buildClient(); - - // Act & Assert - final SchemaProperties response = client1.registerSchema(schemaGroup, schemaName, SCHEMA_CONTENT, - SerializationType.AVRO); - assertSchemaProperties(response, null, schemaName, SCHEMA_CONTENT); - - // Assert that we can get a schema based on its id. We registered a schema with client1 and its response is - // cached, so it won't make a network call when getting the schema. - final String schemaIdToGet = response.getSchemaId(); - - // Act & Assert - final SchemaProperties response2 = client1.getSchema(schemaIdToGet); - assertSchemaProperties(response2, schemaIdToGet, schemaName, SCHEMA_CONTENT); - } - /** * Verifies that we get 404 when non-existent schema returned. */ diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryAsyncClientTests.registerAndGetCachedSchema.json b/sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryAsyncClientTests.registerAndGetCachedSchema.json deleted file mode 100644 index 50e02ef10e85d..0000000000000 --- a/sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryAsyncClientTests.registerAndGetCachedSchema.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "networkCallRecords" : [ { - "Method" : "PUT", - "Uri" : "https://REDACTED.servicebus.windows.net/$schemagroups/at/schemas/sch14841d17a?api-version=2020-09-01-preview", - "Headers" : { - "User-Agent" : "azsdk-java-UnknownName/UnknownVersion (11.0.5; Windows 10; 10.0)", - "x-ms-client-request-id" : "999c8cd6-6bf8-42ab-aee1-eb8a85df2cb4", - "Content-Type" : "text/plain; charset=utf-8" - }, - "Response" : { - "Transfer-Encoding" : "chunked", - "Schema-Version" : "1", - "Server" : "Microsoft-HTTPAPI/2.0", - "retry-after" : "0", - "Schema-Id-Location" : "https://conniey.servicebus.windows.net:443/$schemagroups/getschemabyid/8926cf463a534f02850c137cce582bd1?api-version=2020-09-01-preview", - "StatusCode" : "200", - "Date" : "Sun, 15 Aug 2021 17:51:05 GMT", - "Strict-Transport-Security" : "max-age=31536000", - "Schema-Id" : "8926cf463a534f02850c137cce582bd1", - "Serialization-Type" : "Avro", - "Body" : "{\"id\":\"8926cf463a534f02850c137cce582bd1\"}", - "Content-Type" : "application/json", - "Location" : "https://conniey.servicebus.windows.net:443/$schemagroups/at/schemas/sch14841d17a/versions/1?api-version=2020-09-01-preview", - "Schema-Versions-Location" : "https://conniey.servicebus.windows.net:443/$schemagroups/at/schemas/sch14841d17a/versions?api-version=2020-09-01-preview" - }, - "Exception" : null - } ], - "variables" : [ "sch14841d17a" ] -} \ No newline at end of file diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryClientTests.registerAndGetCachedSchema.json b/sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryClientTests.registerAndGetCachedSchema.json deleted file mode 100644 index 34e66a433d37b..0000000000000 --- a/sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryClientTests.registerAndGetCachedSchema.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "networkCallRecords" : [ { - "Method" : "PUT", - "Uri" : "https://REDACTED.servicebus.windows.net/$schemagroups/at/schemas/sch39674576f?api-version=2020-09-01-preview", - "Headers" : { - "User-Agent" : "azsdk-java-UnknownName/UnknownVersion (11.0.5; Windows 10; 10.0)", - "x-ms-client-request-id" : "fddc65c9-4cf0-48d7-b0c6-3f31e466f127", - "Content-Type" : "text/plain; charset=utf-8" - }, - "Response" : { - "Transfer-Encoding" : "chunked", - "Schema-Version" : "1", - "Server" : "Microsoft-HTTPAPI/2.0", - "retry-after" : "0", - "Schema-Id-Location" : "https://conniey.servicebus.windows.net:443/$schemagroups/getschemabyid/1c1a73dd90854295887aa61839cf842a?api-version=2020-09-01-preview", - "StatusCode" : "200", - "Date" : "Sun, 15 Aug 2021 17:50:27 GMT", - "Strict-Transport-Security" : "max-age=31536000", - "Schema-Id" : "1c1a73dd90854295887aa61839cf842a", - "Serialization-Type" : "Avro", - "Body" : "{\"id\":\"1c1a73dd90854295887aa61839cf842a\"}", - "Content-Type" : "application/json", - "Location" : "https://conniey.servicebus.windows.net:443/$schemagroups/at/schemas/sch39674576f/versions/1?api-version=2020-09-01-preview", - "Schema-Versions-Location" : "https://conniey.servicebus.windows.net:443/$schemagroups/at/schemas/sch39674576f/versions?api-version=2020-09-01-preview" - }, - "Exception" : null - } ], - "variables" : [ "sch39674576f" ] -} \ No newline at end of file From 04babba3e330589e1ba8519ca1eed0ed66690201 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 27 Sep 2021 12:41:16 -0700 Subject: [PATCH 4/4] Adding service annotation. --- .../azure/data/schemaregistry/SchemaRegistryClientBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java index 714df6b96a8f1..897e400eb8d91 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java +++ b/sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java @@ -52,7 +52,7 @@ *

Instantiating with custom retry policy and HTTP log options

* {@codesnippet com.azure.data.schemaregistry.schemaregistryasyncclient.retrypolicy.instantiation} */ -@ServiceClientBuilder(serviceClients = SchemaRegistryAsyncClient.class) +@ServiceClientBuilder(serviceClients = {SchemaRegistryAsyncClient.class, SchemaRegistryClient.class}) public class SchemaRegistryClientBuilder { private final ClientLogger logger = new ClientLogger(SchemaRegistryClientBuilder.class);