Improve error handling logic for effectively once #5271
Conversation
@@ -125,7 +129,8 @@
 public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {

     // Managed ledger associated with the topic
-    protected final ManagedLedger ledger;
+    @VisibleForTesting
+    ManagedLedger ledger;
Could we retain the `final`?
will change
// close all producers
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.forEach(producer -> futures.add(producer.disconnect()));
FutureUtil.waitForAll(futures);
`FutureUtil.waitForAll()` just returns a new future that tracks all the futures in the list, without blocking (the non-blocking part is actually what we want). To ensure we decrement only after all the connections are actually closed, we'd need to do something like:
FutureUtil.waitForAll(futures).handle((v, ex) -> {
    decrementPendingWriteOpsAndCheck();
    return null;
});
will change
callback.completed(new PersistenceException(exception), -1, -1);
}

long pending = pendingWriteOps.decrementAndGet();
As above, call `decrementPendingWriteOpsAndCheck()` from the future callback instead of here.
rerun cpp tests
@@ -272,17 +290,41 @@ private PersistentSubscription createPersistentSubscription(String subscriptionN

@Override
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
    pendingWriteOps.incrementAndGet();
We should add a comment here with the logic behind the "increment then check the fence status" operation, because it will not be evident to a reader here.
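For illustration, a hedged sketch of how such a comment could read on the increment-then-check path; the fence check, `TopicFencedException`, and `decrementPendingWriteOpsAndCheck()` are taken from the surrounding discussion, and the exact wording and body are not copied from the patch:

```java
@Override
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
    // Increment the pending-write counter *before* checking the fence status.
    // The failure path fences the topic and only un-fences it once the pending
    // counter drains back to zero, so with this ordering a concurrent write is
    // either rejected right here or accounted for by that drain.
    pendingWriteOps.incrementAndGet();
    if (isFenced) {
        publishContext.completed(new TopicFencedException("fenced"), -1, -1);
        decrementPendingWriteOpsAndCheck();
        return;
    }
    ledger.asyncAddEntry(headersAndPayload, this, publishContext);
}
```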
Isn't there a fundamental problem here?
What if the client produces [M1, seq:1], [M2, seq:2], [M3, seq:3] asynchronously: M1 succeeds, M2 fails with a BK error, the managed ledger recovers from the error, and then M3 hits the broker and is persisted. At this point M2 can be retried, but the message is lost because seq:2 is lower than seq:3.
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.forEach(producer -> futures.add(producer.disconnect()));
FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
    decrementPendingWriteOpsAndCheck();
Add a comment here that the write op being decremented is the one incremented in the call that eventually triggered `addFailed`. Otherwise it looks like you're decrementing once for each producer closed.
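A hedged sketch of how the `addFailed` path could carry that comment; field names such as `isFenced` and the callback handling are assumptions, and only the producer-disconnect snippet above is from the actual diff:

```java
@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
    PublishContext callback = (PublishContext) ctx;
    callback.completed(new PersistenceException(exception), -1, -1);

    // Fence the topic and disconnect all producers so they reconnect and
    // replay their pending messages once the topic is healthy again.
    isFenced = true;
    List<CompletableFuture<Void>> futures = Lists.newArrayList();
    producers.forEach(producer -> futures.add(producer.disconnect()));
    FutureUtil.waitForAll(futures).handle((v, ex) -> {
        // This decrement matches the single increment done in the
        // publishMessage() call that eventually triggered this addFailed;
        // it is not one decrement per closed producer.
        decrementPendingWriteOpsAndCheck();
        return null;
    });
}
```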
run integration tests
The current guard against this scenario is that the managed ledger will reject all writes for a period of 10 sec. In practical terms this should avoid all races between threads (for non-blocking ops), though of course it does not give a 100% guarantee. The next step is to have the managed ledger stay in "error mode" after a write failure, until we manually set it back into normal mode, after all the pending ops are done and we have had the chance to reset the topic.
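Purely to make that proposed next step concrete, a hypothetical sketch of an explicit error mode; none of these names exist in the managed-ledger API, and today the only guard is the temporary write rejection described above:

```java
// Hypothetical only: "stay in error mode until explicitly reset" guard.
class WriteErrorGuard {
    private volatile boolean errorMode = false;

    void onWriteFailure() {
        // The first failure flips the ledger into error mode; later writes are
        // rejected instead of racing with the topic recovery.
        errorMode = true;
    }

    boolean canWrite() {
        return !errorMode;
    }

    void reset() {
        // Called only after all pending ops have drained and the topic state
        // (e.g. dedup sequence ids) has been reset.
        errorMode = false;
    }
}
```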
rerun integration tests
Maybe we need to rebrand our "exactly-once" again, from "effectively-once" to "probably-once". Maybe we should have some sort of epoch to represent the client <-> producer relationship? When an error occurs on a write, all subsequent writes from that epoch should fail. The error should be kicked back to the client, which should then have to re-establish its current position before proceeding.
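As a thought experiment only, a sketch of the epoch idea; nothing here exists in Pulsar and all names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical: one epoch per client <-> producer session.
class ProducerEpoch {
    private final AtomicLong currentEpoch = new AtomicLong(0);
    private final AtomicLong failedEpoch = new AtomicLong(-1);

    long current() {
        return currentEpoch.get();
    }

    // Write-failure path: every in-flight or later write tagged with this
    // epoch must now fail, and the error is surfaced to the client.
    void markFailed(long epoch) {
        failedEpoch.accumulateAndGet(epoch, Math::max);
    }

    boolean isWritable(long epoch) {
        return epoch > failedEpoch.get();
    }

    // The client re-establishes its position, then starts a new epoch.
    long startNewEpoch() {
        return currentEpoch.incrementAndGet();
    }
}
```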
In this PR, the sequence of handling errors will be:
rerun integration tests
* Bug in Message Deduplication that may cause incorrect behavior
* add tests
* fix error message
* fix client backoff
* fix tests
* cleaning up
* Fix handling of BK write failures for message dedup
* tests and clean up
* refactoring code
* fixing bugs
* addressing comments
* add missing license header

(cherry picked from commit 8e95f43)
…s unloaded (#7735)

### Motivation

When a topic is unloaded and moved to another broker, the producer for geo-replication often remains unclosed. Because of this, geo-replication is not possible on the broker to which the topic was moved and messages accumulate in the replication backlog.

```
18:56:55.166 [pulsar-io-21-6] ERROR o.a.pulsar.client.impl.ProducerImpl - [persistent://xxx/yyy/zzz] [pulsar.repl.dc2] Failed to create producer: Producer with name 'pulsar.repl.dc2' is already connected to topic
```

When this issue occurs, the following log is output on the broker where the topic is unloaded.

```
17:14:36.424 [bookkeeper-ml-workers-OrderedExecutor-18-0] INFO o.a.p.b.s.persistent.PersistentTopic - [persistent://xxx/yyy/zzz] Un-fencing topic...
```

Unloaded topics are usually fenced to prevent new clients from connecting. In this case, however, the producers reconnected to the topic because it had been unfenced, and the replicator was restarted. I think this is due to #5271. If a topic is fenced to close or delete, we should not unfence it.

### Modifications

When closing or deleting the `PersistentTopic` instance, set the `isClosingOrDeleting` flag to true. If `isClosingOrDeleting` is true, do not unfence the topic unless closing or deleting fails.
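A hedged sketch of the unfencing rule described in that change; the `isClosingOrDeleting` flag comes from the description above, while the helper methods are illustrative rather than the actual PersistentTopic code:

```java
private volatile boolean isFenced = false;
private volatile boolean isClosingOrDeleting = false;

private void fenceTopicToCloseOrDelete() {
    isFenced = true;
    isClosingOrDeleting = true;
}

private void unfenceTopicToResume() {
    // Do not unfence while a close/delete is in progress; otherwise a
    // reconnecting replication producer could re-attach to a topic that is
    // being handed over to another broker.
    if (!isClosingOrDeleting) {
        isFenced = false;
    }
}

private void closeOrDeleteFailed() {
    // The close/delete failed, so the topic stays owned by this broker and it
    // is safe to resume serving it.
    isClosingOrDeleting = false;
    isFenced = false;
}
```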
Motivation
As a part of solving #5218
Modifications
When there are BK write errors, we need to fence the topic and reset `highestSequencedPushed` to `highestSequencedPersisted`.
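To illustrate the reset, a hedged sketch of what it could look like in `MessageDeduplication`; the two map fields come from the description above, while the method name and exact body are assumptions:

```java
// Roll the "pushed" sequence ids back to what was actually persisted, so a
// producer retrying the failed message is not wrongly treated as a duplicate.
void resetHighestSequenceIdPushed() {
    if (!isEnabled()) {
        return;
    }
    highestSequencedPushed.clear();
    highestSequencedPersisted.forEach((producerName, seqId) ->
            highestSequencedPushed.put(producerName, seqId));
}
```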