Skip to content

Commit

Permalink
HDDS-11239. Fix KeyOutputStream's exception handling when calling hsy…
Browse files Browse the repository at this point in the history
…nc concurrently (#7047)
  • Loading branch information
duongkame authored Aug 21, 2024
1 parent fb43023 commit 7f47535
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;

/**
* Client side error injector allowing simulating receiving errors from server side.
*/
@FunctionalInterface
public interface ErrorInjector {
RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto request, ClientId id, Pipeline pipeline);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
* Factory for XceiverClientSpi implementations. Client instances are not cached.
*/
public class XceiverClientCreator implements XceiverClientFactory {
private static ErrorInjector errorInjector;

public static void enableErrorInjection(ErrorInjector injector) {
errorInjector = injector;
}

private final ConfigurationSource conf;
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
Expand Down Expand Up @@ -60,7 +66,7 @@ protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
XceiverClientSpi client;
switch (pipeline.getType()) {
case RATIS:
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager);
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager, errorInjector);
break;
case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf, trustManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
public class XceiverClientManager extends XceiverClientCreator {
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientManager.class);

private final Cache<String, XceiverClientSpi> clientCache;
private final CacheMetrics cacheMetrics;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public final class XceiverClientRatis extends XceiverClientSpi {
public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
ConfigurationSource ozoneConf) {
return newXceiverClientRatis(pipeline, ozoneConf, null);
return newXceiverClientRatis(pipeline, ozoneConf, null, null);
}

public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
ConfigurationSource ozoneConf, ClientTrustManager trustManager) {
ConfigurationSource ozoneConf, ClientTrustManager trustManager, ErrorInjector errorInjector) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
Expand All @@ -93,7 +93,7 @@ public static XceiverClientRatis newXceiverClientRatis(
SecurityConfig(ozoneConf), trustManager);
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType),
retryPolicy, tlsConfig, ozoneConf);
retryPolicy, tlsConfig, ozoneConf, errorInjector);
}

