Skip to content

Commit

Permalink
HDDS-11340. Avoid extra PubBlock call when a full block is closed (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenSammi authored Aug 20, 2024
1 parent ebad350 commit 93ee77d
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -94,6 +95,9 @@ public class BlockOutputStream extends OutputStream {
KeyValue.newBuilder().setKey(FULL_CHUNK).build();

private AtomicReference<BlockID> blockID;
// planned block full size
private long blockSize;
private AtomicBoolean eofSent = new AtomicBoolean(false);
private final AtomicReference<ChunkInfo> previousChunkInfo
= new AtomicReference<>();

Expand Down Expand Up @@ -164,6 +168,7 @@ public class BlockOutputStream extends OutputStream {
@SuppressWarnings("checkstyle:ParameterNumber")
public BlockOutputStream(
BlockID blockID,
long blockSize,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
Expand All @@ -175,6 +180,7 @@ public BlockOutputStream(
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
this.blockID = new AtomicReference<>(blockID);
this.blockSize = blockSize;
replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
Expand Down Expand Up @@ -530,15 +536,17 @@ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
final XceiverClientReply asyncReply;
try {
BlockData blockData = containerBlockData.build();
LOG.debug("sending PutBlock {}", blockData);
LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos);

if (config.getIncrementalChunkList()) {
// remove any chunks in the containerBlockData list.
// since they are sent.
containerBlockData.clearChunks();
}

asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString);
// if block is full, send the eof
boolean isBlockFull = (blockSize != -1 && flushPos == blockSize);
asyncReply = putBlockAsync(xceiverClient, blockData, close || isBlockFull, tokenString);
CompletableFuture<ContainerCommandResponseProto> future = asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
try {
Expand All @@ -550,6 +558,7 @@ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
if (getIoException() == null && !force) {
handleSuccessfulPutBlock(e.getPutBlock().getCommittedBlockLength(),
asyncReply, flushPos, byteBufferList);
eofSent.set(close || isBlockFull);
}
return e;
}, responseExecutor).exceptionally(e -> {
Expand Down Expand Up @@ -690,7 +699,7 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
// There're no pending written data, but there're uncommitted data.
updatePutBlockLength();
putBlockResultFuture = executePutBlock(close, false);
} else if (close) {
} else if (close && !eofSent.get()) {
// forcing an "empty" putBlock if stream is being closed without new
// data since latest flush - we need to send the "EOF" flag
updatePutBlockLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public ECBlockOutputStream(
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
super(blockID, xceiverClientManager,
super(blockID, -1, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
BlockID blockID,
long blockSize,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
Expand All @@ -80,7 +81,7 @@ public RatisBlockOutputStream(
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
super(blockID, blockSize, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)

return new RatisBlockOutputStream(
new BlockID(1L, 1L),
-1,
xcm,
pipeline,
bufferPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void checkStream() throws IOException {
* @throws IOException
*/
void createOutputStream() throws IOException {
outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
outputStream = new RatisBlockOutputStream(blockID, length, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
executorServiceSupplier);
}
Expand Down

0 comments on commit 93ee77d

Please sign in to comment.