From c1d305b1f3c60014d9982f0c189eecf3d0facafa Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 18 Aug 2022 11:21:14 +0800 Subject: [PATCH] [fix][broker] Fix schema does not replicate successfully (#17049) But there is a mistake: the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 The MessageImpl used by the replicator does not have the schema, and this causes the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 so that the correct schema state is returned. We should also provide the correct schema hash: if the message is used by the replicator, the schema hash should be based on the replicator schema; otherwise, it should be based on the schema of the message. 
- Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl (cherry picked from commit 7689133adfd930a50c2690ecca1f2068cafa8bcb) --- build/run_unit_group.sh | 2 +- .../pulsar/broker/service/ReplicatorTest.java | 57 +++++++++++-------- .../pulsar/client/impl/MessageImpl.java | 12 +++- .../pulsar/client/impl/ProducerImpl.java | 9 +-- .../impl/MultiTopicsConsumerImplTest.java | 2 +- .../common/protocol/schema/SchemaHash.java | 7 ++- 6 files changed, 52 insertions(+), 37 deletions(-) diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh index 19dc713d70078..a5af838e4d4d0 100755 --- a/build/run_unit_group.sh +++ b/build/run_unit_group.sh @@ -117,7 +117,7 @@ function other() { **/ManagedLedgerTest.java, **/TestPulsarKeyValueSchemaHandler.java, **/PrimitiveSchemaTest.java, - BlobStoreManagedLedgerOffloaderTest.java' + BlobStoreManagedLedgerOffloaderTest.java' -DtestReuseFork=false $MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java, **/OffloadersCacheTest.java' diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index c31373c05fcd6..21ca713aa30e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; @@ -70,6 +71,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; 
import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -78,6 +80,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -391,19 +394,24 @@ public void testReplicationWithSchema() throws Exception { final String subName = "my-sub"; @Cleanup - Producer producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topic.toString()) - .create(); - @Cleanup - Producer producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topic.toString()) - .create(); - @Cleanup - Producer producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + Producer producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class)) .topic(topic.toString()) .create(); - List> producers = Lists.newArrayList(producer1, producer2, producer3); + admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest); + admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest); + admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest); + + + for (int i = 0; i < 10; i++) { + producer.send(new Schemas.PersonOne(i)); + } + + Awaitility.await().untilAsserted(() -> { + assertTrue(admin1.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0); + assertTrue(admin2.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0); + assertTrue(admin3.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0); + }); @Cleanup Consumer consumer1 = 
client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class)) @@ -423,8 +431,7 @@ public void testReplicationWithSchema() throws Exception { .subscriptionName(subName) .subscribe(); - for (int i = 0; i < 3; i++) { - producers.get(i).send(new Schemas.PersonOne(i)); + for (int i = 0; i < 10; i++) { Message msg1 = consumer1.receive(); Message msg2 = consumer2.receive(); Message msg3 = consumer3.receive(); @@ -1390,15 +1397,21 @@ public void testReplicatorWithFailedAck() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false) .getNow(null).get(); + MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); + Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); ConcurrentOpenHashMap replicators = topic.getReplicators(); PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2"); - Awaitility.await().timeout(50, TimeUnit.SECONDS) + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS) .untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started, replicator.getState())); assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started); ManagedCursorImpl cursor = (ManagedCursorImpl) replicator.getCursor(); + + // Make sure all the data has replicated to the remote cluster before close the cursor. 
+ Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition)); + cursor.setState(State.Closed); Field field = ManagedCursorImpl.class.getDeclaredField("state"); @@ -1407,22 +1420,16 @@ public void testReplicatorWithFailedAck() throws Exception { producer1.produce(10); - Position deletedPos = cursor.getMarkDeletedPosition(); - Position readPos = cursor.getReadPosition(); - - Awaitility.await().timeout(30, TimeUnit.SECONDS).until( - () -> cursor.getMarkDeletedPosition().getEntryId() != (cursor.getReadPosition().getEntryId() - 1)); - - assertNotEquals((readPos.getEntryId() - 1), deletedPos.getEntryId()); + // The cursor is closed, so the mark delete position will not move forward. + assertEquals(cursor.getMarkDeletedPosition(), lastPosition); field.set(cursor, State.Open); Awaitility.await().timeout(30, TimeUnit.SECONDS).until( - () -> cursor.getMarkDeletedPosition().getEntryId() == (cursor.getReadPosition().getEntryId() - 1)); - - deletedPos = cursor.getMarkDeletedPosition(); - readPos = cursor.getReadPosition(); - assertEquals((readPos.getEntryId() - 1), deletedPos.getEntryId()); + () -> { + log.info("++++++++++++ {}, {}", cursor.getMarkDeletedPosition(), cursor.getReadPosition()); + return cursor.getMarkDeletedPosition().getEntryId() == (cursor.getReadPosition().getEntryId() - 1); + }); } private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 2fb9311b4b547..6863a34f80f38 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -52,6 +52,7 @@ import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.protocol.Commands; import 
org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; +import org.apache.pulsar.common.protocol.schema.SchemaHash; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -64,6 +65,8 @@ public class MessageImpl implements Message { private ByteBuf payload; private Schema schema; + + private SchemaHash schemaHash; private SchemaInfo schemaInfoForReplicator; private SchemaState schemaState = SchemaState.None; private Optional encryptionCtx = Optional.empty(); @@ -91,6 +94,7 @@ public static MessageImpl create(MessageMetadata msgMetadata, ByteBuffer msg.payload = Unpooled.wrappedBuffer(payload); msg.properties = null; msg.schema = schema; + msg.schemaHash = SchemaHash.of(schema); msg.uncompressedSize = payload.remaining(); return msg; } @@ -431,9 +435,14 @@ public SchemaInfo getSchemaInfo() { return schema.getSchemaInfo(); } + public SchemaHash getSchemaHash() { + return schemaHash == null ? 
SchemaHash.of(new byte[0], null) : schemaHash; + } + public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) { if (msgMetadata.hasReplicatedFrom()) { this.schemaInfoForReplicator = schemaInfo; + this.schemaHash = SchemaHash.of(schemaInfo); } else { throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message."); } @@ -755,9 +764,6 @@ int getUncompressedSize() { } SchemaState getSchemaState() { - if (getSchemaInfo() == null) { - return SchemaState.Ready; - } return schemaState; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9c25f269ceece..133a4eca0cddd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -661,8 +661,7 @@ private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) { completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e); return false; } - SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); - byte[] schemaVersion = schemaCache.get(schemaHash); + byte[] schemaVersion = schemaCache.get(msg.getSchemaHash()); if (schemaVersion != null) { msgMetadataBuilder.setSchemaVersion(schemaVersion); msg.setSchemaState(MessageImpl.SchemaState.Ready); @@ -671,8 +670,7 @@ private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) { } private boolean rePopulateMessageSchema(MessageImpl msg) { - SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); - byte[] schemaVersion = schemaCache.get(schemaHash); + byte[] schemaVersion = schemaCache.get(msg.getSchemaHash()); if (schemaVersion == null) { return false; } @@ -703,8 +701,7 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call // case, we should not cache the schema version so that the schema version of the message 
metadata will // be null, instead of an empty array. if (v.length != 0) { - SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); - schemaCache.putIfAbsent(schemaHash, v); + schemaCache.putIfAbsent(msg.getSchemaHash(), v); msg.getMessageBuilder().setSchemaVersion(v); } msg.setSchemaState(MessageImpl.SchemaState.Ready); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 297059b5feef1..89229c49ab1e6 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -122,7 +122,7 @@ public void testGetStats() throws Exception { // // Code under tests is using CompletableFutures. Theses may hang indefinitely if code is broken. // That's why a test timeout is defined. - @Test(timeOut = 5000) + @Test(timeOut = 10000) public void testParallelSubscribeAsync() throws Exception { String topicName = "parallel-subscribe-async-topic"; MultiTopicsConsumerImpl impl = createMultiTopicsConsumer(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java index 40220e6047a3b..8bbc18fbb703c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java @@ -54,7 +54,12 @@ public static SchemaHash of(SchemaData schemaData) { return of(schemaData.getData(), schemaData.getType()); } - private static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) { + public static SchemaHash of(SchemaInfo schemaInfo) { + return of(schemaInfo == null ? new byte[0] : schemaInfo.getSchema(), + schemaInfo == null ? 
null : schemaInfo.getType()); + } + + public static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) { return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType); }