private final Pipeline pipeline;
Expand All @@ -110,13 +110,14 @@ public static XceiverClientRatis newXceiverClientRatis(
= XceiverClientManager.getXceiverClientMetrics();
private final RaftProtos.ReplicationLevel watchType;
private final int majority;
private final ErrorInjector errorInjector;

/**
* Constructs a client.
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
ConfigurationSource configuration) {
ConfigurationSource configuration, ErrorInjector errorInjector) {
super();
this.pipeline = pipeline;
this.majority = (pipeline.getReplicationConfig().getRequiredNodes() / 2) + 1;
Expand All @@ -142,6 +143,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
new Throwable("TRACE"));
}
this.errorInjector = errorInjector;
}

private long updateCommitInfosMap(RaftClientReply reply, RaftProtos.ReplicationLevel level) {
Expand Down Expand Up @@ -248,6 +250,12 @@ public ConcurrentMap<UUID, Long> getCommitInfoMap() {

private CompletableFuture<RaftClientReply> sendRequestAsync(
ContainerCommandRequestProto request) {
if (errorInjector != null) {
RaftClientReply response = errorInjector.getResponse(request, getClient().getId(), pipeline);
if (response != null) {
return CompletableFuture.completedFuture(response);
}
}
return TracingUtil.executeInNewSpan(
"XceiverClientRatis." + request.getCmdType().name(),
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class BlockOutputStream extends OutputStream {
= new AtomicReference<>();

private final BlockData.Builder containerBlockData;
private XceiverClientFactory xceiverClientFactory;
private volatile XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private OzoneClientConfig config;
private StreamBufferArgs streamBufferArgs;
Expand Down Expand Up @@ -216,7 +216,8 @@ public BlockOutputStream(
this.token.encodeToUrlString();

//number of buffers used before doing a flush
refreshCurrentBuffer();
currentBuffer = null;
currentBufferRemaining = 0;
flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / streamBufferArgs
.getStreamBufferSize());

Expand Down Expand Up @@ -254,12 +255,6 @@ private boolean allDataNodesSupportPiggybacking() {
return true;
}

synchronized void refreshCurrentBuffer() {
currentBuffer = bufferPool.getCurrentBuffer();
currentBufferRemaining =
currentBuffer != null ? currentBuffer.remaining() : 0;
}

public BlockID getBlockID() {
return blockID.get();
}
Expand Down Expand Up @@ -418,42 +413,44 @@ private void updatePutBlockLength() {
* @param len length of data to write
* @throws IOException if error occurred
*/

// In this case, the data is already cached in the currentBuffer.
public synchronized void writeOnRetry(long len) throws IOException {
if (len == 0) {
return;
}

// In this case, the data from the failing (previous) block already cached in the allocated buffers in
// the BufferPool. For each pending buffers in the BufferPool, we sequentially flush it and wait synchronously.

List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
if (LOG.isDebugEnabled()) {
LOG.debug("Retrying write length {} for blockID {}", len, blockID);
LOG.debug("{}: Retrying write length {} on target blockID {}, {} buffers", this, len, blockID,
allocatedBuffers.size());
}
Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize());
int count = 0;
List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
while (len > 0) {
ChunkBuffer buffer = allocatedBuffers.get(count);
long writeLen = Math.min(buffer.position(), len);
if (!buffer.hasRemaining()) {
writeChunk(buffer);
}
len -= writeLen;
count++;
writtenDataLength += writeLen;
// we should not call isBufferFull/shouldFlush here.
// The buffer might already be full as whole data is already cached in
// the buffer. We should just validate
// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
// call for handling full buffer/flush buffer condition.
if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 0) {
// reset the position to zero as now we will be reading the
// next buffer in the list
updateWriteChunkLength();
updatePutBlockLength();
CompletableFuture<PutBlockResult> putBlockResultFuture = executePutBlock(false, false);
recordWatchForCommitAsync(putBlockResultFuture);
updateWriteChunkLength();
updatePutBlockLength();
LOG.debug("Write chunk on retry buffer = {}", buffer);
CompletableFuture<PutBlockResult> putBlockFuture;
if (allowPutBlockPiggybacking) {
putBlockFuture = writeChunkAndPutBlock(buffer, false);
} else {
writeChunk(buffer);
putBlockFuture = executePutBlock(false, false);
}
if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
handleFullBuffer();
CompletableFuture<Void> watchForCommitAsync = watchForCommitAsync(putBlockFuture);
try {
watchForCommitAsync.get();
} catch (InterruptedException e) {
handleInterruptedException(e, true);
} catch (ExecutionException e) {
handleExecutionException(e);
}
}
}
Expand All @@ -479,14 +476,6 @@ private void handleFullBuffer() throws IOException {
void releaseBuffersOnException() {
}

// It may happen that once the exception is encountered , we still might
// have successfully flushed up to a certain index. Make sure the buffers
// only contain data which have not been sufficiently replicated
private void adjustBuffersOnException() {
releaseBuffersOnException();
refreshCurrentBuffer();
}

/**
* Watch for a specific commit index.
*/
Expand Down Expand Up @@ -633,6 +622,9 @@ private CompletableFuture<PutBlockResult> writeChunkAndPutBlock(ChunkBuffer buff
protected void handleFlush(boolean close) throws IOException {
try {
handleFlushInternal(close);
if (close) {
waitForAllPendingFlushes();
}
} catch (ExecutionException e) {
handleExecutionException(e);
} catch (InterruptedException ex) {
Expand Down Expand Up @@ -675,6 +667,17 @@ private void handleFlushInternal(boolean close)
}
}

public void waitForAllPendingFlushes() throws IOException {
// When closing, must wait for all flush futures to complete.
try {
allPendingFlushFutures.get();
} catch (InterruptedException e) {
handleInterruptedException(e, true);
} catch (ExecutionException e) {
handleExecutionException(e);
}
}

private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boolean close) throws IOException {
CompletableFuture<PutBlockResult> putBlockResultFuture = null;
// flush the last chunk data residing on the currentBuffer
Expand Down Expand Up @@ -740,6 +743,7 @@ public void close() throws IOException {
// Preconditions.checkArgument(buffer.position() == 0);
// bufferPool.checkBufferPoolEmpty();
} else {
waitForAllPendingFlushes();
cleanup(false);
}
}
Expand Down Expand Up @@ -783,7 +787,7 @@ public void setIoException(Exception e) {
void cleanup() {
}

public void cleanup(boolean invalidateClient) {
public synchronized void cleanup(boolean invalidateClient) {
if (xceiverClientFactory != null) {
xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
}
Expand Down Expand Up @@ -811,7 +815,6 @@ void checkOpen() throws IOException {
if (isClosed()) {
throw new IOException("BlockOutputStream has been closed.");
} else if (getIoException() != null) {
adjustBuffersOnException();
throw getIoException();
}
}
Expand Down Expand Up @@ -1148,7 +1151,6 @@ void handleInterruptedException(Exception ex,
*/
private void handleExecutionException(Exception ex) throws IOException {
setIoException(ex);
adjustBuffersOnException();
throw getIoException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,10 @@ private CompletableFuture<Message> writeStateMachineData(
writeChunkFuture.thenApply(r -> {
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
// After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
// that should not crash the pipeline.
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
Expand Down Expand Up @@ -1061,7 +1064,8 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// unhealthy
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
Expand Down
Loading

0 comments on commit 7f47535

Please sign in to comment.