From 7832eb69da2b681edba8e68c0f41e80d29d8c80a Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 1 Jul 2024 21:41:43 +0800 Subject: [PATCH] [improve][broker] Improve exception for topic does not have schema to check (#22974) (cherry picked from commit 4c84788340b4a3df975bf4a919c7223b31835976) --- .../nonpersistent/NonPersistentTopic.java | 13 +++++- .../service/persistent/PersistentTopic.java | 13 +++++- .../schema/SchemaRegistryServiceImpl.java | 3 +- .../exceptions/NotExistSchemaException.java | 43 +++++++++++++++++++ .../org/apache/pulsar/schema/SchemaTest.java | 16 ++++--- 5 files changed, 80 insertions(+), 8 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index a6f65f6da3284..b5d9e53c8eb80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -66,6 +66,8 @@ import org.apache.pulsar.broker.service.TopicAttributes; import org.apache.pulsar.broker.service.TopicPolicyListener; import org.apache.pulsar.broker.service.TransportCnx; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.client.api.MessageId; @@ -1239,7 +1241,16 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem || (!producers.isEmpty()) || (numActiveConsumersWithoutAutoSchema != 0) || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) { - return checkSchemaCompatibleForConsumer(schema); + return checkSchemaCompatibleForConsumer(schema) + .exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof NotExistSchemaException) { + throw FutureUtil.wrapToCompletionException( + new IncompatibleSchemaException("Failed to add schema to an active topic" + + " with empty(BYTES) schema: new schema type " + schema.getType())); + } + throw FutureUtil.wrapToCompletionException(realCause); + }); } else { return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dbbb0b07ce358..e52a69a1e2f28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -130,6 +130,8 @@ import org.apache.pulsar.broker.service.TransportCnx; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; @@ -3996,7 +3998,16 @@ public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schem || (userCreatedProducerCount > 0) || (numActiveConsumersWithoutAutoSchema != 0) || (ledger.getTotalSize() != 0)) { - return checkSchemaCompatibleForConsumer(schema); + return checkSchemaCompatibleForConsumer(schema) + .exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof NotExistSchemaException) { + throw FutureUtil.wrapToCompletionException( + new IncompatibleSchemaException("Failed to add schema to an active topic" + + " with empty(BYTES) schema: new schema type " + schema.getType())); + } + throw FutureUtil.wrapToCompletionException(realCause); + }); } else { return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index ae56df248d85d..2bbe2e366b68b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -48,6 +48,7 @@ import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -393,7 +394,7 @@ public CompletableFuture checkConsumerCompatibility(String schemaId, Schem return checkCompatibilityWithAll(schemaId, schemaData, strategy); } } else { - return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check")); + return FutureUtil.failedFuture(new NotExistSchemaException("Topic does not have schema to check")); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java new file mode 100644 index 0000000000000..2fe0a09237545 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/NotExistSchemaException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.exceptions; + +/** + * Exception is thrown when an schema not exist. + */ +public class NotExistSchemaException extends SchemaException { + + private static final long serialVersionUID = -8342983749283749283L; + + public NotExistSchemaException() { + super("The schema does not exist"); + } + + public NotExistSchemaException(String message) { + super(message); + } + + public NotExistSchemaException(String message, Throwable e) { + super(message, e); + } + + public NotExistSchemaException(Throwable e) { + super(e); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index d21e853ba0982..ae9ea6d5ae6f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -96,6 +96,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -125,6 +126,11 @@ public void cleanup() throws Exception { super.internalCleanup(); } + @DataProvider(name = "topicDomain") + public static Object[] topicDomain() { + return new Object[] { "persistent://", "non-persistent://" }; + } + @Test public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{ final String tenant = PUBLIC_TENANT; @@ -1336,19 +1342,19 @@ private void testIncompatibleSchema() throws Exception { * the new consumer to register new schema. But before we can solve this problem, we need to modify * "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES". */ - @Test - public void testAutoProduceAndSpecifiedConsumer() throws Exception { + @Test(dataProvider = "topicDomain") + public void testAutoProduceAndSpecifiedConsumer(String domain) throws Exception { final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16); admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME)); - final String topicName = "persistent://" + namespace + "/tp_" + randomName(16); + final String topicName = domain + namespace + "/tp_" + randomName(16); admin.topics().createNonPartitionedTopic(topicName); Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); try { pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe(); - fail("Should throw ex: Topic does not have schema to check"); + fail("Should throw ex: Failed to add schema to an active topic with empty(BYTES) schema"); } catch (Exception ex){ - assertTrue(ex.getMessage().contains("Topic does not have schema to check")); + assertTrue(ex.getMessage().contains("Failed to add schema to an active topic with empty(BYTES) schema")); } // Cleanup.