Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Aug 16, 2024
1 parent da11b5b commit 850b81d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ XceiverClientReply watchOnLastIndex() throws IOException {
*
* @param commitIndex log index to watch for
* @return minimum commit index replicated to all nodes
* @throws IOException IOException in case watch gets timed out
*/
CompletableFuture<XceiverClientReply> watchForCommitAsync(long commitIndex) {
final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,8 @@ private void doFlushOrWatchIfNeeded() throws IOException {
}

private void recordWatchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
recordFlushFuture(watchForCommitAsync(putBlockResultFuture));
}
final CompletableFuture<Void> flushFuture = putBlockResultFuture.thenCompose(x -> watchForCommit(x.commitIndex));

private void recordFlushFuture(CompletableFuture<Void> flushFuture) {
Preconditions.checkState(Thread.holdsLock(this));
this.lastFlushFuture = flushFuture;
this.allPendingFlushFutures = allPendingFlushFutures.thenCombine(flushFuture, (last, curr) -> null);
Expand Down Expand Up @@ -475,13 +473,23 @@ private void adjustBuffersOnException() {
}

/**
* Watch for a specific commit index.
* Send a watch request to wait until the given index became committed.
* When watch is not needed (e.g. EC), this is a NOOP.
*
* @param index the log index to wait for.
* @return the future of the reply.
*/
CompletableFuture<XceiverClientReply> sendWatchForCommit(long commitIndex) {
CompletableFuture<XceiverClientReply> sendWatchForCommit(long index) {
return CompletableFuture.completedFuture(null);
}

private CompletableFuture<Void> watchForCommit(long commitIndex) {
try {
checkOpen();
} catch (IOException e) {
throw new FlushRuntimeException(e);
}

LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
return sendWatchForCommit(commitIndex)
.thenAccept(this::checkReply)
Expand All @@ -491,7 +499,7 @@ private CompletableFuture<Void> watchForCommit(long commitIndex) {
.whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex));
}

void checkReply(XceiverClientReply reply) {
private void checkReply(XceiverClientReply reply) {
if (reply == null) {
return;
}
Expand Down Expand Up @@ -705,17 +713,6 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
return lastFlushFuture;
}

private CompletableFuture<Void> watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
return putBlockResultFuture.thenCompose(x -> {
try {
checkOpen();
} catch (IOException e) {
throw new FlushRuntimeException(e);
}
return watchForCommit(x.commitIndex);
});
}

@Override
public void close() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ void releaseBuffersOnException() {
}

@Override
CompletableFuture<XceiverClientReply> sendWatchForCommit(long commitIndex) {
return commitWatcher.watchForCommitAsync(commitIndex);
CompletableFuture<XceiverClientReply> sendWatchForCommit(long index) {
return commitWatcher.watchForCommitAsync(index);
}

@Override
Expand Down

0 comments on commit 850b81d

Please sign in to comment.