diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 0eff36b54ce94..e1c9b13e60f0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -270,6 +270,9 @@ public CompletableFuture findSchemaVersion(String schemaId, SchemaData sch @Override public CompletableFuture checkConsumerCompatibility(String schemaId, SchemaData schemaData, SchemaCompatibilityStrategy strategy) { + if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) { + return CompletableFuture.completedFuture(null); + } return getSchema(schemaId).thenCompose(existingSchema -> { if (existingSchema != null && !existingSchema.schema.isDeleted()) { if (strategy == SchemaCompatibilityStrategy.BACKWARD diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 02913c628f27f..293f71d5f878f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -395,6 +395,28 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS } + @Test + public void testAutoProduceSchemaAlwaysCompatible() throws Exception { + final String tenant = PUBLIC_TENANT; + final String topic = "topic" + randomName(16); + + String namespace = "test-namespace-" + randomName(16); + String topicName = TopicName.get( + TopicDomain.persistent.value(), tenant, namespace, topic).toString(); + NamespaceName namespaceName = NamespaceName.get(tenant, namespace); + admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME)); + + // set ALWAYS_COMPATIBLE + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + + Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); + // should not fail + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).subscriptionName("my-sub").topic(topicName).subscribe(); + + producer.close(); + consumer.close(); + } + @Test(dataProvider = "CanReadLastSchemaCompatibilityStrategy") public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception { final String tenant = PUBLIC_TENANT;