-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix schema does not replicate successfully #17049
[fix][broker] Fix schema does not replicate successfully #17049
Conversation
### Motivation apache#11441 supports replicate schema to remote clusters. But there is a mistake that the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 Because the replicator used MessageImpl will not have the schema. And this will cause the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 To return the correct schema state. And then we should also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema. Otherwise, it should use based on the schema of the message. ### Modification - Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl ### Verification Update the test only to create producer to one cluster. Because if create a producer for other clusters, the producer will upload the schema. This is the reason that why the test can get pass before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/pulsarbot rerun-failure-checks |
23a7e95
to
a995883
Compare
Position deletedPos = cursor.getMarkDeletedPosition(); | ||
Position readPos = cursor.getReadPosition(); | ||
|
||
Awaitility.await().timeout(30, TimeUnit.SECONDS).until( | ||
() -> cursor.getMarkDeletedPosition().getEntryId() != (cursor.getReadPosition().getEntryId() - 1)); | ||
|
||
assertNotEquals((readPos.getEntryId() - 1), deletedPos.getEntryId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed here because the producer might be closed with pending messages. After the cursor change to the close state, the producer will also be closed after a read entries failure. So that all the pending messages will fail with the Producer Already Closed
exception, then the cursor will rewind
Line 425 in 9f40cc1
replicator.cursor.rewind(); |
This will make the check failed.
There should be another issue related to this part Line 367 in 9f40cc1
Here might break the message replication order. I'm working on a fix for now. |
/pulsarbot run-failure-checks |
### Motivation #11441 supports replicate schema to remote clusters. But there is a mistake that the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 Because the replicator used MessageImpl will not have the schema. And this will cause the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 To return the correct schema state. And then we should also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema. Otherwise, it should use based on the schema of the message. ### Modification - Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl
But there is a mistake that the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 Because the replicator used MessageImpl will not have the schema. And this will cause the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 To return the correct schema state. And then we should also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema. Otherwise, it should use based on the schema of the message. - Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl (cherry picked from commit 7689133)
But there is a mistake that the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 Because the replicator used MessageImpl will not have the schema. And this will cause the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 To return the correct schema state. And then we should also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema. Otherwise, it should use based on the schema of the message. - Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl (cherry picked from commit 7689133) (cherry picked from commit c1d305b)
But there is a mistake that the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 Because the replicator used MessageImpl will not have the schema. And this will cause the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 To return the correct schema state. And then we should also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema. Otherwise, it should use based on the schema of the message. - Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl (cherry picked from commit 7689133)
@@ -134,7 +134,7 @@ function test_group_other() { | |||
-Dexclude='**/ManagedLedgerTest.java, | |||
**/OffloadersCacheTest.java | |||
**/PrimitiveSchemaTest.java, | |||
BlobStoreManagedLedgerOffloaderTest.java' | |||
BlobStoreManagedLedgerOffloaderTest.java' -DtestReuseFork=false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui do you remember why this was needed? I added #17943 to remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The motivation is here #17152
Although I think it's safe to add it back
Motivation
#11441 supports replicate schema to remote clusters.
But there is a mistake that the returned schema state is incorrect.
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
Lines 765 to 770 in e826d84
Because the replicator used MessageImpl will not have the schema.
And this will cause the producer to skip the schema upload.
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Lines 2147 to 2149 in e826d84
We should remove
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
Lines 766 to 768 in e826d84
To return the correct schema state.
And then we should also provide the correct schema hash.
If the message is used by the replicator, the schema hash should
be based on the replicator schema. Otherwise, it should use based
on the schema of the message.
Modification
Verification
Update the test only to create a producer to one cluster.
Because if create a producer for other clusters, the producer will
upload the schema. This is the reason why the test can get
passed before.
doc-not-needed