From 2ec26ae6cab59a3a749f0e654ed74cb8fded460c Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 6 Aug 2024 16:52:31 -0700 Subject: [PATCH 01/13] Add test to verify exception handling when running hsync concurrently. --- .../apache/hadoop/hdds/scm/ErrorInjector.java | 31 +++++ .../hadoop/hdds/scm/XceiverClientManager.java | 7 + .../hadoop/hdds/scm/XceiverClientRatis.java | 18 ++- .../org/apache/hadoop/fs/ozone/TestHSync.java | 130 ++++++++++++++++++ 4 files changed, 182 insertions(+), 4 deletions(-) create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java new file mode 100644 index 00000000000..35806967a5c --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientReply; + +/** + * Client side error injector allowing simulating receiving errors from server side. + */ +@FunctionalInterface +public interface ErrorInjector { + RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto request, ClientId id, Pipeline pipeline); +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 285a47ec574..58b766cd99d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -61,6 +61,13 @@ public class XceiverClientManager extends XceiverClientCreator { private static final Logger LOG = LoggerFactory.getLogger(XceiverClientManager.class); + + private static ErrorInjector errorInjector; + + public static void enableErrorInjection(ErrorInjector injector) { + errorInjector = injector; + } + private final Cache clientCache; private final CacheMetrics cacheMetrics; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index eb0ed0a885c..e7a7d1b708c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -29,6 +29,8 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; 
import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @@ -79,12 +81,12 @@ public final class XceiverClientRatis extends XceiverClientSpi { public static XceiverClientRatis newXceiverClientRatis( org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, ConfigurationSource ozoneConf) { - return newXceiverClientRatis(pipeline, ozoneConf, null); + return newXceiverClientRatis(pipeline, ozoneConf, null, null); } public static XceiverClientRatis newXceiverClientRatis( org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, - ConfigurationSource ozoneConf, ClientTrustManager trustManager) { + ConfigurationSource ozoneConf, ClientTrustManager trustManager, ErrorInjector errorInjector) { final String rpcType = ozoneConf .get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY, ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); @@ -93,7 +95,7 @@ public static XceiverClientRatis newXceiverClientRatis( SecurityConfig(ozoneConf), trustManager); return new XceiverClientRatis(pipeline, SupportedRpcType.valueOfIgnoreCase(rpcType), - retryPolicy, tlsConfig, ozoneConf); + retryPolicy, tlsConfig, ozoneConf, errorInjector); } private final Pipeline pipeline; @@ -110,13 +112,14 @@ public static XceiverClientRatis newXceiverClientRatis( = XceiverClientManager.getXceiverClientMetrics(); private final RaftProtos.ReplicationLevel watchType; private final int majority; + private final ErrorInjector errorInjector; /** * Constructs a client. 
*/ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, - ConfigurationSource configuration) { + ConfigurationSource configuration, ErrorInjector errorInjector) { super(); this.pipeline = pipeline; this.majority = (pipeline.getReplicationConfig().getRequiredNodes() / 2) + 1; @@ -142,6 +145,7 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(), new Throwable("TRACE")); } + this.errorInjector = errorInjector; } private long updateCommitInfosMap(RaftClientReply reply, RaftProtos.ReplicationLevel level) { @@ -248,6 +252,12 @@ public ConcurrentMap getCommitInfoMap() { private CompletableFuture sendRequestAsync( ContainerCommandRequestProto request) { + if (errorInjector != null) { + RaftClientReply response = errorInjector.getResponse(request, getClient().getId(), pipeline); + if (response != null) { + return CompletableFuture.completedFuture(response); + } + } return TracingUtil.executeInNewSpan( "XceiverClientRatis." 
+ request.getCmdType().name(), () -> { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index f27ebaa9727..222d3864065 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -45,6 +45,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.scm.ErrorInjector; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.hdds.utils.IOUtils; @@ -95,6 +98,11 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeerId; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; @@ -698,6 +706,99 @@ static void runTestHSync(FileSystem fs, Path file, }, 500, 3000); } + + public static Stream concurrentExceptionHandling() { + return Stream.of( + Arguments.of(2, 1), + Arguments.of(4, 4), + Arguments.of(8, 4) + ); + } + + @ParameterizedTest + @MethodSource("concurrentExceptionHandling") + public void testConcurrentExceptionHandling(int syncerThreads, int errors) throws Exception { + final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); + 
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + ErrorInjectorImpl errorInjector = new ErrorInjectorImpl(); + XceiverClientManager.enableErrorInjection(errorInjector); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName(); + + try (FileSystem fs = FileSystem.get(CONF)) { + final Path file = new Path(dir, "exceptionhandling"); + byte[] data = new byte[8]; + ThreadLocalRandom.current().nextBytes(data); + int writes; + try (FSDataOutputStream out = fs.create(file, true)) { + writes = runConcurrentWriteHSyncWithException(file, out, data, syncerThreads, errors, errorInjector); + } + validateWrittenFile(file, fs, data, writes); + fs.delete(file, false); + } + } + + private int runConcurrentWriteHSyncWithException(Path file, + final FSDataOutputStream out, byte[] data, int syncThreadsCount, int errors, + ErrorInjectorImpl errorInjector) throws Exception { + + AtomicReference writerException = new AtomicReference<>(); + AtomicReference syncerException = new AtomicReference<>(); + + LOG.info("runConcurrentWriteHSyncWithException {} with size {}", file, data.length); + AtomicInteger writes = new AtomicInteger(); + final long start = Time.monotonicNow(); + + Runnable syncer = () -> { + while ((Time.monotonicNow() - start < 10000)) { + try { + out.write(data); + writes.incrementAndGet(); + out.hsync(); + } catch (Exception e) { + LOG.error("Error calling hsync", e); + syncerException.compareAndSet(null, e); + throw new RuntimeException(e); + } + } + }; + + Thread[] syncThreads = new Thread[syncThreadsCount]; + for (int i = 0; i < syncThreadsCount; i++) { + syncThreads[i] = new Thread(syncer); + syncThreads[i].setName("Syncer-" + i); + syncThreads[i].start(); + } + + // Inject error at 3rd second. 
+ Runnable startErrorInjector = () -> { + while ((Time.monotonicNow() - start <= 3000)) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + errorInjector.start(errors); + LOG.info("Enabled error injection in XceiverClientRatis"); + }; + + new Thread(startErrorInjector).start(); + + for (Thread sync : syncThreads) { + sync.join(); + } + + if (syncerException.get() != null) { + throw syncerException.get(); + } + if (writerException.get() != null) { + throw writerException.get(); + } + return writes.get(); + } + private int runConcurrentWriteHSync(Path file, final FSDataOutputStream out, byte[] data, int syncThreadsCount) throws Exception { @@ -1320,4 +1421,33 @@ private Map getAllDeletedKeys(Table= 0) { + ContainerProtos.ContainerCommandResponseProto proto = ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setResult(ContainerProtos.Result.CLOSED_CONTAINER_IO) + .setMessage("Simulated error #" + errorNum) + .setCmdType(request.getCmdType()) + .build(); + RaftClientReply reply = RaftClientReply.newBuilder() + .setSuccess(true) + .setMessage(Message.valueOf(proto.toByteString())) + .setClientId(clientId) + .setServerId(RaftPeerId.getRaftPeerId(pipeline.getLeaderId().toString())) + .setGroupId(RaftGroupId.randomId()) + .build(); + return reply; + } + + return null; + } + } } From dd7615e02e7ff1145440b63bb498453cf3c1aacb Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 6 Aug 2024 19:43:45 -0700 Subject: [PATCH 02/13] HDDS-11239. Fix exception handling when calling hsync concurrently. 
--- .../hadoop/hdds/scm/XceiverClientRatis.java | 2 - .../hdds/scm/storage/BlockOutputStream.java | 47 +++++++++------ .../client/io/BlockOutputStreamEntry.java | 60 ++++++++++++++++++- .../client/io/BlockOutputStreamEntryPool.java | 17 +++--- .../io/ECBlockOutputStreamEntryPool.java | 2 +- .../ozone/client/io/ECKeyOutputStream.java | 2 +- .../ozone/client/io/KeyOutputStream.java | 36 +++++++++-- 7 files changed, 129 insertions(+), 37 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index e7a7d1b708c..b0ef85cfbf7 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -29,8 +29,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 43ac69818f9..2d37d456c24 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -216,7 +216,8 @@ public BlockOutputStream( this.token.encodeToUrlString(); //number of buffers used before doing a flush - refreshCurrentBuffer(); + currentBuffer = null; + currentBufferRemaining = 0; flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / streamBufferArgs .getStreamBufferSize()); @@ -424,38 +425,31 @@ public synchronized void writeOnRetry(long 
len) throws IOException { if (len == 0) { return; } + List allocatedBuffers = bufferPool.getAllocatedBuffers(); if (LOG.isDebugEnabled()) { - LOG.debug("Retrying write length {} for blockID {}", len, blockID); + LOG.debug("{}: Retrying write length {} for blockID {}, {} buffers", this, len, blockID, allocatedBuffers.size()); } Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize()); int count = 0; - List allocatedBuffers = bufferPool.getAllocatedBuffers(); while (len > 0) { ChunkBuffer buffer = allocatedBuffers.get(count); long writeLen = Math.min(buffer.position(), len); - if (!buffer.hasRemaining()) { - writeChunk(buffer); - } len -= writeLen; count++; writtenDataLength += writeLen; - // we should not call isBufferFull/shouldFlush here. - // The buffer might already be full as whole data is already cached in - // the buffer. We should just validate - // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to - // call for handling full buffer/flush buffer condition. - if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 0) { - // reset the position to zero as now we will be reading the - // next buffer in the list - updateWriteChunkLength(); - updatePutBlockLength(); - CompletableFuture putBlockResultFuture = executePutBlock(false, false); - recordWatchForCommitAsync(putBlockResultFuture); - } + updateWriteChunkLength(); + LOG.info("Write chunk on retry buffer = {}", buffer); + writeChunk(buffer); if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) { handleFullBuffer(); } } + + // flush all pending data due in exception handling. 
+ updatePutBlockLength(); + CompletableFuture putBlockResultFuture = executePutBlock(false, false); + recordWatchForCommitAsync(putBlockResultFuture); + waitForAllPendingFlushes(); } /** @@ -633,6 +627,9 @@ private CompletableFuture writeChunkAndPutBlock(ChunkBuffer buff protected void handleFlush(boolean close) throws IOException { try { handleFlushInternal(close); + if (close) { + waitForAllPendingFlushes(); + } } catch (ExecutionException e) { handleExecutionException(e); } catch (InterruptedException ex) { @@ -675,6 +672,17 @@ private void handleFlushInternal(boolean close) } } + public void waitForAllPendingFlushes() throws IOException { + // When closing, must wait for all flush futures to complete. + try { + allPendingFlushFutures.get(); + } catch (InterruptedException e) { + handleInterruptedException(e, true); + } catch (ExecutionException e) { + handleExecutionException(e); + } + } + private synchronized CompletableFuture handleFlushInternalSynchronized(boolean close) throws IOException { CompletableFuture putBlockResultFuture = null; // flush the last chunk data residing on the currentBuffer @@ -740,6 +748,7 @@ public void close() throws IOException { // Preconditions.checkArgument(buffer.position() == 0); // bufferPool.checkBufferPoolEmpty(); } else { + waitForAllPendingFlushes(); cleanup(false); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 5e6ecceefa1..3fa81652d55 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -22,6 +22,10 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.apache.hadoop.fs.Syncable; @@ -41,6 +45,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A BlockOutputStreamEntry manages the data writes into the DataNodes. @@ -51,7 +57,7 @@ * but there can be other implementations that are using a different way. */ public class BlockOutputStreamEntry extends OutputStream { - + public static final Logger LOG = LoggerFactory.getLogger(BlockOutputStreamEntry.class); private final OzoneClientConfig config; private OutputStream outputStream; private BlockID blockID; @@ -69,6 +75,19 @@ public class BlockOutputStreamEntry extends OutputStream { private final StreamBufferArgs streamBufferArgs; private final Supplier executorServiceSupplier; + /** + * An indicator that this BlockOutputStream is created to handoff writes from another faulty BlockOutputStream. + * Once this flag is on, this BlockOutputStream can only handle writeOnRetry. + */ + private volatile boolean isHandlingRetry; + private final Lock handlingRetryLock = new ReentrantLock(); + private final Condition handlingRetryCondition = handlingRetryLock.newCondition(); + /** + * To record how many calls(write, flush) are being handled by this block.
+ */ + private AtomicInteger inflightCalls = new AtomicInteger(); + + BlockOutputStreamEntry(Builder b) { this.config = b.config; this.outputStream = null; @@ -83,6 +102,7 @@ public class BlockOutputStreamEntry extends OutputStream { this.clientMetrics = b.clientMetrics; this.streamBufferArgs = b.streamBufferArgs; this.executorServiceSupplier = b.executorServiceSupplier; + this.isHandlingRetry = b.forRetry; } @Override @@ -102,6 +122,38 @@ void checkStream() throws IOException { } } + void onCallReceived() { + inflightCalls.incrementAndGet(); + } + + boolean oncallFinished() { + return inflightCalls.decrementAndGet() == 0; + } + + void waitForRetryHandling() throws InterruptedException { + handlingRetryLock.lock(); + try { + while (isHandlingRetry) { + LOG.info("{} : Block to wait for retry handling.", this); + handlingRetryCondition.await(); + LOG.info("{} : Done waiting for retry handling.", this); + } + } finally { + handlingRetryLock.unlock(); + } + } + + void finishRetryHandling() { + LOG.info("{}: Exiting retry handling mode", this); + handlingRetryLock.lock(); + try { + isHandlingRetry = false; + handlingRetryCondition.signalAll(); + } finally { + handlingRetryLock.unlock(); + } + } + /** * Creates the outputStreams that are necessary to start the write. * Implementors can override this to instantiate multiple streams instead. 
@@ -368,6 +420,7 @@ public static class Builder { private ContainerClientMetrics clientMetrics; private StreamBufferArgs streamBufferArgs; private Supplier executorServiceSupplier; + private boolean forRetry; public Pipeline getPipeline() { return pipeline; @@ -433,6 +486,11 @@ public Builder setExecutorServiceSupplier(Supplier executorServ return this; } + public Builder setForRetry(boolean forRetry) { + this.forRetry = forRetry; + return this; + } + public BlockOutputStreamEntry build() { return new BlockOutputStreamEntry(this); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 99899c6874e..3705a136377 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -141,7 +141,7 @@ public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, lo // only the blocks allocated in this open session (block createVersion // equals to open session version) for (OmKeyLocationInfo subKeyInfo : version.getLocationList(openVersion)) { - addKeyLocationInfo(subKeyInfo); + addKeyLocationInfo(subKeyInfo, false); } } @@ -154,7 +154,7 @@ public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, lo * key to be written. * @return a BlockOutputStreamEntry instance that handles how data is written. 
*/ - BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { + BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo, boolean forRetry) { return new BlockOutputStreamEntry.Builder() .setBlockID(subKeyInfo.getBlockID()) @@ -168,12 +168,13 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setClientMetrics(clientMetrics) .setStreamBufferArgs(streamBufferArgs) .setExecutorServiceSupplier(executorServiceSupplier) + .setForRetry(forRetry) .build(); } - private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { + private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo, boolean forRetry) { Preconditions.checkNotNull(subKeyInfo.getPipeline()); - streamEntries.add(createStreamEntry(subKeyInfo)); + streamEntries.add(createStreamEntry(subKeyInfo, forRetry)); } /** @@ -295,13 +296,13 @@ synchronized long getKeyLength() { * * @throws IOException */ - private void allocateNewBlock() throws IOException { + private void allocateNewBlock(boolean forRetry) throws IOException { if (!excludeList.isEmpty()) { LOG.debug("Allocating block with {}", excludeList); } OmKeyLocationInfo subKeyInfo = omClient.allocateBlock(keyArgs, openID, excludeList); - addKeyLocationInfo(subKeyInfo); + addKeyLocationInfo(subKeyInfo, forRetry); } /** @@ -379,7 +380,7 @@ BlockOutputStreamEntry getCurrentStreamEntry() { * @return the new current open stream to write to * @throws IOException if the block allocation failed. 
*/ - synchronized BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException { + synchronized BlockOutputStreamEntry allocateBlockIfNeeded(boolean forRetry) throws IOException { BlockOutputStreamEntry streamEntry = getCurrentStreamEntry(); if (streamEntry != null && streamEntry.isClosed()) { // a stream entry gets closed either by : @@ -391,7 +392,7 @@ synchronized BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException { Preconditions.checkNotNull(omClient); // allocate a new block, if a exception happens, log an error and // throw exception to the caller directly, and the write fails. - allocateNewBlock(); + allocateNewBlock(forRetry); } // in theory, this condition should never violate due the check above // still do a sanity check. diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java index 6eb9aed0d3a..f891724270e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java @@ -37,7 +37,7 @@ public ECBlockOutputStreamEntryPool(ECKeyOutputStream.Builder builder) { } @Override - ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { + ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo, boolean forRetry) { final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder(); b.setBlockID(subKeyInfo.getBlockID()) .setKey(getKeyName()) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index 0de61f8485d..ea3a3592a55 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -315,7 +315,7 @@ private void generateParityCells() throws IOException { private void writeDataCells(ECChunkBuffers stripe) throws IOException { final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool(); - blockOutputStreamEntryPool.allocateBlockIfNeeded(); + blockOutputStreamEntryPool.allocateBlockIfNeeded(false); ByteBuffer[] dataCells = stripe.getDataBuffers(); for (int i = 0; i < numDataBlks; i++) { if (dataCells[i].limit() > 0) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 549607c59ad..50d25f5e627 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -109,6 +109,7 @@ enum StreamAction { private boolean atomicKeyCreation; private ContainerClientMetrics clientMetrics; private OzoneManagerVersion ozoneManagerVersion; + private final Object exceptionHandlingLock = new Object(); private final int maxConcurrentWritePerKey; private final KeyOutputStreamSemaphore keyOutputStreamSemaphore; @@ -242,7 +243,7 @@ void handleWrite(byte[] b, int off, long len, boolean retry) while (len > 0) { try { BlockOutputStreamEntry current = - blockOutputStreamEntryPool.allocateBlockIfNeeded(); + blockOutputStreamEntryPool.allocateBlockIfNeeded(retry); // length(len) will be in int range if the call is happening through // write API of blockOutputStream. Length can be in long range if it // comes via Exception path. 
@@ -272,12 +273,18 @@ private int writeToOutputStream(BlockOutputStreamEntry current, boolean retry, long len, byte[] b, int writeLen, int off, long currentPos) throws IOException { try { + current.onCallReceived(); if (retry) { current.writeOnRetry(len); } else { + current.waitForRetryHandling(); current.write(b, off, writeLen); offset += writeLen; } + current.oncallFinished(); + } catch (InterruptedException e) { + current.oncallFinished(); + throw new InterruptedIOException(); } catch (IOException ioe) { // for the current iteration, totalDataWritten - currentPos gives the // amount of data already written to the buffer @@ -296,10 +303,19 @@ private int writeToOutputStream(BlockOutputStreamEntry current, } LOG.debug("writeLen {}, total len {}", writeLen, len); handleException(current, ioe); + if (current.oncallFinished()) { + blockOutputStreamEntryPool.getCurrentStreamEntry().finishRetryHandling(); + } } return writeLen; } + private void handleException(BlockOutputStreamEntry streamEntry, IOException exception) throws IOException { + synchronized (exceptionHandlingLock) { + handleExceptionInternal(streamEntry, exception); + } + } + /** * It performs following actions : * a. 
Updates the committed length at datanode for the current stream in @@ -310,8 +326,7 @@ private int writeToOutputStream(BlockOutputStreamEntry current, * @param exception actual exception that occurred * @throws IOException Throws IOException if Write fails */ - private synchronized void handleException(BlockOutputStreamEntry streamEntry, - IOException exception) throws IOException { + private void handleExceptionInternal(BlockOutputStreamEntry streamEntry, IOException exception) throws IOException { Throwable t = HddsClientUtils.checkForException(exception); Preconditions.checkNotNull(t); boolean retryFailure = checkForRetryFailure(t); @@ -338,8 +353,6 @@ private synchronized void handleException(BlockOutputStreamEntry streamEntry, } Preconditions.checkArgument( bufferedDataLen <= streamBufferArgs.getStreamBufferMaxSize()); - Preconditions.checkArgument( - offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); Collection failedServers = streamEntry.getFailedServers(); Preconditions.checkNotNull(failedServers); @@ -532,14 +545,27 @@ private void handleFlushOrClose(StreamAction op) throws IOException { BlockOutputStreamEntry entry = blockOutputStreamEntryPool.getCurrentStreamEntry(); if (entry != null) { + // If the current block is to handle retries, wait until all the retries are done. 
+ entry.waitForRetryHandling(); + entry.onCallReceived(); try { handleStreamAction(entry, op); + entry.oncallFinished(); } catch (IOException ioe) { handleException(entry, ioe); + BlockOutputStreamEntry current = blockOutputStreamEntryPool.getCurrentStreamEntry(); + if (entry.oncallFinished()) { + current.finishRetryHandling(); + } continue; + } catch (Exception e) { + entry.oncallFinished(); + throw e; } } return; + } catch (InterruptedException e) { + throw new InterruptedIOException(); } catch (Exception e) { markStreamClosed(); throw e; From 0861983991937134a96d31bd25059bff48d22b70 Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 13 Aug 2024 10:54:09 -0700 Subject: [PATCH 03/13] New approach to handle retries. --- .../hdds/scm/storage/BlockOutputStream.java | 39 ++++++------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 2d37d456c24..8726eabfc18 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -255,12 +255,6 @@ private boolean allDataNodesSupportPiggybacking() { return true; } - synchronized void refreshCurrentBuffer() { - currentBuffer = bufferPool.getCurrentBuffer(); - currentBufferRemaining = - currentBuffer != null ? currentBuffer.remaining() : 0; - } - public BlockID getBlockID() { return blockID.get(); } @@ -420,7 +414,7 @@ private void updatePutBlockLength() { * @throws IOException if error occurred */ - // In this case, the data is already cached in the currentBuffer. + // In this case, the data is already cached in the allocated buffers in the BufferPool. 
public synchronized void writeOnRetry(long len) throws IOException { if (len == 0) { return; @@ -438,18 +432,18 @@ public synchronized void writeOnRetry(long len) throws IOException { count++; writtenDataLength += writeLen; updateWriteChunkLength(); - LOG.info("Write chunk on retry buffer = {}", buffer); - writeChunk(buffer); - if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) { - handleFullBuffer(); + updatePutBlockLength(); + LOG.debug("Write chunk on retry buffer = {}", buffer); + CompletableFuture f = writeChunkAndPutBlock(buffer, false); + CompletableFuture watchForCommitAsync = watchForCommitAsync(f); + try { + watchForCommitAsync.get(); + } catch (InterruptedException e) { + handleInterruptedException(e, true); + } catch (ExecutionException e) { + handleExecutionException(e); } } - - // flush all pending data due in exception handling. - updatePutBlockLength(); - CompletableFuture putBlockResultFuture = executePutBlock(false, false); - recordWatchForCommitAsync(putBlockResultFuture); - waitForAllPendingFlushes(); } /** @@ -473,14 +467,6 @@ private void handleFullBuffer() throws IOException { void releaseBuffersOnException() { } - // It may happen that once the exception is encountered , we still might - // have successfully flushed up to a certain index. Make sure the buffers - // only contain data which have not been sufficiently replicated - private void adjustBuffersOnException() { - releaseBuffersOnException(); - refreshCurrentBuffer(); - } - /** * Watch for a specific commit index. 
*/ @@ -820,7 +806,7 @@ void checkOpen() throws IOException { if (isClosed()) { throw new IOException("BlockOutputStream has been closed."); } else if (getIoException() != null) { - adjustBuffersOnException(); +// adjustBuffersOnException(); throw getIoException(); } } @@ -1157,7 +1143,6 @@ void handleInterruptedException(Exception ex, */ private void handleExecutionException(Exception ex) throws IOException { setIoException(ex); - adjustBuffersOnException(); throw getIoException(); } From eb358738a7d04d0c4fc7ee944a88ee74b96792c2 Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Fri, 16 Aug 2024 10:39:48 -0700 Subject: [PATCH 04/13] HDDS-11328. ContainerStateMachine should not crashed because of CHUNK_FILE_INCONSISTENCY --- .../transport/server/ratis/ContainerStateMachine.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 28b9e151ff3..f0c5d40dbff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -581,7 +581,10 @@ private CompletableFuture writeStateMachineData( writeChunkFuture.thenApply(r -> { if (r.getResult() != ContainerProtos.Result.SUCCESS && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN - && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) { + && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO + // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and + // that should now + && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) { 
StorageContainerException sce = new StorageContainerException(r.getMessage(), r.getResult()); LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" + @@ -1061,7 +1064,8 @@ public CompletableFuture applyTransaction(TransactionContext trx) { // unhealthy if (r.getResult() != ContainerProtos.Result.SUCCESS && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN - && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) { + && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO + && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) { StorageContainerException sce = new StorageContainerException(r.getMessage(), r.getResult()); LOG.error( From 1ce1bcccd48cb2e52f8aab3c4e28f001ec90512c Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Fri, 16 Aug 2024 13:03:25 -0700 Subject: [PATCH 05/13] Correct wait for trying implementation. --- .../hdds/scm/storage/BlockOutputStream.java | 3 +- .../client/io/BlockOutputStreamEntry.java | 40 ++++----- .../ozone/client/io/KeyOutputStream.java | 82 ++++++++++++------- .../org/apache/hadoop/fs/ozone/TestHSync.java | 2 +- 4 files changed, 71 insertions(+), 56 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 8726eabfc18..36fccbb0edc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -421,7 +421,8 @@ public synchronized void writeOnRetry(long len) throws IOException { } List allocatedBuffers = bufferPool.getAllocatedBuffers(); if (LOG.isDebugEnabled()) { - LOG.debug("{}: Retrying write length {} for blockID {}, {} buffers", this, len, blockID, allocatedBuffers.size()); + LOG.debug("{}: Retrying write length {} on target blockID {}, {} buffers", this, len, blockID, + 
allocatedBuffers.size()); } Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize()); int count = 0; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 3fa81652d55..00b5100179c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -24,8 +24,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.apache.hadoop.fs.Syncable; @@ -80,8 +78,7 @@ public class BlockOutputStreamEntry extends OutputStream { * Once this flag is on, this BlockOutputStream can only handle writeOneRetry. */ private volatile boolean isHandlingRetry; - private final Lock handlingRetryLock = new ReentrantLock(); - private final Condition handlingRetryCondition = handlingRetryLock.newCondition(); + /** * To record how many calls(write, flush) are being handled by this block. */ @@ -122,36 +119,31 @@ void checkStream() throws IOException { } } - void onCallReceived() { + /** Register when a call (write or flush) is received on this block. */ + void registerCallReceived() { inflightCalls.incrementAndGet(); } - boolean oncallFinished() { + /** + * Register when a call (write or flush) is finised on this block. + * @return true if all the calls are done. 
+ */ + boolean registerCallFinished() { return inflightCalls.decrementAndGet() == 0; } - void waitForRetryHandling() throws InterruptedException { - handlingRetryLock.lock(); - try { - while (isHandlingRetry) { - LOG.info("{} : Block to wait for retry handling.", this); - handlingRetryCondition.await(); - LOG.info("{} : Done waiting for retry handling.", this); - } - } finally { - handlingRetryLock.unlock(); + void waitForRetryHandling(Condition retryHandlingCond) throws InterruptedException { + while (isHandlingRetry) { + LOG.info("{} : Block to wait for retry handling.", this); + retryHandlingCond.await(); + LOG.info("{} : Done waiting for retry handling.", this); } } - void finishRetryHandling() { + void finishRetryHandling(Condition retryHandlingCond) { LOG.info("{}: Exiting retry handling mode", this); - handlingRetryLock.lock(); - try { - isHandlingRetry = false; - handlingRetryCondition.signalAll(); - } finally { - handlingRetryLock.unlock(); - } + isHandlingRetry = false; + retryHandlingCond.signalAll(); } /** diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 50d25f5e627..c55edee7200 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -25,6 +25,9 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -58,6 +61,7 @@ import com.google.common.base.Preconditions; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; import 
org.apache.ratis.protocol.exceptions.RaftRetryFailureException; +import org.apache.ratis.util.function.CheckedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +95,7 @@ enum StreamAction { // how much of data is actually written yet to underlying stream private long offset; // how much data has been ingested into the stream - private volatile long writeOffset; + private long writeOffset; // whether an exception is encountered while write and whole write could // not succeed private boolean isException; @@ -109,7 +113,8 @@ enum StreamAction { private boolean atomicKeyCreation; private ContainerClientMetrics clientMetrics; private OzoneManagerVersion ozoneManagerVersion; - private final Object exceptionHandlingLock = new Object(); + private final Lock writeLock = new ReentrantLock(); + private final Condition retryHandlingCondition = writeLock.newCondition(); private final int maxConcurrentWritePerKey; private final KeyOutputStreamSemaphore keyOutputStreamSemaphore; @@ -188,7 +193,7 @@ public KeyOutputStream(Builder b) { * @param version the set of blocks that are pre-allocated. * @param openVersion the version corresponding to the pre-allocation. 
*/ - public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) { + public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) { blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion); } @@ -228,15 +233,24 @@ public void write(byte[] b, int off, int len) return; } - synchronized (this) { + doInWriteLock(() -> { handleWrite(b, off, len, false); writeOffset += len; - } + }); } finally { getRequestSemaphore().release(); } } + private void doInWriteLock(CheckedRunnable block) throws E { + writeLock.lock(); + try { + block.run(); + } finally { + writeLock.unlock(); + } + } + @VisibleForTesting void handleWrite(byte[] b, int off, long len, boolean retry) throws IOException { @@ -273,17 +287,17 @@ private int writeToOutputStream(BlockOutputStreamEntry current, boolean retry, long len, byte[] b, int writeLen, int off, long currentPos) throws IOException { try { - current.onCallReceived(); + current.registerCallReceived(); if (retry) { current.writeOnRetry(len); } else { - current.waitForRetryHandling(); + waitForRetryHandling(current); current.write(b, off, writeLen); offset += writeLen; } - current.oncallFinished(); + current.registerCallFinished(); } catch (InterruptedException e) { - current.oncallFinished(); + current.registerCallFinished(); throw new InterruptedIOException(); } catch (IOException ioe) { // for the current iteration, totalDataWritten - currentPos gives the @@ -302,18 +316,22 @@ private int writeToOutputStream(BlockOutputStreamEntry current, offset += writeLen; } LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, ioe); - if (current.oncallFinished()) { - blockOutputStreamEntryPool.getCurrentStreamEntry().finishRetryHandling(); - } + handleException(current, ioe, retry); } return writeLen; } - private void handleException(BlockOutputStreamEntry streamEntry, IOException exception) throws IOException { - synchronized (exceptionHandlingLock) { - 
handleExceptionInternal(streamEntry, exception); - } + private void handleException(BlockOutputStreamEntry entry, IOException exception, boolean fromRetry) + throws IOException { + doInWriteLock(() -> { + handleExceptionInternal(entry, exception); + BlockOutputStreamEntry current = blockOutputStreamEntryPool.getCurrentStreamEntry(); + if (!fromRetry && entry.registerCallFinished()) { + // When the faulty block finishes handling all its pending calls, the current block can exit retry + // handling mode and unblock normal calls. + current.finishRetryHandling(retryHandlingCondition); + } + }); } /** @@ -511,12 +529,12 @@ public void hsync() throws IOException { final long hsyncPos = writeOffset; handleFlushOrClose(StreamAction.HSYNC); - synchronized (this) { + doInWriteLock(() -> { Preconditions.checkState(offset >= hsyncPos, "offset = %s < hsyncPos = %s", offset, hsyncPos); MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency, () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos)); - } + }); } finally { getRequestSemaphore().release(); } @@ -546,20 +564,16 @@ private void handleFlushOrClose(StreamAction op) throws IOException { blockOutputStreamEntryPool.getCurrentStreamEntry(); if (entry != null) { // If the current block is to handle retries, wait until all the retries are done. 
- entry.waitForRetryHandling(); - entry.onCallReceived(); + waitForRetryHandling(entry); + entry.registerCallReceived(); try { handleStreamAction(entry, op); - entry.oncallFinished(); + entry.registerCallFinished(); } catch (IOException ioe) { - handleException(entry, ioe); - BlockOutputStreamEntry current = blockOutputStreamEntryPool.getCurrentStreamEntry(); - if (entry.oncallFinished()) { - current.finishRetryHandling(); - } + handleException(entry, ioe, false); continue; } catch (Exception e) { - entry.oncallFinished(); + entry.registerCallFinished(); throw e; } } @@ -574,6 +588,10 @@ private void handleFlushOrClose(StreamAction op) throws IOException { } } + private void waitForRetryHandling(BlockOutputStreamEntry currentEntry) throws InterruptedException { + doInWriteLock(() -> currentEntry.waitForRetryHandling(retryHandlingCondition)); + } + private void handleStreamAction(BlockOutputStreamEntry entry, StreamAction op) throws IOException { Collection failedServers = entry.getFailedServers(); @@ -609,7 +627,11 @@ private void handleStreamAction(BlockOutputStreamEntry entry, * @throws IOException */ @Override - public synchronized void close() throws IOException { + public void close() throws IOException { + doInWriteLock(this::closeInternal); + } + + private void closeInternal() throws IOException { if (closed) { return; } @@ -807,7 +829,7 @@ public KeyOutputStream build() { * the last state of the volatile {@link #closed} field. * @throws IOException if the connection is closed. 
*/ - private synchronized void checkNotClosed() throws IOException { + private void checkNotClosed() throws IOException { if (closed) { throw new IOException( ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index 222d3864065..4b07cf3e885 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -709,7 +709,7 @@ static void runTestHSync(FileSystem fs, Path file, public static Stream concurrentExceptionHandling() { return Stream.of( - Arguments.of(2, 1), + Arguments.of(4, 1), Arguments.of(4, 4), Arguments.of(8, 4) ); From ae9118e57c8805a932b5a17ca6515000f576a201 Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Fri, 16 Aug 2024 14:30:40 -0700 Subject: [PATCH 06/13] Must wait for pending flushes finish before handling exception. 
--- .../hadoop/ozone/client/io/BlockOutputStreamEntry.java | 7 ++++++- .../apache/hadoop/ozone/client/io/KeyOutputStream.java | 8 ++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 00b5100179c..0b51414a628 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -57,7 +57,7 @@ public class BlockOutputStreamEntry extends OutputStream { public static final Logger LOG = LoggerFactory.getLogger(BlockOutputStreamEntry.class); private final OzoneClientConfig config; - private OutputStream outputStream; + private BlockOutputStream outputStream; private BlockID blockID; private final String key; private final XceiverClientFactory xceiverClientManager; @@ -146,6 +146,10 @@ void finishRetryHandling(Condition retryHandlingCond) { retryHandlingCond.signalAll(); } + void waitForAllPendingFlushes() throws IOException { + outputStream.waitForAllPendingFlushes(); + } + /** * Creates the outputStreams that are necessary to start the write. * Implementors can override this to instantiate multiple streams instead. 
@@ -188,6 +192,7 @@ void writeOnRetry(long len) throws IOException { BlockOutputStream out = (BlockOutputStream) getOutputStream(); out.writeOnRetry(len); incCurrentPosition(len); + LOG.info("{}: Finish retrying with len {}, currentPosition {}", this, len, currentPosition); } @Override diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index c55edee7200..5f174b13ee7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -345,6 +345,14 @@ private void handleException(BlockOutputStreamEntry entry, IOException exception * @throws IOException Throws IOException if Write fails */ private void handleExceptionInternal(BlockOutputStreamEntry streamEntry, IOException exception) throws IOException { + try { + // Wait for all pending flushes in the faulty stream. It's possible that a prior write is pending completion + // successfully. Errors are ignored here and will be handled by the individual flush call. We just wall to ensure + // all the pending are complete before handling exception. + streamEntry.waitForAllPendingFlushes(); + } catch (IOException ignored) { + } + Throwable t = HddsClientUtils.checkForException(exception); Preconditions.checkNotNull(t); boolean retryFailure = checkForRetryFailure(t); From 46f4f4e285772cd49fe0adb5c833ae12f17ff4e7 Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Fri, 16 Aug 2024 16:52:36 -0700 Subject: [PATCH 07/13] Add synchronized to cleanup, to avoid interfere with write & hsync. 
--- .../org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 2 +- .../java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 36fccbb0edc..40a13c94a8a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -779,7 +779,7 @@ public void setIoException(Exception e) { void cleanup() { } - public void cleanup(boolean invalidateClient) { + public synchronized void cleanup(boolean invalidateClient) { if (xceiverClientFactory != null) { xceiverClientFactory.releaseClient(xceiverClient, invalidateClient); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 5f174b13ee7..243218f4141 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -95,7 +95,7 @@ enum StreamAction { // how much of data is actually written yet to underlying stream private long offset; // how much data has been ingested into the stream - private long writeOffset; + private volatile long writeOffset; // whether an exception is encountered while write and whole write could // not succeed private boolean isException; From 956045741e77a1c43a3c6b4fe04ccb87cae4b81e Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 20 Aug 2024 00:18:10 -0700 Subject: [PATCH 08/13] PR comments. 
--- .../hdds/scm/storage/BlockOutputStream.java | 16 ++++++++++++---- .../ozone/client/io/BlockOutputStreamEntry.java | 4 ++-- .../hadoop/ozone/client/io/KeyOutputStream.java | 2 +- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 40a13c94a8a..0cf309befc3 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -413,12 +413,14 @@ private void updatePutBlockLength() { * @param len length of data to write * @throws IOException if error occurred */ - - // In this case, the data is already cached in the allocated buffers in the BufferPool. public synchronized void writeOnRetry(long len) throws IOException { if (len == 0) { return; } + + // In this case, the data from the failing (previous) block is already cached in the allocated buffers in + // the BufferPool. For each pending buffer in the BufferPool, we sequentially flush it and wait synchronously. 
+ List allocatedBuffers = bufferPool.getAllocatedBuffers(); if (LOG.isDebugEnabled()) { LOG.debug("{}: Retrying write length {} on target blockID {}, {} buffers", this, len, blockID, @@ -435,8 +437,14 @@ public synchronized void writeOnRetry(long len) throws IOException { updateWriteChunkLength(); updatePutBlockLength(); LOG.debug("Write chunk on retry buffer = {}", buffer); - CompletableFuture f = writeChunkAndPutBlock(buffer, false); - CompletableFuture watchForCommitAsync = watchForCommitAsync(f); + CompletableFuture putBlockFuture; + if (allowPutBlockPiggybacking) { + putBlockFuture = writeChunkAndPutBlock(buffer, false); + } else { + writeChunk(buffer); + putBlockFuture = executePutBlock(false, false); + } + CompletableFuture watchForCommitAsync = watchForCommitAsync(putBlockFuture); try { watchForCommitAsync.get(); } catch (InterruptedException e) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 0b51414a628..2ae3e475531 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -75,7 +75,7 @@ public class BlockOutputStreamEntry extends OutputStream { /** * An indicator that this BlockOutputStream is created to handoff writes from another faulty BlockOutputStream. - * Once this flag is on, this BlockOutputStream can only handle writeOneRetry. + * Once this flag is on, this BlockOutputStream can only handle writeOnRetry. */ private volatile boolean isHandlingRetry; @@ -125,7 +125,7 @@ void registerCallReceived() { } /** - * Register when a call (write or flush) is finised on this block. + * Register when a call (write or flush) is finished on this block. * @return true if all the calls are done. 
*/ boolean registerCallFinished() { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 243218f4141..50af6846c7f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -347,7 +347,7 @@ private void handleException(BlockOutputStreamEntry entry, IOException exception private void handleExceptionInternal(BlockOutputStreamEntry streamEntry, IOException exception) throws IOException { try { // Wait for all pending flushes in the faulty stream. It's possible that a prior write is pending completion - // successfully. Errors are ignored here and will be handled by the individual flush call. We just wall to ensure + // successfully. Errors are ignored here and will be handled by the individual flush call. We just want to ensure // all the pending are complete before handling exception. streamEntry.waitForAllPendingFlushes(); } catch (IOException ignored) { From a184773ee2315238a7a5fed53318a1e6bc9cd35d Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 20 Aug 2024 10:00:03 -0700 Subject: [PATCH 09/13] Adapt to new code change in master. 
--- .../org/apache/hadoop/hdds/scm/XceiverClientCreator.java | 8 +++++++- .../org/apache/hadoop/hdds/scm/XceiverClientManager.java | 6 ------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java index cd46bc49a1c..75ae01c1005 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java @@ -31,6 +31,12 @@ * Factory for XceiverClientSpi implementations. Client instances are not cached. */ public class XceiverClientCreator implements XceiverClientFactory { + private static ErrorInjector errorInjector; + + public static void enableErrorInjection(ErrorInjector injector) { + errorInjector = injector; + } + private final ConfigurationSource conf; private final boolean topologyAwareRead; private final ClientTrustManager trustManager; @@ -60,7 +66,7 @@ protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException { XceiverClientSpi client; switch (pipeline.getType()) { case RATIS: - client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager); + client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager, errorInjector); break; case STAND_ALONE: client = new XceiverClientGrpc(pipeline, conf, trustManager); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 58b766cd99d..07b70441721 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -62,12 +62,6 @@ public class XceiverClientManager extends XceiverClientCreator { private static final Logger 
LOG = LoggerFactory.getLogger(XceiverClientManager.class); - private static ErrorInjector errorInjector; - - public static void enableErrorInjection(ErrorInjector injector) { - errorInjector = injector; - } - private final Cache clientCache; private final CacheMetrics cacheMetrics; From 5fe1f26889f0c7eddb63c324b424eab753ac4d91 Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 20 Aug 2024 15:28:41 -0700 Subject: [PATCH 10/13] Make firebug happy. --- .../org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 0cf309befc3..e88b097c499 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -102,7 +102,7 @@ public class BlockOutputStream extends OutputStream { = new AtomicReference<>(); private final BlockData.Builder containerBlockData; - private XceiverClientFactory xceiverClientFactory; + private volatile XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; private OzoneClientConfig config; private StreamBufferArgs streamBufferArgs; @@ -815,7 +815,6 @@ void checkOpen() throws IOException { if (isClosed()) { throw new IOException("BlockOutputStream has been closed."); } else if (getIoException() != null) { -// adjustBuffersOnException(); throw getIoException(); } } From f8e0bb8549e044db21219d50deee5bc2e8111ad5 Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 20 Aug 2024 16:02:13 -0700 Subject: [PATCH 11/13] Fix TestKeyOutputStream#testConcurrentWriteLimitOne. 
--- .../hadoop/ozone/client/io/KeyOutputStream.java | 12 ++++++++++++ .../hadoop/ozone/client/io/TestKeyOutputStream.java | 4 ++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 50af6846c7f..4f9e5db49a9 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -119,10 +119,22 @@ enum StreamAction { private final int maxConcurrentWritePerKey; private final KeyOutputStreamSemaphore keyOutputStreamSemaphore; + @VisibleForTesting KeyOutputStreamSemaphore getRequestSemaphore() { return keyOutputStreamSemaphore; } + /** Required to spy the object in testing. */ + @VisibleForTesting + @SuppressWarnings("unused") + KeyOutputStream() { + maxConcurrentWritePerKey = 0; + keyOutputStreamSemaphore = null; + blockOutputStreamEntryPool = null; + retryPolicyMap = null; + replication = null; + } + public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) { this.replication = replicationConfig; closed = false; diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java index 6b6abceff36..d90a335321b 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java @@ -34,7 +34,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import 
static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -54,7 +54,7 @@ static void init() { void testConcurrentWriteLimitOne() throws Exception { // Verify the semaphore is working to limit the number of concurrent writes allowed. KeyOutputStreamSemaphore sema1 = new KeyOutputStreamSemaphore(1); - KeyOutputStream keyOutputStream = mock(KeyOutputStream.class); + KeyOutputStream keyOutputStream = spy(KeyOutputStream.class); when(keyOutputStream.getRequestSemaphore()).thenReturn(sema1); final AtomicInteger countWrite = new AtomicInteger(0); From 572eb7a5e449393fb2a97f66618b9553d731966a Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Tue, 20 Aug 2024 17:36:36 -0700 Subject: [PATCH 12/13] Finish the sentence. --- .../common/transport/server/ratis/ContainerStateMachine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index f0c5d40dbff..b3398de07ad 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -583,7 +583,7 @@ private CompletableFuture writeStateMachineData( && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and - // that should now + // that should not crash the pipeline. 
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) { StorageContainerException sce = new StorageContainerException(r.getMessage(), r.getResult()); From 7a19ccfa721d8deb2639ec4d5867768e6c5b3c3e Mon Sep 17 00:00:00 2001 From: Duong Nguyen Date: Wed, 21 Aug 2024 11:30:33 -0700 Subject: [PATCH 13/13] Fix failing testUncommittedBlocks due to file deletes from other tests. --- .../src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index 4b07cf3e885..49b515d53c5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -526,6 +526,7 @@ private List getOpenKeyInfo(BucketLayout bucketLayout) { @Test public void testUncommittedBlocks() throws Exception { + waitForEmptyDeletedTable(); // Set the fs.defaultFS final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));