Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11208. Change RatisBlockOutputStream to use HDDS-11174. #7072

Merged
merged 4 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ XceiverClientReply watchOnLastIndex() throws IOException {
*
* @param commitIndex log index to watch for
* @return minimum commit index replicated to all nodes
* @throws IOException IOException in case watch gets timed out
*/
CompletableFuture<XceiverClientReply> watchForCommitAsync(long commitIndex) {
final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,8 @@ private void doFlushOrWatchIfNeeded() throws IOException {
}

private void recordWatchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
recordFlushFuture(watchForCommitAsync(putBlockResultFuture));
}
final CompletableFuture<Void> flushFuture = putBlockResultFuture.thenCompose(x -> watchForCommit(x.commitIndex));

private void recordFlushFuture(CompletableFuture<Void> flushFuture) {
Preconditions.checkState(Thread.holdsLock(this));
this.lastFlushFuture = flushFuture;
this.allPendingFlushFutures = allPendingFlushFutures.thenCombine(flushFuture, (last, curr) -> null);
Expand Down Expand Up @@ -444,7 +442,8 @@ public synchronized void writeOnRetry(long len) throws IOException {
writeChunk(buffer);
putBlockFuture = executePutBlock(false, false);
}
CompletableFuture<Void> watchForCommitAsync = watchForCommitAsync(putBlockFuture);
CompletableFuture<Void> watchForCommitAsync =
putBlockFuture.thenCompose(x -> watchForCommit(x.commitIndex));
try {
watchForCommitAsync.get();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -477,33 +476,44 @@ void releaseBuffersOnException() {
}

/**
* Watch for a specific commit index.
* Send a watch request to wait until the given index became committed.
* When watch is not needed (e.g. EC), this is a NOOP.
*
* @param index the log index to wait for.
* @return the future of the reply.
*/
XceiverClientReply sendWatchForCommit(long commitIndex)
throws IOException {
return null;
CompletableFuture<XceiverClientReply> sendWatchForCommit(long index) {
return CompletableFuture.completedFuture(null);
}

private void watchForCommit(long commitIndex) throws IOException {
checkOpen();
private CompletableFuture<Void> watchForCommit(long commitIndex) {
try {
LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
final XceiverClientReply reply = sendWatchForCommit(commitIndex);
if (reply != null) {
List<DatanodeDetails> dnList = reply.getDatanodes();
if (!dnList.isEmpty()) {
Pipeline pipe = xceiverClient.getPipeline();

LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
blockID, pipe, dnList);
failedServers.addAll(dnList);
}
}
} catch (IOException ioe) {
setIoException(ioe);
throw getIoException();
checkOpen();
} catch (IOException e) {
throw new FlushRuntimeException(e);
}

LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
return sendWatchForCommit(commitIndex)
.thenAccept(this::checkReply)
.exceptionally(e -> {
throw new FlushRuntimeException(setIoException(e));
})
.whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex));
}

private void checkReply(XceiverClientReply reply) {
if (reply == null) {
return;
}
final List<DatanodeDetails> dnList = reply.getDatanodes();
if (dnList.isEmpty()) {
return;
}
LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);

LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
blockID, xceiverClient.getPipeline(), dnList);
failedServers.addAll(dnList);
}

void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
Expand Down Expand Up @@ -723,16 +733,6 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
return lastFlushFuture;
}

private CompletableFuture<Void> watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
return putBlockResultFuture.thenAccept(x -> {
try {
watchForCommit(x.commitIndex);
} catch (IOException e) {
throw new FlushRuntimeException(e);
}
});
}

@Override
public void close() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null) {
Expand Down Expand Up @@ -771,7 +771,7 @@ void validateResponse(
}


public void setIoException(Exception e) {
public IOException setIoException(Throwable e) {
IOException ioe = getIoException();
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e);
Expand All @@ -782,6 +782,7 @@ public void setIoException(Exception e) {
"so subsequent request also encounters " +
"Storage Container Exception {}", ioe, e);
}
return getIoException();
}

void cleanup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ void releaseBuffersOnException() {
}

@Override
XceiverClientReply sendWatchForCommit(long commitIndex) throws IOException {
return commitWatcher.watchForCommit(commitIndex);
CompletableFuture<XceiverClientReply> sendWatchForCommit(long index) {
return commitWatcher.watchForCommitAsync(index);
}

@Override
Expand Down