-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid using same OpAddEntry between different ledger handles #5942
Conversation
run cpp tests |
run cpp tests |
2 similar comments
run cpp tests |
run cpp tests |
internalAsyncAddEntry(addOperation); | ||
})); | ||
} | ||
|
||
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { | ||
pendingAddEntries.add(addOperation); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this real root cause of #5588 or it just a patch to avoid such behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The root cause of #5588 is an entry is "re-used" between ledgers. The code at line 1297 is the fix.
internalAsyncAddEntry(addOperation); | ||
})); | ||
} | ||
|
||
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { | ||
pendingAddEntries.add(addOperation); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The root cause of #5588 is an entry is "re-used" between ledgers. The code at line 1297 is the fix.
@@ -1294,9 +1293,24 @@ public synchronized void updateLedgersIdsComplete(Stat stat) { | |||
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); | |||
} | |||
|
|||
// Avoid use same OpAddEntry between different ledger handle | |||
int pendingSize = pendingAddEntries.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, this doesn't seem to be correct to me. you need to preserve the order when adding the newly created ops back to the queue.
what you need to do:
- drain the pendingAddEntries queue;
- create a new OpAddEntry for each entry
- add these ops into an intermediate list in the order of how they are drained
- after the pendingAddEntries are drained, add the intermediate list back to the pendingAddEntries queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pendingAddEntries is ConcurrentLinkedQueue, there is no drainTo method in ConcurrentLinkedQueue, maybe we can use pendingAddEntries.toArray() and then create a new OpAddEntry for each item of the array and add the new entry to an intermediate list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. it is a queue here.
LedgerHandle ledger; | ||
private long entryId; | ||
|
||
@SuppressWarnings("unused") | ||
private volatile AddEntryCallback callback; | ||
private static final AtomicReferenceFieldUpdater<OpAddEntry, AddEntryCallback> callbackUpdater = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need the change here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just copy the updater close to the field
|
||
if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) { | ||
log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(), entryId); | ||
OpAddEntry.this.recycle(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we are creating a new entry when retrying the ops on the new ledger, do we need to introduce the state
field and recycle here?
Since this op is only used by the old ledger handler, it will not be reused across ledgers. It should already be recycled correctly.
if (existsOp != null) { | ||
// If op is used by another ledger handle, we need to close it and create a new one | ||
if (existsOp.ledger != null) { | ||
existsOp.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure we need to close here. I think once we duplicate the operation, we can just let the original callback close the old op. so it seems to me that we don't need introducing another state
field here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to close the original op, otherwise when the old op callback, will poll the first op in the pendingAddEntries
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
Line 165 in ef23a4b
checkArgument(this == firstInQueue); |
But, the first op is the new op we replaced.
…5942) ### Motivation Avoid using same OpAddEntry between different ledger handles. ### Modifications Add state for OpAddEntry, if op handled by new ledger handle, the op will set to CLOSED state, after the legacy callback happens will check the op state, only INITIATED can be processed. When ledger rollover happens, pendingAddEntries will be processed. when process pendingAddEntries, will create a new OpAddEntry by the old OpAddEntry to avoid different ledger handles use same OpAddEntry.
…5942) ### Motivation Avoid using same OpAddEntry between different ledger handles. ### Modifications Add state for OpAddEntry, if op handled by new ledger handle, the op will set to CLOSED state, after the legacy callback happens will check the op state, only INITIATED can be processed. When ledger rollover happens, pendingAddEntries will be processed. when process pendingAddEntries, will create a new OpAddEntry by the old OpAddEntry to avoid different ledger handles use same OpAddEntry. (cherry picked from commit 7ec17b2)
### Motivation Avoid using same OpAddEntry between different ledger handles. ### Modifications Add state for OpAddEntry, if op handled by new ledger handle, the op will set to CLOSED state, after the legacy callback happens will check the op state, only INITIATED can be processed. When ledger rollover happens, pendingAddEntries will be processed. when process pendingAddEntries, will create a new OpAddEntry by the old OpAddEntry to avoid different ledger handles use same OpAddEntry. (cherry picked from commit 7ec17b2)
…5942) ### Motivation Avoid using same OpAddEntry between different ledger handles. ### Modifications Add state for OpAddEntry, if op handled by new ledger handle, the op will set to CLOSED state, after the legacy callback happens will check the op state, only INITIATED can be processed. When ledger rollover happens, pendingAddEntries will be processed. when process pendingAddEntries, will create a new OpAddEntry by the old OpAddEntry to avoid different ledger handles use same OpAddEntry. (cherry picked from commit 7ec17b2)
…5942) ### Motivation Avoid using same OpAddEntry between different ledger handles. ### Modifications Add state for OpAddEntry, if op handled by new ledger handle, the op will set to CLOSED state, after the legacy callback happens will check the op state, only INITIATED can be processed. When ledger rollover happens, pendingAddEntries will be processed. when process pendingAddEntries, will create a new OpAddEntry by the old OpAddEntry to avoid different ledger handles use same OpAddEntry.
Fixes #5588
Motivation
Avoid using same OpAddEntry between different ledger handles.
Modifications
Add state for OpAddEntry, if op handled by new ledger handle, the op will set to CLOSED state, after the legacy callback happens will check the op state, only INITIATED can be processed.
When ledger rollover happens, pendingAddEntries will be processed. when process pendingAddEntries, will create a new OpAddEntry by the old OpAddEntry to avoid different ledger handles use same OpAddEntry.
Verifying this change
Added new unit test
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation