HDDS-11239. Fix KeyOutputStream's exception handling when calling hsync concurrently #7047

Merged · 13 commits · Aug 21, 2024
Changes from all commits
ErrorInjector.java (new file)
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;

/**
* Client-side error injector that allows simulating errors received from the server side.
*/
@FunctionalInterface
public interface ErrorInjector {
RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto request, ClientId id, Pipeline pipeline);
}
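
To make the hook concrete, here is a sketch of a test-side implementation (hypothetical code, not part of this PR): it passes every request through untouched while counting WriteChunk commands. Returning null tells the client to send the real request; returning a non-null RaftClientReply short-circuits the RPC, as the sendRequestAsync change further down shows.

// Hypothetical test helper implementing ErrorInjector: observe traffic
// without injecting any failure.
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;

class CountingErrorInjector implements ErrorInjector {
  private final AtomicInteger writeChunkCount = new AtomicInteger();

  @Override
  public RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto request,
      ClientId id, Pipeline pipeline) {
    if (request.getCmdType() == ContainerProtos.Type.WriteChunk) {
      writeChunkCount.incrementAndGet();
    }
    return null; // null means "no injection": the real request goes out
  }

  int getWriteChunkCount() {
    return writeChunkCount.get();
  }
}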
XceiverClientCreator.java
@@ -31,6 +31,12 @@
* Factory for XceiverClientSpi implementations. Client instances are not cached.
*/
public class XceiverClientCreator implements XceiverClientFactory {
+private static ErrorInjector errorInjector;
+
+public static void enableErrorInjection(ErrorInjector injector) {
+errorInjector = injector;
+}

private final ConfigurationSource conf;
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
@@ -60,7 +66,7 @@ protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
XceiverClientSpi client;
switch (pipeline.getType()) {
case RATIS:
-client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager);
+client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager, errorInjector);
break;
case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf, trustManager);
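
Presumably a test wires the hook in before any client is created, along these lines (usage sketch building on the counting injector above):

// Install the injector once; newClient(...) hands the static reference to
// every XceiverClientRatis it constructs from this point on.
CountingErrorInjector injector = new CountingErrorInjector();
XceiverClientCreator.enableErrorInjection(injector);
// ... run the write workload under test, then assert on injector counts ...

Since the hook is a plain static field, it applies process-wide; that is adequate for the test-only error simulation the javadoc describes.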
XceiverClientManager.java
@@ -61,6 +61,7 @@
public class XceiverClientManager extends XceiverClientCreator {
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientManager.class);

private final Cache<String, XceiverClientSpi> clientCache;
private final CacheMetrics cacheMetrics;

XceiverClientRatis.java
@@ -79,12 +79,12 @@ public final class XceiverClientRatis extends XceiverClientSpi {
public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
ConfigurationSource ozoneConf) {
-return newXceiverClientRatis(pipeline, ozoneConf, null);
+return newXceiverClientRatis(pipeline, ozoneConf, null, null);
}

public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
-ConfigurationSource ozoneConf, ClientTrustManager trustManager) {
+ConfigurationSource ozoneConf, ClientTrustManager trustManager, ErrorInjector errorInjector) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
@@ -93,7 +93,7 @@ public static XceiverClientRatis newXceiverClientRatis(
SecurityConfig(ozoneConf), trustManager);
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType),
-retryPolicy, tlsConfig, ozoneConf);
+retryPolicy, tlsConfig, ozoneConf, errorInjector);
}

private final Pipeline pipeline;
@@ -110,13 +110,14 @@ public static XceiverClientRatis newXceiverClientRatis(
= XceiverClientManager.getXceiverClientMetrics();
private final RaftProtos.ReplicationLevel watchType;
private final int majority;
+private final ErrorInjector errorInjector;

/**
* Constructs a client.
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
-ConfigurationSource configuration) {
+ConfigurationSource configuration, ErrorInjector errorInjector) {
super();
this.pipeline = pipeline;
this.majority = (pipeline.getReplicationConfig().getRequiredNodes() / 2) + 1;
@@ -142,6 +143,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
new Throwable("TRACE"));
}
+this.errorInjector = errorInjector;
}

private long updateCommitInfosMap(RaftClientReply reply, RaftProtos.ReplicationLevel level) {
@@ -248,6 +250,12 @@ public ConcurrentMap<UUID, Long> getCommitInfoMap() {

private CompletableFuture<RaftClientReply> sendRequestAsync(
ContainerCommandRequestProto request) {
+if (errorInjector != null) {
+RaftClientReply response = errorInjector.getResponse(request, getClient().getId(), pipeline);
+if (response != null) {
+return CompletableFuture.completedFuture(response);
+}
+}
return TracingUtil.executeInNewSpan(
"XceiverClientRatis." + request.getCmdType().name(),
() -> {
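
The short-circuit above completes the future with the injected reply instead of sending the RPC, so an injected error travels through exactly the client's normal reply-handling path. An injector that actually simulates a datanode error could look roughly like this; the response and reply builder calls are best-effort assumptions about the protobuf and Ratis APIs on the classpath, not code from this PR:

// Hypothetical failure injector: answer every PutBlock with a
// CONTAINER_NOT_OPEN result, as if the datanode had rejected it.
// Assumed imports: org.apache.ratis.protocol.{RaftClientReply, RaftPeerId,
// RaftGroupId, Message} in addition to those used by ErrorInjector itself.
ErrorInjector failPutBlock = (request, id, pipeline) -> {
  if (request.getCmdType() != ContainerProtos.Type.PutBlock) {
    return null; // let everything else through
  }
  ContainerProtos.ContainerCommandResponseProto response =
      ContainerProtos.ContainerCommandResponseProto.newBuilder()
          .setCmdType(request.getCmdType())
          .setResult(ContainerProtos.Result.CONTAINER_NOT_OPEN)
          .setMessage("injected error")
          .build();
  return RaftClientReply.newBuilder()
      .setClientId(id)
      .setServerId(RaftPeerId.valueOf(pipeline.getLeaderId().toString()))
      .setGroupId(RaftGroupId.valueOf(pipeline.getId().getId()))
      .setCallId(0)
      .setSuccess(true)
      .setMessage(Message.valueOf(response.toByteString()))
      .setLogIndex(0)
      .build();
};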
BlockOutputStream.java
@@ -102,7 +102,7 @@ public class BlockOutputStream extends OutputStream {
= new AtomicReference<>();

private final BlockData.Builder containerBlockData;
-private XceiverClientFactory xceiverClientFactory;
+private volatile XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private OzoneClientConfig config;
private StreamBufferArgs streamBufferArgs;
@@ -216,7 +216,8 @@ public BlockOutputStream(
this.token.encodeToUrlString();

//number of buffers used before doing a flush
-refreshCurrentBuffer();
+currentBuffer = null;
+currentBufferRemaining = 0;
flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / streamBufferArgs
.getStreamBufferSize());

@@ -254,12 +255,6 @@ private boolean allDataNodesSupportPiggybacking() {
return true;
}

-synchronized void refreshCurrentBuffer() {
-currentBuffer = bufferPool.getCurrentBuffer();
-currentBufferRemaining =
-currentBuffer != null ? currentBuffer.remaining() : 0;
-}

public BlockID getBlockID() {
return blockID.get();
}
@@ -418,42 +413,44 @@ private void updatePutBlockLength() {
* @param len length of data to write
* @throws IOException if error occurred
*/

-// In this case, the data is already cached in the currentBuffer.
public synchronized void writeOnRetry(long len) throws IOException {
if (len == 0) {
return;
}

+// In this case, the data from the failing (previous) block is already cached in the allocated buffers in
+// the BufferPool. For each pending buffer in the BufferPool, we sequentially flush it and wait synchronously.
+
+List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
if (LOG.isDebugEnabled()) {
LOG.debug("Retrying write length {} for blockID {}", len, blockID);
LOG.debug("{}: Retrying write length {} on target blockID {}, {} buffers", this, len, blockID,
allocatedBuffers.size());
}
Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize());
int count = 0;
-List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
while (len > 0) {
ChunkBuffer buffer = allocatedBuffers.get(count);
long writeLen = Math.min(buffer.position(), len);
-if (!buffer.hasRemaining()) {
-writeChunk(buffer);
-}
len -= writeLen;
count++;
writtenDataLength += writeLen;
-// we should not call isBufferFull/shouldFlush here.
-// The buffer might already be full as whole data is already cached in
-// the buffer. We should just validate
-// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
-// call for handling full buffer/flush buffer condition.
-if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 0) {
-// reset the position to zero as now we will be reading the
-// next buffer in the list
-updateWriteChunkLength();
-updatePutBlockLength();
-CompletableFuture<PutBlockResult> putBlockResultFuture = executePutBlock(false, false);
-recordWatchForCommitAsync(putBlockResultFuture);
+updateWriteChunkLength();
+updatePutBlockLength();
+LOG.debug("Write chunk on retry buffer = {}", buffer);
+CompletableFuture<PutBlockResult> putBlockFuture;
+if (allowPutBlockPiggybacking) {
+putBlockFuture = writeChunkAndPutBlock(buffer, false);
+} else {
+writeChunk(buffer);
+putBlockFuture = executePutBlock(false, false);
}
-if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
-handleFullBuffer();
+CompletableFuture<Void> watchForCommitAsync = watchForCommitAsync(putBlockFuture);
+try {
+watchForCommitAsync.get();
+} catch (InterruptedException e) {
+handleInterruptedException(e, true);
+} catch (ExecutionException e) {
+handleExecutionException(e);
+}
}
}
@@ -479,14 +476,6 @@ private void handleFullBuffer() throws IOException {
void releaseBuffersOnException() {
}

-// It may happen that once the exception is encountered , we still might
-// have successfully flushed up to a certain index. Make sure the buffers
-// only contain data which have not been sufficiently replicated
-private void adjustBuffersOnException() {
-releaseBuffersOnException();
-refreshCurrentBuffer();
-}
-
/**
* Watch for a specific commit index.
*/
@@ -633,6 +622,9 @@ private CompletableFuture<PutBlockResult> writeChunkAndPutBlock(ChunkBuffer buff
protected void handleFlush(boolean close) throws IOException {
try {
handleFlushInternal(close);
+if (close) {
+waitForAllPendingFlushes();
+}
} catch (ExecutionException e) {
handleExecutionException(e);
} catch (InterruptedException ex) {
@@ -675,6 +667,17 @@ private void handleFlushInternal(boolean close)
}
}

+public void waitForAllPendingFlushes() throws IOException {
+// When closing, must wait for all flush futures to complete.
+try {
+allPendingFlushFutures.get();
+} catch (InterruptedException e) {
+handleInterruptedException(e, true);
+} catch (ExecutionException e) {
+handleExecutionException(e);
+}
+}

private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boolean close) throws IOException {
CompletableFuture<PutBlockResult> putBlockResultFuture = null;
// flush the last chunk data residing on the currentBuffer
@@ -740,6 +743,7 @@ public void close() throws IOException {
// Preconditions.checkArgument(buffer.position() == 0);
// bufferPool.checkBufferPoolEmpty();
} else {
+waitForAllPendingFlushes();
cleanup(false);
}
}
@@ -783,7 +787,7 @@ public void setIoException(Exception e) {
void cleanup() {
}

-public void cleanup(boolean invalidateClient) {
+public synchronized void cleanup(boolean invalidateClient) {
if (xceiverClientFactory != null) {
xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
}
@@ -811,7 +815,6 @@ void checkOpen() throws IOException {
if (isClosed()) {
throw new IOException("BlockOutputStream has been closed.");
} else if (getIoException() != null) {
-adjustBuffersOnException();
throw getIoException();
}
}
@@ -1148,7 +1151,6 @@ void handleInterruptedException(Exception ex,
*/
private void handleExecutionException(Exception ex) throws IOException {
setIoException(ex);
-adjustBuffersOnException();
throw getIoException();
}

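
For orientation, the BlockOutputStream changes above harden the case this PR targets: concurrent hsync calls on one key. A rough reproduction sketch (hypothetical client code with assumed volume, bucket, and key names; hsync must also be enabled on the cluster):

// Two threads flushing the same key concurrently. Before this PR, an error
// surfaced by one flush could corrupt the shared buffer bookkeeping that the
// other flush (or a later retry) depended on.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;

public final class ConcurrentHsyncRepro {
  public static void main(String[] args) throws Exception {
    try (OzoneClient client = OzoneClientFactory.getRpcClient(new OzoneConfiguration())) {
      OzoneOutputStream out = client.getObjectStore()
          .getVolume("vol").getBucket("bucket")
          .createKey("key1", 0);
      byte[] chunk = new byte[4096];
      ExecutorService pool = Executors.newFixedThreadPool(2);
      for (int t = 0; t < 2; t++) {
        pool.submit(() -> {
          for (int i = 0; i < 100; i++) {
            synchronized (out) { // keep the writes themselves ordered
              out.write(chunk);
            }
            out.hsync(); // the concurrent part under test
          }
          return null;
        });
      }
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.MINUTES);
      out.close();
    }
  }
}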
ContainerStateMachine.java
@@ -581,7 +581,10 @@ private CompletableFuture<Message> writeStateMachineData(
writeChunkFuture.thenApply(r -> {
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+// After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
+// that should not crash the pipeline.
+&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
@@ -1061,7 +1064,8 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// unhealthy
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
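
The same list of tolerated results now appears in both writeStateMachineData and applyTransaction. If it grows again, it could be factored into a helper along these lines (a refactoring sketch, not part of the PR):

// Results that fail the individual request without marking the datanode
// pipeline unhealthy; anything else escalates to a StorageContainerException.
private static boolean isToleratedResult(ContainerProtos.Result result) {
  return result == ContainerProtos.Result.SUCCESS
      || result == ContainerProtos.Result.CONTAINER_NOT_OPEN
      || result == ContainerProtos.Result.CLOSED_CONTAINER_IO
      || result == ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY;
}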