diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 7cf4d28d428f5..18d0fc9a9fa2f 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -33,18 +33,20 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.engine.Engine;
@@ -63,17 +65,20 @@ import org.elasticsearch.transport.RemoteTransportException;
 
 import java.io.Closeable;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.StreamSupport;
@@ -102,6 +107,7 @@ public class RecoverySourceHandler {
     private final int chunkSizeInBytes;
     private final RecoveryTargetHandler recoveryTarget;
     private final int maxConcurrentFileChunks;
+    private final List<Closeable> resources = new CopyOnWriteArrayList<>();
     private final CancellableThreads cancellableThreads = new CancellableThreads();
 
     public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
@@ -124,7 +130,6 @@ public StartRecoveryRequest getRequest() {
      * performs the recovery from the local engine to the target
      */
     public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
-        final List<Closeable> resources = new CopyOnWriteArrayList<>();
         final Closeable releaseResources = () -> IOUtils.close(resources);
         final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
         try {
@@ -411,7 +416,11 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
                 phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
             cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
                 phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
-            sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
+            final PlainActionFuture<Void> sendFileChunksFuture = new PlainActionFuture<>();
+            final StoreFileMetaData[] filesSortedByLength = phase1Files.toArray(new StoreFileMetaData[0]);
+            ArrayUtil.timSort(filesSortedByLength, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
+            sendFiles(store, filesSortedByLength, translogOps, sendFileChunksFuture);
+            cancellableThreads.execute(sendFileChunksFuture::actionGet);
             // Send the CLEAN_FILES request, which takes all of the files that
             // were transferred and renames them from their temporary file
             // names to the actual file names. It also writes checksums for
@@ -680,72 +689,182 @@ public String toString() {
             '}';
     }
 
-    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
-        ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
-        final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
-        final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
-        final byte[] buffer = new byte[chunkSizeInBytes];
-        for (final StoreFileMetaData md : files) {
-            if (error.get() != null) {
-                break;
-            }
-            try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
-                 InputStream in = new InputStreamIndexInput(indexInput, md.length())) {
-                long position = 0;
-                int bytesRead;
-                while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
-                    final BytesArray content = new BytesArray(buffer, 0, bytesRead);
-                    final boolean lastChunk = position + content.length() == md.length();
-                    final long requestSeqId = requestSeqIdTracker.generateSeqNo();
-                    cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks));
-                    cancellableThreads.checkForCancel();
-                    if (error.get() != null) {
+    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps, ActionListener<Void> listener) {
+        final MultiFileSender multiFileSender = new MultiFileSender(store, files, translogOps);
+        resources.add(multiFileSender); // register with the resource list so we can clean up if the recovery gets cancelled
+        final ActionListener<Void> wrappedListener = ActionListener.wrap(
+            r -> {
+                multiFileSender.close();
+                listener.onResponse(null);
+            },
+            e -> {
+                IOUtils.closeWhileHandlingException(multiFileSender);
+                listener.onFailure(e);
+            });
+        multiFileSender.sendFileChunks(ActionListener.notifyOnce(wrappedListener));
+    }
+
+    private final class MultiFileSender implements Closeable {
+        private final Store store;
+        private final List<StoreFileMetaData> files;
+        private final Supplier<Integer> translogOps;
+        private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
+        private final Deque<byte[]> recycledBuffers;
+        private boolean closed; // ensure we don't reopen files if the recovery is cancelled
+        private StoreFileMetaData md;
+        private InputStream currentInput;
+        private int position;
+        private final Semaphore semaphore = new Semaphore(1);
+
+        MultiFileSender(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) {
+            this.store = store;
+            this.files = new ArrayList<>(Arrays.asList(files));
+            this.translogOps = translogOps;
+            this.recycledBuffers = ConcurrentCollections.newDeque();
+        }
+
+        /**
+         * File chunks are read sequentially by at most one thread. Other threads which are triggered by file-chunk responses
+         * will abort without waiting if another thread is reading files already. This is controlled via {@code semaphore}.
+         *
+         * This implementation can send up to {@code maxConcurrentFileChunks} consecutive file-chunk requests without waiting
+         * for the replies from the recovery target, to reduce the recovery time in secure/compressed/high-latency communication.
+         * We assign a seqId to every file-chunk request and stop reading/sending file chunks if the gap between max_seq_no
+         * and local_checkpoint is greater than {@code maxConcurrentFileChunks}.
+         */
+        void sendFileChunks(ActionListener<Void> listener) {
+            assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
+            while (true) {
+                cancellableThreads.checkForCancel();
+                synchronized (this) {
+                    if (semaphore.tryAcquire() == false) {
                         break;
                     }
-                    final long requestFilePosition = position;
-                    cancellableThreads.executeIO(() ->
-                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
+                    // don't send more: the number of unreplied chunks has already reached maxConcurrentFileChunks
+                    if (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getCheckpoint() >= maxConcurrentFileChunks) {
+                        semaphore.release();
+                        break;
+                    }
+                }
+                try {
+                    final byte[] reusedBuffer = recycledBuffers.pollFirst();
+                    final byte[] buffer = reusedBuffer != null ? reusedBuffer : new byte[chunkSizeInBytes];
+                    final FileChunk chunk = readChunk(buffer);
+                    semaphore.release(); // other threads can now read and send chunks
+                    if (chunk == null) {
+                        if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getCheckpoint()) {
+                            listener.onResponse(null);
+                        }
+                        break;
+                    }
+                    cancellableThreads.execute(() ->
+                        recoveryTarget.writeFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk, translogOps.get(),
                             ActionListener.wrap(
-                                r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId),
-                                e -> {
-                                    error.compareAndSet(null, Tuple.tuple(md, e));
-                                    requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
-                                }
+                                r -> {
+                                    recycledBuffers.addFirst(buffer);
+                                    requestSeqIdTracker.markSeqNoAsCompleted(chunk.seqId);
+                                    sendFileChunks(listener);
+                                },
+                                e -> listener.onFailure(handleErrorOnSendFiles(chunk.md, e))
                             )));
-                    position += content.length();
+                } catch (Exception e) {
+                    listener.onFailure(e);
+                    return;
                 }
+            }
+        }
+
+        FileChunk readChunk(final byte[] buffer) throws Exception {
+            try {
+                synchronized (this) {
+                    if (closed) {
+                        throw new IllegalStateException("chunk reader was closed");
+                    }
+                    if (currentInput == null) {
+                        if (files.isEmpty()) {
+                            return null;
+                        }
+                        md = files.remove(0);
+                        position = 0;
+                        final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
+                        currentInput = new InputStreamIndexInput(indexInput, md.length()) {
+                            @Override
+                            public void close() throws IOException {
+                                indexInput.close(); // InputStreamIndexInput's close is a no-op
+                            }
+                        };
+                    }
+                }
+                final int bytesRead = currentInput.read(buffer);
+                if (bytesRead == -1) {
+                    throw new EOFException("position [" + position + "] md [" + md + "]");
+                }
+                final boolean lastChunk;
+                final long chunkPosition;
+                synchronized (this) {
+                    chunkPosition = this.position;
+                    this.position += bytesRead;
+                    lastChunk = this.position == md.length();
+                    if (lastChunk) {
+                        IOUtils.close(currentInput, () -> currentInput = null);
+                    }
+                }
+                final long seqId = requestSeqIdTracker.generateSeqNo();
+                return new FileChunk(seqId, md, new BytesArray(buffer, 0, bytesRead), chunkPosition, lastChunk);
             } catch (Exception e) {
-                error.compareAndSet(null, Tuple.tuple(md, e));
-                break;
+                throw handleErrorOnSendFiles(md, e);
             }
         }
-        // When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway.
-        // This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error.
-        if (error.get() == null) {
-            cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
+
+        @Override
+        public synchronized void close() throws IOException {
+            if (closed == false) {
+                closed = true;
+                IOUtils.close(currentInput);
+            }
         }
-        if (error.get() != null) {
-            handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
+
+        Exception handleErrorOnSendFiles(StoreFileMetaData md, Exception e) {
+            try {
+                final IOException corruptIndexException;
+                if (md != null && (corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+                    if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                        logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                        try {
+                            failEngine(corruptIndexException);
+                        } catch (Exception inner) {
+                            corruptIndexException.addSuppressed(inner);
+                        }
+                        return corruptIndexException;
+                    } else { // corruption has happened on the way to replica
+                        final RemoteTransportException remoteTransportException =
+                            new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
+                        remoteTransportException.addSuppressed(e);
+                        logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
+                            shardId, request.targetNode(), md), corruptIndexException);
+                        return remoteTransportException;
+                    }
+                }
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            return e;
         }
     }
 
-    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
-        final IOException corruptIndexException;
-        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
-                failEngine(corruptIndexException);
-                throw corruptIndexException;
-            } else { // corruption has happened on the way to replica
-                RemoteTransportException exception = new RemoteTransportException(
-                    "File corruption occurred on recovery but checksums are ok", null);
-                exception.addSuppressed(e);
-                logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                    shardId, request.targetNode(), md), corruptIndexException);
-                throw exception;
-            }
-        } else {
-            throw e;
+    private static final class FileChunk {
+        final long seqId;
+        final StoreFileMetaData md;
+        final BytesReference content;
+        final long position;
+        final boolean lastChunk;
+
+        FileChunk(long seqId, StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
+            this.seqId = seqId;
+            this.md = md;
+            this.content = content;
+            this.position = position;
+            this.lastChunk = lastChunk;
         }
     }
 
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index fcedf0a000ae6..30deb77cb0241 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -187,7 +187,7 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
              * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
              */
             throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>(
-                ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE));
+                ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 }
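Why the rewrite is shaped this way: the old sendFiles blocked a thread on waitForOpsToComplete to cap the number of in-flight chunk requests, while the new MultiFileSender keeps the same cap (maxConcurrentFileChunks, tracked through LocalCheckpointTracker seqNos) but is re-entered from each chunk ack instead of blocking, and a single-permit semaphore guarantees that only one thread reads chunks at a time. Below is a minimal standalone sketch of that flow-control pattern; the names (WindowedSender, ChunkSource, Transport, maxInFlight, onDone) are invented stand-ins for the Elasticsearch types, and acks are assumed to arrive in order so that a plain counter can play the role of LocalCheckpointTracker.

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

class WindowedSender {
    interface ChunkSource { byte[] next(); }  // returns null when exhausted
    interface Transport { void send(long seqNo, byte[] chunk, Runnable onAck); }

    private final Semaphore readPermit = new Semaphore(1);    // at most one thread reads chunks
    private final AtomicLong maxSeqNo = new AtomicLong(-1);   // seqNo of the last chunk handed to the transport
    private final AtomicLong checkpoint = new AtomicLong(-1); // last acked seqNo (in-order assumption)
    private final int maxInFlight;
    private final ChunkSource source;
    private final Transport transport;
    private final Runnable onDone;

    WindowedSender(int maxInFlight, ChunkSource source, Transport transport, Runnable onDone) {
        this.maxInFlight = maxInFlight;
        this.source = source;
        this.transport = transport;
        this.onDone = onDone;
    }

    // Called once to start, then re-entered from every ack. A thread that finds another
    // reader active, or the in-flight window full, bails out; a later ack resumes the loop.
    void sendChunks() {
        while (true) {
            synchronized (this) {
                if (readPermit.tryAcquire() == false) {
                    return;                   // another thread is already reading and sending
                }
                if (maxSeqNo.get() - checkpoint.get() >= maxInFlight) {
                    readPermit.release();     // window full; releasing under the lock guarantees
                    return;                   // that the next acking thread can acquire the permit
                }
            }
            final byte[] chunk = source.next();   // reads are serialized by the permit
            if (chunk == null) {
                readPermit.release();
                if (maxSeqNo.get() == checkpoint.get()) {
                    onDone.run();             // the patch wraps this in ActionListener.notifyOnce
                }
                return;
            }
            final long seqNo = maxSeqNo.incrementAndGet();
            readPermit.release();             // another thread may now read the next chunk
            transport.send(seqNo, chunk, () -> {
                checkpoint.set(seqNo);        // the patch uses LocalCheckpointTracker here,
                sendChunks();                 // which also copes with out-of-order acks
            });
        }
    }
}

Releasing the permit inside the synchronized block in the window-full branch mirrors the patch and closes the race where an ack lands between the window check and the release; the patch additionally wraps the completion listener in ActionListener.notifyOnce, which keeps completion idempotent when two late acks both observe the exhausted reader.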
Reason: [" + reason + "]"; return true; } + + public static boolean assertCurrentMethodIsNotCalledRecursively() { + final StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + assert stackTraceElements.length > 2; + assert stackTraceElements[0].getMethodName().equals("getStackTrace"); + assert stackTraceElements[1].getMethodName().equals("assertCurrentMethodIsNotCalledRecursively"); + final StackTraceElement testingMethod = stackTraceElements[2]; + for (int i = 3; i < stackTraceElements.length; i++) { + assert stackTraceElements[i].getClassName().equals(testingMethod.getClassName()) == false + || stackTraceElements[i].getMethodName().equals(testingMethod.getMethodName()) == false : + testingMethod.getClassName() + "#" + testingMethod.getMethodName() + " is called recursively"; + } + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index fb7b79f459720..3493de8442786 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -94,7 +94,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntSupplier; import java.util.function.Supplier; import java.util.zip.CRC32; @@ -104,7 +103,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.core.IsNull.notNullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyObject; @@ -155,38 +153,22 @@ public void testSendFiles() throws Throwable { metas.add(md); } Store targetStore = newStore(createTempDir()); + MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {}); RecoveryTargetHandler target = new TestRecoveryTargetHandler() { - IndexOutputOutputStream out; @Override public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, int totalTranslogOps, ActionListener listener) { - try { - if (position == 0) { - out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { - @Override - public void close() throws IOException { - super.close(); - targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it - } - }; - } - final BytesRefIterator iterator = content.iterator(); - BytesRef scratch; - while ((scratch = iterator.next()) != null) { - out.write(scratch.bytes, scratch.offset, scratch.length); - } - if (lastChunk) { - out.close(); - } - listener.onResponse(null); - } catch (Exception e) { - listener.onFailure(e); - } + threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> { + multiFileWriter.writeFileChunk(md, position, content, lastChunk); + return null; + })); } }; RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5)); - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); + PlainActionFuture future = new 
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index fb7b79f459720..3493de8442786 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -94,7 +94,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntSupplier;
 import java.util.function.Supplier;
 import java.util.zip.CRC32;
@@ -104,7 +103,6 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.core.IsNull.notNullValue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyObject;
@@ -155,38 +153,22 @@ public void testSendFiles() throws Throwable {
             metas.add(md);
         }
         Store targetStore = newStore(createTempDir());
+        MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {});
         RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
-            IndexOutputOutputStream out;
             @Override
             public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
                                        int totalTranslogOps, ActionListener<Void> listener) {
-                try {
-                    if (position == 0) {
-                        out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
-                            @Override
-                            public void close() throws IOException {
-                                super.close();
-                                targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
-                            }
-                        };
-                    }
-                    final BytesRefIterator iterator = content.iterator();
-                    BytesRef scratch;
-                    while ((scratch = iterator.next()) != null) {
-                        out.write(scratch.bytes, scratch.offset, scratch.length);
-                    }
-                    if (lastChunk) {
-                        out.close();
-                    }
-                    listener.onResponse(null);
-                } catch (Exception e) {
-                    listener.onFailure(e);
-                }
+                threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> {
+                    multiFileWriter.writeFileChunk(md, position, content, lastChunk);
+                    return null;
+                }));
             }
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request,
             Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5));
-        handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
+        PlainActionFuture<Void> future = new PlainActionFuture<>();
+        handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, future);
+        future.actionGet();
         Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null);
         Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata);
         assertEquals(metas.size(), recoveryDiff.identical.size());
@@ -240,7 +222,7 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
                                                 RetentionLeases retentionLeases, ActionListener<Long> listener) {
                 shippedOps.addAll(operations);
                 checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
-                maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
+                threadPool.generic().execute(() -> listener.onResponse(checkpointOnTarget.get()));
             }
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
@@ -274,9 +256,9 @@ public void testSendSnapshotStopOnError() throws Exception {
             public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
                                                 long msu, RetentionLeases retentionLeases, ActionListener<Long> listener) {
                 if (randomBoolean()) {
-                    maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
+                    threadPool.generic().execute(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
                 } else {
-                    maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index")));
+                    threadPool.generic().execute(() -> listener.onFailure(new RuntimeException("test - failed to index")));
                     wasFailed.set(true);
                 }
             }
@@ -363,9 +345,10 @@ public void close() throws IOException {
                     if (lastChunk) {
                         out.close();
                     }
-                    listener.onResponse(null);
+                    threadPool.generic().execute(() -> listener.onResponse(null));
                 } catch (Exception e) {
-                    IOUtils.closeWhileHandlingException(out, () -> listener.onFailure(e));
+                    IOUtils.closeWhileHandlingException(out);
+                    threadPool.generic().execute(() -> listener.onFailure(e));
                 }
             }
         };
@@ -379,9 +362,11 @@ protected void failEngine(IOException cause) {
             }
         };
         try {
-            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
+            PlainActionFuture<Void> future = new PlainActionFuture<>();
+            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, future);
+            future.actionGet();
             fail("corrupted index");
-        } catch (IOException ex) {
+        } catch (Exception ex) {
             assertNotNull(ExceptionsHelper.unwrapCorruption(ex));
         }
         assertTrue(failedEngine.get());
@@ -433,7 +418,9 @@ protected void failEngine(IOException cause) {
             }
         };
         try {
-            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
+            PlainActionFuture<Void> future = new PlainActionFuture<>();
+            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, future);
+            future.actionGet();
             fail("exception index");
         } catch (RuntimeException ex) {
             assertNull(ExceptionsHelper.unwrapCorruption(ex));
@@ -442,8 +429,6 @@ protected void failEngine(IOException cause) {
             } else {
                 assertEquals(ex.getMessage(), "boom");
             }
-        } catch (CorruptIndexException ex) {
-            fail("not expected here");
         }
         assertFalse(failedEngine.get());
         IOUtils.close(store);
@@ -527,19 +512,36 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
         assertBusy(() -> assertTrue(freed.get()));
     }
 
+    int getFileChunkCheckpoint(List<StoreFileMetaData> files, List<FileChunkResponse> ackedChunks) {
+        int fileIndex = 0;
+        long filePosition = 0;
+        int checkpoint = -1;
+        ackedChunks.sort(Comparator.comparing(FileChunkResponse::name).thenComparing(FileChunkResponse::position));
+        for (FileChunkResponse chunk : ackedChunks) {
+            if (files.get(fileIndex).equals(chunk.md) && filePosition == chunk.position) {
+                checkpoint++;
+                filePosition += chunk.length;
+                if (filePosition == chunk.md.length()) {
+                    fileIndex++;
+                    filePosition = 0;
+                }
+            } else {
+                break;
+            }
+        }
+        return checkpoint;
+    }
+
     public void testSendFileChunksConcurrently() throws Exception {
         final IndexShard shard = mock(IndexShard.class);
         when(shard.state()).thenReturn(IndexShardState.STARTED);
         final List<FileChunkResponse> unrepliedChunks = new CopyOnWriteArrayList<>();
         final AtomicInteger sentChunks = new AtomicInteger();
         final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
-            final AtomicLong chunkNumberGenerator = new AtomicLong();
             @Override
             public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
                                        int totalTranslogOps, ActionListener<Void> listener) {
-                final long chunkNumber = chunkNumberGenerator.getAndIncrement();
-                logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position);
-                unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener));
+                unrepliedChunks.add(new FileChunkResponse(md, position, content.length(), listener));
                 sentChunks.incrementAndGet();
             }
         };
@@ -550,14 +552,8 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content,
         Store store = newStore(createTempDir(), false);
         List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
         int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
-        Thread sender = new Thread(() -> {
-            try {
-                handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0);
-            } catch (Exception ex) {
-                throw new AssertionError(ex);
-            }
-        });
-        sender.start();
+        PlainActionFuture<Void> future = new PlainActionFuture<>();
+        handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, future);
         assertBusy(() -> {
             assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
             assertThat(unrepliedChunks, hasSize(sentChunks.get()));
@@ -568,15 +564,7 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content,
             List<FileChunkResponse> chunksToAck = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks);
             unrepliedChunks.removeAll(chunksToAck);
             ackedChunks.addAll(chunksToAck);
-            ackedChunks.sort(Comparator.comparing(c -> c.chunkNumber));
-            int checkpoint = -1;
-            for (int i = 0; i < ackedChunks.size(); i++) {
-                if (i != ackedChunks.get(i).chunkNumber) {
-                    break;
-                } else {
-                    checkpoint = i;
-                }
-            }
+            int checkpoint = getFileChunkCheckpoint(files, ackedChunks);
             int chunksToSend = Math.min(
                 totalChunks - sentChunks.get(),                             // limited by the remaining chunks
                 maxConcurrentChunks - (sentChunks.get() - 1 - checkpoint)); // limited by the buffering chunks
@@ -589,7 +577,7 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content,
                 assertThat(unrepliedChunks, hasSize(expectedUnrepliedChunks));
             });
         }
-        sender.join();
+        future.actionGet();
         store.close();
     }
 
@@ -599,13 +587,10 @@ public void testSendFileChunksStopOnError() throws Exception {
         final List<FileChunkResponse> unrepliedChunks = new CopyOnWriteArrayList<>();
         final AtomicInteger sentChunks = new AtomicInteger();
         final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
-            final AtomicLong chunkNumberGenerator = new AtomicLong();
             @Override
             public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
                                        int totalTranslogOps, ActionListener<Void> listener) {
-                final long chunkNumber = chunkNumberGenerator.getAndIncrement();
-                logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position);
-                unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener));
+                unrepliedChunks.add(new FileChunkResponse(md, position, content.length(), listener));
                 sentChunks.incrementAndGet();
             }
         };
@@ -616,32 +601,21 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content,
         Store store = newStore(createTempDir(), false);
         List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
         int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
-        AtomicReference<Exception> error = new AtomicReference<>();
-        Thread sender = new Thread(() -> {
-            try {
-                handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0);
-            } catch (Exception ex) {
-                error.set(ex);
-            }
-        });
-        sender.start();
+        PlainActionFuture<Void> future = new PlainActionFuture<>();
+        handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, future);
         assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))));
         List<FileChunkResponse> failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks);
-        failedChunks.forEach(c -> c.listener.onFailure(new RuntimeException("test chunk exception")));
+        failedChunks.forEach(c -> threadPool.generic().execute(() -> c.listener.onFailure(new RuntimeException("test chunk exception"))));
         unrepliedChunks.removeAll(failedChunks);
-        unrepliedChunks.forEach(c -> {
+        unrepliedChunks.forEach(c -> threadPool.generic().execute(() -> {
             if (randomBoolean()) {
-                c.listener.onFailure(new RuntimeException("test"));
+                c.listener.onFailure(new RuntimeException("test chunk exception"));
             } else {
                 c.listener.onResponse(null);
             }
-        });
-        assertBusy(() -> {
-            assertThat(error.get(), notNullValue());
-            assertThat(error.get().getMessage(), containsString("test chunk exception"));
-        });
+        }));
+        assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), containsString("test chunk exception"));
         assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
-        sender.join();
         store.close();
     }
 
@@ -658,13 +632,25 @@ private Store newStore(Path path, boolean checkIndex) throws IOException {
     }
 
     static final class FileChunkResponse {
-        final long chunkNumber;
+        final StoreFileMetaData md;
+        final long position;
+        final long length;
         final ActionListener<Void> listener;
 
-        FileChunkResponse(long chunkNumber, ActionListener<Void> listener) {
-            this.chunkNumber = chunkNumber;
+        FileChunkResponse(StoreFileMetaData md, long position, long length, ActionListener<Void> listener) {
+            this.md = md;
+            this.position = position;
+            this.length = length;
             this.listener = listener;
         }
+
+        public String name() {
+            return md.name();
+        }
+
+        public long position() {
+            return position;
+        }
     }
 
     private List<StoreFileMetaData> generateFiles(Store store, int numFiles, IntSupplier fileSizeSupplier) throws IOException {
@@ -761,12 +747,4 @@ public void close() {
             }
         };
     }
-
-    private void maybeExecuteAsync(Runnable runnable) {
-        if (randomBoolean()) {
-            threadPool.generic().execute(runnable);
-        } else {
-            runnable.run();
-        }
-    }
 }
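A note on the test helper above: since chunks no longer carry an observable sequence number, getFileChunkCheckpoint reconstructs the tracker's checkpoint from the acked (file, position) pairs. Chunks are generated in file order and in ascending position within each file, so the checkpoint is the length of the unbroken acked prefix of that sequence, minus one. A worked example with illustrative numbers only: with a chunk size of 10 and files sent in the order [a (length 25), b (length 10)], the chunk sequence is seq 0 = (a, 0), seq 1 = (a, 10), seq 2 = (a, 20), seq 3 = (b, 0); if the acked set is {(a, 0), (a, 20), (b, 0)}, the longest contiguous prefix is seq 0 alone and the helper returns 0, while it returns 3 once (a, 10) is also acked.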
diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java
index 92a61cda9e98d..ff9d123bf522a 100644
--- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java
+++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java
@@ -23,6 +23,7 @@ import org.elasticsearch.test.ESTestCase;
 
 import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
+import static org.elasticsearch.threadpool.ThreadPool.assertCurrentMethodIsNotCalledRecursively;
 import static org.hamcrest.CoreMatchers.equalTo;
 
 public class ThreadPoolTests extends ESTestCase {
@@ -67,4 +68,17 @@ public void testEstimatedTimeIntervalSettingAcceptsOnlyZeroAndPositiveTime() {
         Exception e = expectThrows(IllegalArgumentException.class, () -> ESTIMATED_TIME_INTERVAL_SETTING.get(settings));
         assertEquals("failed to parse value [-1] for setting [thread_pool.estimated_time_interval], must be >= [0ms]", e.getMessage());
     }
+
+    int recursiveFactorial(int n) {
+        assertCurrentMethodIsNotCalledRecursively();
+        if (n <= 1) {
+            return 1;
+        }
+        return n * recursiveFactorial(n - 1);
+    }
+
+    public void testAssertCurrentMethodIsNotCalledRecursively() {
+        recursiveFactorial(between(0, 1));
+        expectThrows(AssertionError.class, () -> recursiveFactorial(between(2, 10)));
+    }
 }