From 0492097d094fcdea160fca6d88caf32d6338eeb1 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sun, 20 Jan 2019 11:22:27 -0500
Subject: [PATCH 1/5] Send file chunks asynchronously in peer recovery

---
 .../indices/recovery/FileChunk.java           |  37 +++
 .../indices/recovery/MultiFileWriter.java     |  13 --
 .../recovery/RecoverySourceHandler.java       | 210 +++++++++++++-----
 .../recovery/RemoteRecoveryTargetHandler.java |   2 +-
 .../recovery/RecoverySourceHandlerTests.java  | 102 +++++----
 5 files changed, 245 insertions(+), 119 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/indices/recovery/FileChunk.java

diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/FileChunk.java b/server/src/main/java/org/elasticsearch/indices/recovery/FileChunk.java
new file mode 100644
index 0000000000000..09f1032519f26
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/FileChunk.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.index.store.StoreFileMetaData;
+
+final class FileChunk {
+    final StoreFileMetaData md;
+    final BytesReference content;
+    final long position;
+    final boolean lastChunk;
+
+    FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
+        this.md = md;
+        this.content = content;
+        this.position = position;
+        this.lastChunk = lastChunk;
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java
index 87a6d18671a6f..64c01f5a95320 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java
@@ -166,19 +166,6 @@ public void renameAllTempFiles() throws IOException {
         store.renameTempFilesSafe(tempFileNames);
     }
 
-    static final class FileChunk {
-        final StoreFileMetaData md;
-        final BytesReference content;
-        final long position;
-        final boolean lastChunk;
-        FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
-            this.md = md;
-            this.content = content;
-            this.position = position;
-            this.lastChunk = lastChunk;
-        }
-    }
-
     private final class FileChunkWriter {
         // chunks can be delivered out of order, we need to buffer chunks if there's a gap between them.
         final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
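With chunks now sent concurrently, the comment above is the key to correctness on the target side: responses can be acknowledged out of order, so the writer buffers chunks in a position-ordered queue and only writes what is contiguous. A standalone sketch of that buffering idea (illustrative names only; the real logic is FileChunkWriter in MultiFileWriter.java):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Comparator;
    import java.util.PriorityQueue;

    final class OrderedChunkSink {
        static final class Chunk {
            final long position;
            final byte[] data;
            Chunk(long position, byte[] data) { this.position = position; this.data = data; }
        }

        private final PriorityQueue<Chunk> pending = new PriorityQueue<>(Comparator.comparingLong(c -> c.position));
        private long writtenUpTo; // next file offset we are allowed to write

        // Buffer the chunk, then flush every chunk that is now contiguous with the bytes already written.
        synchronized void accept(Chunk chunk, OutputStream out) throws IOException {
            pending.add(chunk);
            while (pending.isEmpty() == false && pending.peek().position == writtenUpTo) {
                Chunk next = pending.poll();
                out.write(next.data);
                writtenUpTo += next.data.length;
            }
        }

        public static void main(String[] args) throws IOException {
            OrderedChunkSink sink = new OrderedChunkSink();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            sink.accept(new Chunk(2, new byte[]{'c'}), out); // buffered: gap at offset 0
            sink.accept(new Chunk(0, new byte[]{'a'}), out); // flushes offset 0, still waiting for 1
            sink.accept(new Chunk(1, new byte[]{'b'}), out); // flushes 1 and the buffered 2
            System.out.println(out.toString());              // prints: abc
        }
    }
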
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 7cf4d28d428f5..efe54cd080323 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -29,22 +29,24 @@
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.ArrayUtil;
+import org.elasticsearch.Assertions;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.engine.Engine;
@@ -62,18 +64,21 @@
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteTransportException;
 
+import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.StreamSupport;
@@ -102,6 +107,7 @@ public class RecoverySourceHandler {
     private final int chunkSizeInBytes;
     private final RecoveryTargetHandler recoveryTarget;
     private final int maxConcurrentFileChunks;
+    private final List<Closeable> resources = new CopyOnWriteArrayList<>();
     private final CancellableThreads cancellableThreads = new CancellableThreads();
 
     public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
@@ -124,7 +130,6 @@ public StartRecoveryRequest getRequest() {
      * performs the recovery from the local engine to the target
      */
     public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
-        final List<Closeable> resources = new CopyOnWriteArrayList<>();
         final Closeable releaseResources = () -> IOUtils.close(resources);
         final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
         try {
@@ -411,7 +416,11 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
                 phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
             cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
                 phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
-            sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
+            final PlainActionFuture<Void> sendFileChunksFuture = new PlainActionFuture<>();
+            final StoreFileMetaData[] filesSortedByLength = phase1Files.toArray(new StoreFileMetaData[0]);
+            ArrayUtil.timSort(filesSortedByLength, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
+            sendFiles(store, filesSortedByLength, translogOps, sendFileChunksFuture);
+            cancellableThreads.execute(sendFileChunksFuture::actionGet);
             // Send the CLEAN_FILES request, which takes all of the files that
             // were transferred and renames them from their temporary file
             // names to the actual file names. It also writes checksums for
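phase1 stays synchronous even though sendFiles is now callback-driven: it hands sendFiles a PlainActionFuture and then blocks on it inside cancellableThreads, so a cancelled recovery still interrupts the wait. A minimal, self-contained analogy of that bridge (a plain CompletableFuture standing in for PlainActionFuture; hypothetical names):

    import java.util.concurrent.CompletableFuture;

    public class AsyncToBlockingSketch {
        // An async sender completes a future when all chunks are acknowledged.
        static void sendFilesAsync(CompletableFuture<Void> done) {
            new Thread(() -> done.complete(null)).start(); // pretend chunks were sent
        }

        public static void main(String[] args) throws Exception {
            CompletableFuture<Void> done = new CompletableFuture<>();
            sendFilesAsync(done); // returns immediately
            done.get();           // the caller keeps its old blocking shape, like sendFileChunksFuture::actionGet
            System.out.println("all file chunks sent");
        }
    }
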
@@ -680,72 +689,157 @@ public String toString() {
             '}';
     }
 
-    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
-        ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
-        final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
-        final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
-        final byte[] buffer = new byte[chunkSizeInBytes];
-        for (final StoreFileMetaData md : files) {
-            if (error.get() != null) {
-                break;
+    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps, ActionListener<Void> listener) {
+        final MultiFileSender multiFileSender = new MultiFileSender(store, files, translogOps);
+        resources.add(multiFileSender); // need to register to the resource list so we can clean up if the recovery gets cancelled.
+        final ActionListener<Void> wrappedListener = ActionListener.wrap(
+            r -> {
+                if (Assertions.ENABLED) {
+                    long expectedChunks = Arrays.stream(files).mapToLong(f -> (f.length() + chunkSizeInBytes - 1) / chunkSizeInBytes).sum();
+                    long sentChunks = multiFileSender.requestSeqIdTracker.getCheckpoint() + 1;
+                    assert sentChunks == expectedChunks : "sent chunks=" + sentChunks + " != expected chunks=" + expectedChunks;
+                }
+                multiFileSender.close();
+                listener.onResponse(null);
+            },
+            e -> {
+                IOUtils.closeWhileHandlingException(multiFileSender);
+                listener.onFailure(e);
+            });
+        multiFileSender.sendFileChunks(ActionListener.notifyOnce(wrappedListener));
+    }
+
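The assertion above relies on every file contributing exactly ceil(length / chunkSize) chunks, computed with the integer idiom (length + chunkSize - 1) / chunkSize in long arithmetic. A small worked example of that arithmetic (example sizes only):

    public class ChunkMath {
        public static void main(String[] args) {
            long chunkSize = 512 * 1024; // 512 KB chunks, an example value
            long[] fileLengths = {1, chunkSize, chunkSize + 1, 3 * chunkSize};
            for (long len : fileLengths) {
                long chunks = (len + chunkSize - 1) / chunkSize; // ceil division without floating point
                System.out.println(len + " bytes -> " + chunks + " chunk(s)");
            }
            // prints 1, 1, 2 and 3 chunks respectively
        }
    }
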
+    private final class MultiFileSender implements Closeable {
+        private final Store store;
+        private final List<StoreFileMetaData> files;
+        private final Supplier<Integer> translogOps;
+        private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
+        private final Deque<byte[]> recycledBuffers;
+        private boolean readerClosed; // ensure we don't reopen reader after the recovery was cancelled
+        private StoreFileMetaData md;
+        private InputStream currentInput;
+        private int position;
+        private final Object turnMutex = new Object();
+        private final AtomicBoolean turnAcquired = new AtomicBoolean();
+
+        MultiFileSender(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) {
+            this.store = store;
+            this.files = new ArrayList<>(Arrays.asList(files));
+            this.translogOps = translogOps;
+            this.recycledBuffers = ConcurrentCollections.newDeque();
+            for (int i = 0; i < maxConcurrentFileChunks; i++) {
+                this.recycledBuffers.add(new byte[chunkSizeInBytes]);
             }
-            try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
-                InputStream in = new InputStreamIndexInput(indexInput, md.length())) {
-                long position = 0;
-                int bytesRead;
-                while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
-                    final BytesArray content = new BytesArray(buffer, 0, bytesRead);
-                    final boolean lastChunk = position + content.length() == md.length();
-                    final long requestSeqId = requestSeqIdTracker.generateSeqNo();
-                    cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks));
-                    cancellableThreads.checkForCancel();
-                    if (error.get() != null) {
-                        break;
+        }
+
+        void sendFileChunks(ActionListener<Void> listener) {
+            while (true) {
+                synchronized (turnMutex) {
+                    if (turnAcquired.compareAndSet(false, true) == false) {
+                        return;
                     }
-                    final long requestFilePosition = position;
-                    cancellableThreads.executeIO(() ->
-                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
+                    if (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getCheckpoint() >= maxConcurrentFileChunks) {
+                        turnAcquired.set(false);
+                        return;
+                    }
+                }
+                try {
+                    final byte[] buffer = recycledBuffers.removeFirst();
+                    final FileChunk chunk = readChunk(buffer);
+                    if (chunk == null) {
+                        if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getCheckpoint()) {
+                            listener.onResponse(null);
+                        }
+                        recycledBuffers.addFirst(buffer);
+                        turnAcquired.set(false);
+                        return;
+                    }
+                    final long requestSeqId = requestSeqIdTracker.generateSeqNo();
+                    turnAcquired.set(false);
+                    cancellableThreads.execute(() ->
+                        recoveryTarget.writeFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk, translogOps.get(),
                             ActionListener.wrap(
-                                r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId),
-                                e -> {
-                                    error.compareAndSet(null, Tuple.tuple(md, e));
+                                r -> {
+                                    recycledBuffers.addFirst(buffer);
                                     requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
-                                }
+                                    sendFileChunks(listener);
+                                },
+                                e -> listener.onFailure(handleErrorOnSendFiles(chunk.md, e))
                             )));
-                    position += content.length();
+                } catch (Exception e) {
+                    listener.onFailure(e);
+                    return;
                 }
-            } catch (Exception e) {
-                error.compareAndSet(null, Tuple.tuple(md, e));
-                break;
             }
         }
-        // When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway.
-        // This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error.
-        if (error.get() == null) {
-            cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
+
+        synchronized FileChunk readChunk(final byte[] buffer) throws Exception {
+            cancellableThreads.checkForCancel();
+            if (readerClosed) {
+                throw new IllegalStateException("chunk reader was closed");
+            }
+            try {
+                if (currentInput == null) {
+                    if (files.isEmpty()) {
+                        return null;
+                    }
+                    md = files.remove(0);
+                    position = 0;
+                    final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
+                    currentInput = new BufferedInputStream(new InputStreamIndexInput(indexInput, md.length()), chunkSizeInBytes) {
+                        @Override
+                        public void close() throws IOException {
+                            indexInput.close(); //InputStreamIndexInput's close is noop
+                        }
+                    };
+                }
+                final int bytesRead = currentInput.read(buffer, 0, buffer.length);
+                final boolean lastChunk = position + bytesRead == md.length();
+                if (bytesRead == -1) {
+                    IOUtils.close(currentInput, () -> currentInput = null);
+                    return readChunk(buffer);
+                }
+                final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), position, lastChunk);
+                position += bytesRead;
+                return chunk;
+            } catch (Exception e) {
+                throw handleErrorOnSendFiles(md, e);
+            }
         }
-        if (error.get() != null) {
-            handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
+
+        @Override
+        public synchronized void close() throws IOException {
+            if (readerClosed == false) {
+                readerClosed = true;
+                IOUtils.close(currentInput);
+            }
         }
-    }
 
-    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
-        final IOException corruptIndexException;
-        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
-                failEngine(corruptIndexException);
-                throw corruptIndexException;
-            } else { // corruption has happened on the way to replica
-                RemoteTransportException exception = new RemoteTransportException(
-                    "File corruption occurred on recovery but checksums are ok", null);
-                exception.addSuppressed(e);
-                logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                    shardId, request.targetNode(), md), corruptIndexException);
-                throw exception;
+        Exception handleErrorOnSendFiles(StoreFileMetaData md, Exception e) {
+            try {
+                final IOException corruptIndexException;
+                if (md != null && (corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+                    if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                        logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                        try {
+                            failEngine(corruptIndexException);
+                        } catch (Exception inner) {
+                            corruptIndexException.addSuppressed(inner);
+                        }
+                        return corruptIndexException;
+                    } else { // corruption has happened on the way to replica
+                        final RemoteTransportException remoteTransportException =
+                            new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
+                        remoteTransportException.addSuppressed(e);
+                        logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
+                            shardId, request.targetNode(), md), corruptIndexException);
+                        return remoteTransportException;
+                    }
+                }
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
             }
-        } else {
-            throw e;
+            return e;
         }
     }
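handleErrorOnSendFiles above now returns the classified exception instead of throwing, so the async callbacks can route it into a listener; the triage itself is unchanged from the old code. A compact, self-contained restatement of that decision (hypothetical helper names, simplified exception types):

    import java.io.IOException;

    public class CorruptionTriageSketch {
        // As in handleErrorOnSendFiles: corrupt on the primary -> fatal;
        // corrupt only in transit -> local checksums are fine, surface a softer error.
        static Exception triage(Exception e, IOException corruption, boolean primaryCopyIntact) {
            if (corruption == null) {
                return e;          // not a corruption problem, hand back the original error
            }
            if (primaryCopyIntact == false) {
                return corruption; // our own file is bad: fail the engine with this cause
            }
            IllegalStateException onTheWire =
                new IllegalStateException("file corruption on recovery but local checksums are ok");
            onTheWire.addSuppressed(e);
            return onTheWire;
        }

        public static void main(String[] args) {
            IOException corruption = new IOException("checksum mismatch");
            System.out.println(triage(new RuntimeException("boom"), null, true).getMessage());
            System.out.println(triage(new RuntimeException("boom"), corruption, false).getMessage());
            System.out.println(triage(new RuntimeException("boom"), corruption, true).getMessage());
        }
    }
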
+ logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); + try { + failEngine(corruptIndexException); + } catch (Exception inner) { + corruptIndexException.addSuppressed(inner); + } + return corruptIndexException; + } else { // corruption has happened on the way to replica + final RemoteTransportException remoteTransportException = + new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null); + remoteTransportException.addSuppressed(e); + logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK", + shardId, request.targetNode(), md), corruptIndexException); + return remoteTransportException; + } + } + } catch (Exception inner) { + e.addSuppressed(inner); } - } else { - throw e; + return e; } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index fcedf0a000ae6..30deb77cb0241 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -187,7 +187,7 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>( - ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE)); + ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index fb7b79f459720..713d57edd9aff 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -178,7 +178,7 @@ public void close() throws IOException { if (lastChunk) { out.close(); } - listener.onResponse(null); + maybeExecuteAsync(() -> listener.onResponse(null)); } catch (Exception e) { listener.onFailure(e); } @@ -186,7 +186,9 @@ public void close() throws IOException { }; RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5)); - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); + PlainActionFuture future = new PlainActionFuture<>(); + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, future); + future.actionGet(); Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null); Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata); assertEquals(metas.size(), recoveryDiff.identical.size()); @@ -379,9 +381,11 @@ protected void failEngine(IOException cause) { }; try { - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); + PlainActionFuture future = new PlainActionFuture<>(); + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, future); + future.actionGet(); fail("corrupted index"); - } catch (IOException ex) { + } catch (Exception ex) { assertNotNull(ExceptionsHelper.unwrapCorruption(ex)); } assertTrue(failedEngine.get()); @@ 
-433,7 +437,9 @@ protected void failEngine(IOException cause) { } }; try { - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); + PlainActionFuture future = new PlainActionFuture<>(); + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, future); + future.actionGet(); fail("exception index"); } catch (RuntimeException ex) { assertNull(ExceptionsHelper.unwrapCorruption(ex)); @@ -442,8 +448,6 @@ protected void failEngine(IOException cause) { } else { assertEquals(ex.getMessage(), "boom"); } - } catch (CorruptIndexException ex) { - fail("not expected here"); } assertFalse(failedEngine.get()); IOUtils.close(store); @@ -527,19 +531,36 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { assertBusy(() -> assertTrue(freed.get())); } + int getFileChunkCheckpoint(List files, List ackedChunks) { + int fileIndex = 0; + int filePosition = 0; + int checkpoint = -1; + ackedChunks.sort(Comparator.comparing(FileChunkResponse::name).thenComparing(FileChunkResponse::position)); + for (FileChunkResponse chunk : ackedChunks) { + if (files.get(fileIndex).equals(chunk.md) && filePosition == chunk.position) { + checkpoint++; + filePosition += chunk.length; + if (filePosition == chunk.md.length()) { + fileIndex++; + filePosition = 0; + } + } else { + break; + } + } + return checkpoint; + } + public void testSendFileChunksConcurrently() throws Exception { final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); final List unrepliedChunks = new CopyOnWriteArrayList<>(); final AtomicInteger sentChunks = new AtomicInteger(); final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { - final AtomicLong chunkNumberGenerator = new AtomicLong(); @Override public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, int totalTranslogOps, ActionListener listener) { - final long chunkNumber = chunkNumberGenerator.getAndIncrement(); - logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position); - unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener)); + unrepliedChunks.add(new FileChunkResponse(md, position, content.length(), listener)); sentChunks.incrementAndGet(); } }; @@ -550,14 +571,8 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c Store store = newStore(createTempDir(), false); List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); - Thread sender = new Thread(() -> { - try { - handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0); - } catch (Exception ex) { - throw new AssertionError(ex); - } - }); - sender.start(); + PlainActionFuture future = new PlainActionFuture<>(); + handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, future); assertBusy(() -> { assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))); assertThat(unrepliedChunks, hasSize(sentChunks.get())); @@ -568,28 +583,20 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c List chunksToAck = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks); unrepliedChunks.removeAll(chunksToAck); ackedChunks.addAll(chunksToAck); - ackedChunks.sort(Comparator.comparing(c -> c.chunkNumber)); - int checkpoint = -1; - for (int i = 0; i < ackedChunks.size(); i++) 
{ - if (i != ackedChunks.get(i).chunkNumber) { - break; - } else { - checkpoint = i; - } - } + int checkpoint = getFileChunkCheckpoint(files, ackedChunks); int chunksToSend = Math.min( totalChunks - sentChunks.get(), // limited by the remaining chunks maxConcurrentChunks - (sentChunks.get() - 1 - checkpoint)); // limited by the buffering chunks int expectedSentChunks = sentChunks.get() + chunksToSend; int expectedUnrepliedChunks = unrepliedChunks.size() + chunksToSend; - chunksToAck.forEach(c -> c.listener.onResponse(null)); + chunksToAck.forEach(c -> maybeExecuteAsync(() -> c.listener.onResponse(null))); assertBusy(() -> { assertThat(sentChunks.get(), equalTo(expectedSentChunks)); assertThat(unrepliedChunks, hasSize(expectedUnrepliedChunks)); }); } - sender.join(); + future.actionGet(); store.close(); } @@ -599,13 +606,10 @@ public void testSendFileChunksStopOnError() throws Exception { final List unrepliedChunks = new CopyOnWriteArrayList<>(); final AtomicInteger sentChunks = new AtomicInteger(); final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { - final AtomicLong chunkNumberGenerator = new AtomicLong(); @Override public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, int totalTranslogOps, ActionListener listener) { - final long chunkNumber = chunkNumberGenerator.getAndIncrement(); - logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position); - unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener)); + unrepliedChunks.add(new FileChunkResponse(md, position, content.length(), listener)); sentChunks.incrementAndGet(); } }; @@ -617,21 +621,14 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); AtomicReference error = new AtomicReference<>(); - Thread sender = new Thread(() -> { - try { - handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0); - } catch (Exception ex) { - error.set(ex); - } - }); - sender.start(); + handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, ActionListener.wrap(r -> {}, error::set)); assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)))); List failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks); failedChunks.forEach(c -> c.listener.onFailure(new RuntimeException("test chunk exception"))); unrepliedChunks.removeAll(failedChunks); unrepliedChunks.forEach(c -> { if (randomBoolean()) { - c.listener.onFailure(new RuntimeException("test")); + c.listener.onFailure(new RuntimeException("test chunk exception")); } else { c.listener.onResponse(null); } @@ -641,7 +638,6 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c assertThat(error.get().getMessage(), containsString("test chunk exception")); }); assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))); - sender.join(); store.close(); } @@ -658,13 +654,25 @@ private Store newStore(Path path, boolean checkIndex) throws IOException { } static final class FileChunkResponse { - final long chunkNumber; + final StoreFileMetaData md; + final long position; + final long length; final ActionListener listener; - FileChunkResponse(long chunkNumber, ActionListener listener) { - 
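Global chunk numbers are gone, so the test's getFileChunkCheckpoint above recomputes the checkpoint from (file, position) pairs: sort the acked chunks into send order and count the longest unbroken prefix. A simplified, runnable restatement with a worked example (toy file names and lengths; the real helper compares StoreFileMetaData):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CheckpointSketch {
        static final class Chunk {
            final String file; final long position; final long length;
            Chunk(String file, long position, long length) { this.file = file; this.position = position; this.length = length; }
        }

        // The checkpoint is the index of the last chunk in an unbroken (file, position)
        // prefix of the send order, or -1 if nothing contiguous has been acked yet.
        static int checkpoint(List<String> files, Map<String, Long> lengths, List<Chunk> acked) {
            acked.sort(Comparator.comparing((Chunk c) -> c.file).thenComparingLong(c -> c.position));
            int fileIndex = 0, checkpoint = -1;
            long filePosition = 0;
            for (Chunk c : acked) {
                if (fileIndex < files.size() && files.get(fileIndex).equals(c.file) && filePosition == c.position) {
                    checkpoint++;
                    filePosition += c.length;
                    if (filePosition == lengths.get(c.file)) {
                        fileIndex++;
                        filePosition = 0;
                    }
                } else {
                    break;
                }
            }
            return checkpoint;
        }

        public static void main(String[] args) {
            Map<String, Long> lengths = new HashMap<>();
            lengths.put("a", 250L);
            lengths.put("b", 100L);
            List<Chunk> acked = new ArrayList<>(Arrays.asList(
                new Chunk("a", 0, 100), new Chunk("a", 100, 100), new Chunk("b", 0, 100)));
            // a@200 was never acked, so b@0 cannot extend the contiguous prefix
            System.out.println(checkpoint(Arrays.asList("a", "b"), lengths, acked)); // prints 1
        }
    }
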
@@ -550,14 +571,8 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
         Store store = newStore(createTempDir(), false);
         List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
         int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
-        Thread sender = new Thread(() -> {
-            try {
-                handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0);
-            } catch (Exception ex) {
-                throw new AssertionError(ex);
-            }
-        });
-        sender.start();
+        PlainActionFuture<Void> future = new PlainActionFuture<>();
+        handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, future);
         assertBusy(() -> {
             assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
             assertThat(unrepliedChunks, hasSize(sentChunks.get()));
@@ -568,28 +583,20 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
             List<FileChunkResponse> chunksToAck = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks);
             unrepliedChunks.removeAll(chunksToAck);
             ackedChunks.addAll(chunksToAck);
-            ackedChunks.sort(Comparator.comparing(c -> c.chunkNumber));
-            int checkpoint = -1;
-            for (int i = 0; i < ackedChunks.size(); i++) {
-                if (i != ackedChunks.get(i).chunkNumber) {
-                    break;
-                } else {
-                    checkpoint = i;
-                }
-            }
+            int checkpoint = getFileChunkCheckpoint(files, ackedChunks);
             int chunksToSend = Math.min(
                 totalChunks - sentChunks.get(), // limited by the remaining chunks
                 maxConcurrentChunks - (sentChunks.get() - 1 - checkpoint)); // limited by the buffering chunks
 
             int expectedSentChunks = sentChunks.get() + chunksToSend;
             int expectedUnrepliedChunks = unrepliedChunks.size() + chunksToSend;
-            chunksToAck.forEach(c -> c.listener.onResponse(null));
+            chunksToAck.forEach(c -> maybeExecuteAsync(() -> c.listener.onResponse(null)));
             assertBusy(() -> {
                 assertThat(sentChunks.get(), equalTo(expectedSentChunks));
                 assertThat(unrepliedChunks, hasSize(expectedUnrepliedChunks));
             });
         }
-        sender.join();
+        future.actionGet();
         store.close();
     }
 
@@ -599,13 +606,10 @@
     public void testSendFileChunksStopOnError() throws Exception {
         final IndexShard shard = mock(IndexShard.class);
         when(shard.state()).thenReturn(IndexShardState.STARTED);
         final List<FileChunkResponse> unrepliedChunks = new CopyOnWriteArrayList<>();
         final AtomicInteger sentChunks = new AtomicInteger();
         final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
-            final AtomicLong chunkNumberGenerator = new AtomicLong();
             @Override
             public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
                                        int totalTranslogOps, ActionListener<Void> listener) {
-                final long chunkNumber = chunkNumberGenerator.getAndIncrement();
-                logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position);
-                unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener));
+                unrepliedChunks.add(new FileChunkResponse(md, position, content.length(), listener));
                 sentChunks.incrementAndGet();
             }
         };
@@ -617,21 +621,14 @@
         List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
         int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
         AtomicReference<Exception> error = new AtomicReference<>();
-        Thread sender = new Thread(() -> {
-            try {
-                handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0);
-            } catch (Exception ex) {
-                error.set(ex);
-            }
-        });
-        sender.start();
+        handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, ActionListener.wrap(r -> {}, error::set));
         assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))));
         List<FileChunkResponse> failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks);
         failedChunks.forEach(c -> c.listener.onFailure(new RuntimeException("test chunk exception")));
         unrepliedChunks.removeAll(failedChunks);
         unrepliedChunks.forEach(c -> {
             if (randomBoolean()) {
-                c.listener.onFailure(new RuntimeException("test"));
+                c.listener.onFailure(new RuntimeException("test chunk exception"));
             } else {
                 c.listener.onResponse(null);
             }
@@ -641,7 +638,6 @@
             assertThat(error.get().getMessage(), containsString("test chunk exception"));
         });
         assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
-        sender.join();
         store.close();
     }
 
@@ -658,13 +654,25 @@ private Store newStore(Path path, boolean checkIndex) throws IOException {
     }
 
     static final class FileChunkResponse {
-        final long chunkNumber;
+        final StoreFileMetaData md;
+        final long position;
+        final long length;
         final ActionListener<Void> listener;
 
-        FileChunkResponse(long chunkNumber, ActionListener<Void> listener) {
-            this.chunkNumber = chunkNumber;
+        FileChunkResponse(StoreFileMetaData md, long position, long length, ActionListener<Void> listener) {
+            this.md = md;
+            this.position = position;
+            this.length = length;
             this.listener = listener;
         }
+
+        public String name() {
+            return md.name();
+        }
+
+        public long position() {
+            return position;
+        }
     }
 
     private List<StoreFileMetaData> generateFiles(Store store, int numFiles, IntSupplier fileSizeSupplier) throws IOException {

From e9e3584b249df521cfd969540c05b44cd3e0a2ad Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Wed, 6 Mar 2019 23:36:09 -0500
Subject: [PATCH 2/5] =?UTF-8?q?don=E2=80=99t=20check=20for=20cancellation?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index efe54cd080323..08292f73c1d37 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -774,7 +774,6 @@ void sendFileChunks(ActionListener<Void> listener) {
         }
 
         synchronized FileChunk readChunk(final byte[] buffer) throws Exception {
-            cancellableThreads.checkForCancel();
             if (readerClosed) {
                 throw new IllegalStateException("chunk reader was closed");
             }
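This patch drops the per-read cancellation check from readChunk; PATCH 3 below re-adds a single checkForCancel() at the top of the send loop instead, so there is one well-defined cancellation point per iteration. A toy version of that shape (a volatile flag standing in for CancellableThreads; hypothetical names):

    public class CancelCheckSketch {
        static volatile boolean cancelled;

        public static void main(String[] args) {
            for (int chunk = 0; chunk < 10; chunk++) {
                if (cancelled) { // like cancellableThreads.checkForCancel() at the loop top
                    throw new IllegalStateException("recovery was cancelled");
                }
                System.out.println("read and send chunk " + chunk);
            }
        }
    }
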
From 1dc48232b8a42f4cb02df651a8bced7d17d08a4d Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sat, 9 Mar 2019 13:32:02 -0500
Subject: [PATCH 3/5] feedback

---
 .../indices/recovery/FileChunk.java           |  37 ------
 .../indices/recovery/MultiFileWriter.java     |  13 ++
 .../recovery/RecoverySourceHandler.java       | 119 +++++++++++-------
 3 files changed, 84 insertions(+), 85 deletions(-)
 delete mode 100644 server/src/main/java/org/elasticsearch/indices/recovery/FileChunk.java

diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/FileChunk.java b/server/src/main/java/org/elasticsearch/indices/recovery/FileChunk.java
deleted file mode 100644
index 09f1032519f26..0000000000000
--- a/server/src/main/java/org/elasticsearch/indices/recovery/FileChunk.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.indices.recovery;
-
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.index.store.StoreFileMetaData;
-
-final class FileChunk {
-    final StoreFileMetaData md;
-    final BytesReference content;
-    final long position;
-    final boolean lastChunk;
-
-    FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
-        this.md = md;
-        this.content = content;
-        this.position = position;
-        this.lastChunk = lastChunk;
-    }
-}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java
index 64c01f5a95320..87a6d18671a6f 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java
@@ -166,6 +166,19 @@ public void renameAllTempFiles() throws IOException {
         store.renameTempFilesSafe(tempFileNames);
     }
 
+    static final class FileChunk {
+        final StoreFileMetaData md;
+        final BytesReference content;
+        final long position;
+        final boolean lastChunk;
+        FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
+            this.md = md;
+            this.content = content;
+            this.position = position;
+            this.lastChunk = lastChunk;
+        }
+    }
+
     private final class FileChunkWriter {
         // chunks can be delivered out of order, we need to buffer chunks if there's a gap between them.
         final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 08292f73c1d37..ad33825c694ff 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -29,7 +29,6 @@
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.ArrayUtil;
-import org.elasticsearch.Assertions;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -40,6 +39,7 @@
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -64,8 +64,8 @@
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteTransportException;
 
-import java.io.BufferedInputStream;
 import java.io.Closeable;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -77,7 +77,7 @@
 import java.util.Locale;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -694,11 +694,6 @@ void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translo
         resources.add(multiFileSender); // need to register to the resource list so we can clean up if the recovery gets cancelled.
         final ActionListener<Void> wrappedListener = ActionListener.wrap(
             r -> {
-                if (Assertions.ENABLED) {
-                    long expectedChunks = Arrays.stream(files).mapToLong(f -> (f.length() + chunkSizeInBytes - 1) / chunkSizeInBytes).sum();
-                    long sentChunks = multiFileSender.requestSeqIdTracker.getCheckpoint() + 1;
-                    assert sentChunks == expectedChunks : "sent chunks=" + sentChunks + " != expected chunks=" + expectedChunks;
-                }
                 multiFileSender.close();
                 listener.onResponse(null);
             },
@@ -715,53 +710,58 @@ private final class MultiFileSender implements Closeable {
         private final Supplier<Integer> translogOps;
         private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
         private final Deque<byte[]> recycledBuffers;
-        private boolean readerClosed; // ensure we don't reopen reader after the recovery was cancelled
+        private boolean closed; // ensure we don't reopen files if the recovery is cancelled
         private StoreFileMetaData md;
         private InputStream currentInput;
         private int position;
-        private final Object turnMutex = new Object();
-        private final AtomicBoolean turnAcquired = new AtomicBoolean();
+        private final Semaphore semaphore = new Semaphore(1);
 
         MultiFileSender(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) {
             this.store = store;
             this.files = new ArrayList<>(Arrays.asList(files));
             this.translogOps = translogOps;
             this.recycledBuffers = ConcurrentCollections.newDeque();
-            for (int i = 0; i < maxConcurrentFileChunks; i++) {
-                this.recycledBuffers.add(new byte[chunkSizeInBytes]);
-            }
         }
 
+        /**
+         * File chunks are read sequentially by at most one thread. Other threads which are triggered by file-chunk responses
+         * will abort without waiting if another thread is reading files already. This is controlled via {@code semaphore}.
+         *
+         * This implementation can send up to {@code maxConcurrentFileChunks} consecutive file-chunk requests without waiting
+         * for the replies from the recovery target to reduce the recovery time in secure/compressed/high latency communication.
+         * We assign a seqId to every file-chunk request and stop reading/sending file chunks if the gap between max_seq_no
+         * and local_checkpoint is greater than {@code maxConcurrentFileChunks}.
+         */
         void sendFileChunks(ActionListener<Void> listener) {
             while (true) {
-                synchronized (turnMutex) {
-                    if (turnAcquired.compareAndSet(false, true) == false) {
-                        return;
+                cancellableThreads.checkForCancel();
+                synchronized (this) {
+                    if (semaphore.tryAcquire() == false) {
+                        break;
                     }
+                    // don't send more, the number of unreplied chunks is already greater than maxConcurrentFileChunks,
                     if (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getCheckpoint() >= maxConcurrentFileChunks) {
-                        turnAcquired.set(false);
-                        return;
+                        semaphore.release();
+                        break;
                     }
                 }
                 try {
-                    final byte[] buffer = recycledBuffers.removeFirst();
+                    final byte[] reusedBuffer = recycledBuffers.pollFirst();
+                    final byte[] buffer = reusedBuffer != null ? reusedBuffer : new byte[chunkSizeInBytes];
                     final FileChunk chunk = readChunk(buffer);
+                    semaphore.release(); // other thread can read and send chunks
                     if (chunk == null) {
                         if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getCheckpoint()) {
                             listener.onResponse(null);
                         }
-                        recycledBuffers.addFirst(buffer);
-                        turnAcquired.set(false);
-                        return;
+                        break;
                     }
-                    final long requestSeqId = requestSeqIdTracker.generateSeqNo();
-                    turnAcquired.set(false);
                     cancellableThreads.execute(() ->
                         recoveryTarget.writeFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk, translogOps.get(),
                             ActionListener.wrap(
                                 r -> {
                                     recycledBuffers.addFirst(buffer);
-                                    requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
+                                    requestSeqIdTracker.markSeqNoAsCompleted(chunk.seqId);
                                     sendFileChunks(listener);
                                 },
                                 e -> listener.onFailure(handleErrorOnSendFiles(chunk.md, e))
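The javadoc above is the heart of the redesign: one reader at a time (the tryAcquire) and at most maxConcurrentFileChunks unacknowledged requests (the gap between max_seq_no and the checkpoint). A self-contained model of those two rules (plain counters standing in for LocalCheckpointTracker, and assuming in-order acks, which the real tracker does not require):

    import java.util.concurrent.Semaphore;

    public class SendWindowSketch {
        static final int MAX_IN_FLIGHT = 5;
        static final Semaphore reader = new Semaphore(1); // at most one thread reads chunks
        static long maxSeqNo = -1;   // highest seqId handed out
        static long checkpoint = -1; // highest seqId acked (in-order simplification)

        static void sendChunks() {
            while (true) {
                if (reader.tryAcquire() == false) {
                    return; // someone else is reading; their loop will keep sending
                }
                if (maxSeqNo - checkpoint >= MAX_IN_FLIGHT) {
                    reader.release();
                    return; // window full: the next ack re-enters sendChunks
                }
                long seqId = ++maxSeqNo;
                reader.release();
                System.out.println("dispatch chunk " + seqId);
                if (seqId >= 9) {
                    return;
                }
                checkpoint++; // pretend an ack arrived; the real code does this in the response listener
            }
        }

        public static void main(String[] args) {
            sendChunks(); // prints dispatch chunk 0..9
        }
    }
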
@@ -773,32 +773,39 @@ void sendFileChunks(ActionListener<Void> listener) {
             }
         }
 
-        synchronized FileChunk readChunk(final byte[] buffer) throws Exception {
-            if (readerClosed) {
-                throw new IllegalStateException("chunk reader was closed");
-            }
+        FileChunk readChunk(final byte[] buffer) throws Exception {
             try {
-                if (currentInput == null) {
-                    if (files.isEmpty()) {
-                        return null;
+                synchronized (this) {
+                    if (closed) {
+                        throw new IllegalStateException("chunk reader was closed");
                     }
-                    md = files.remove(0);
-                    position = 0;
-                    final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
-                    currentInput = new BufferedInputStream(new InputStreamIndexInput(indexInput, md.length()), chunkSizeInBytes) {
-                        @Override
-                        public void close() throws IOException {
-                            indexInput.close(); //InputStreamIndexInput's close is noop
+                    if (currentInput == null) {
+                        if (files.isEmpty()) {
+                            return null;
                         }
-                    };
+                        md = files.remove(0);
+                        position = 0;
+                        final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
+                        currentInput = new InputStreamIndexInput(indexInput, md.length()) {
+                            @Override
+                            public void close() throws IOException {
+                                indexInput.close(); //InputStreamIndexInput's close is noop
+                            }
+                        };
+                    }
                 }
-                final int bytesRead = currentInput.read(buffer, 0, buffer.length);
-                final boolean lastChunk = position + bytesRead == md.length();
+                final int bytesRead = currentInput.read(buffer);
                 if (bytesRead == -1) {
-                    IOUtils.close(currentInput, () -> currentInput = null);
-                    return readChunk(buffer);
+                    throw new EOFException("position [" + position + "] md [" + md + "]");
+                }
+                final boolean lastChunk = position + bytesRead == md.length();
+                if (lastChunk) {
+                    synchronized (this) {
+                        IOUtils.close(currentInput, () -> currentInput = null);
+                    }
                 }
-                final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), position, lastChunk);
+                final long seqId = requestSeqIdTracker.generateSeqNo();
+                final FileChunk chunk = new FileChunk(seqId, md, new BytesArray(buffer, 0, bytesRead), position, lastChunk);
                 position += bytesRead;
                 return chunk;
             } catch (Exception e) {
@@ -808,8 +815,8 @@ public void close() throws IOException {
 
         @Override
         public synchronized void close() throws IOException {
-            if (readerClosed == false) {
-                readerClosed = true;
+            if (closed == false) {
+                closed = true;
                 IOUtils.close(currentInput);
             }
         }
@@ -842,6 +849,22 @@ Exception handleErrorOnSendFiles(StoreFileMetaData md, Exception e) {
         }
     }
 
+    private static final class FileChunk {
+        final long seqId;
+        final StoreFileMetaData md;
+        final BytesReference content;
+        final long position;
+        final boolean lastChunk;
+
+        FileChunk(long seqId, StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
+            this.seqId = seqId;
+            this.md = md;
+            this.content = content;
+            this.position = position;
+            this.lastChunk = lastChunk;
+        }
+    }
+
     protected void failEngine(IOException cause) {
         shard.failShard("recovery", cause);
     }

From 24519f1716d6d6eca4ae8a40059c01f887140443 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sun, 10 Mar 2019 16:12:34 -0400
Subject: [PATCH 4/5] handle out of order in test

---
 .../recovery/RecoverySourceHandler.java       | 14 ++++++----
 .../recovery/RecoverySourceHandlerTests.java  | 28 ++++---------------
 2 files changed, 13 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index ad33825c694ff..2ddd5a14edf44 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -798,16 +798,18 @@ public void close() throws IOException {
                 if (bytesRead == -1) {
                     throw new EOFException("position [" + position + "] md [" + md + "]");
                 }
-                final boolean lastChunk = position + bytesRead == md.length();
-                if (lastChunk) {
-                    synchronized (this) {
+                final boolean lastChunk;
+                final long chunkPosition;
+                synchronized (this) {
+                    chunkPosition = this.position;
+                    this.position += bytesRead;
+                    lastChunk = this.position == md.length();
+                    if (lastChunk) {
                         IOUtils.close(currentInput, () -> currentInput = null);
                     }
                 }
                 final long seqId = requestSeqIdTracker.generateSeqNo();
-                final FileChunk chunk = new FileChunk(seqId, md, new BytesArray(buffer, 0, bytesRead), position, lastChunk);
-                position += bytesRead;
-                return chunk;
+                return new FileChunk(seqId, md, new BytesArray(buffer, 0, bytesRead), chunkPosition, lastChunk);
             } catch (Exception e) {
                 throw handleErrorOnSendFiles(md, e);
             }
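The hunk above closes a race: with concurrent callers, a second reader could slip in between read() and the later position += bytesRead update and stamp its chunk with a stale offset. The fix in miniature (claiming the start offset and advancing the shared counter in one critical section):

    public class PositionRaceSketch {
        private long position; // shared file offset, guarded by `this`

        // Capture the chunk's start offset and advance the counter atomically,
        // mirroring the synchronized block added to readChunk in this patch.
        synchronized long claim(int bytesRead) {
            long chunkPosition = position;
            position += bytesRead;
            return chunkPosition;
        }

        public static void main(String[] args) {
            PositionRaceSketch s = new PositionRaceSketch();
            System.out.println(s.claim(100)); // 0
            System.out.println(s.claim(50));  // 100
        }
    }
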
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 713d57edd9aff..1d7c8415490ed 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -155,33 +155,15 @@ public void testSendFiles() throws Throwable {
             metas.add(md);
         }
         Store targetStore = newStore(createTempDir());
+        MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {});
         RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
-            IndexOutputOutputStream out;
             @Override
             public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
                                        int totalTranslogOps, ActionListener<Void> listener) {
-                try {
-                    if (position == 0) {
-                        out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
-                            @Override
-                            public void close() throws IOException {
-                                super.close();
-                                targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
-                            }
-                        };
-                    }
-                    final BytesRefIterator iterator = content.iterator();
-                    BytesRef scratch;
-                    while ((scratch = iterator.next()) != null) {
-                        out.write(scratch.bytes, scratch.offset, scratch.length);
-                    }
-                    if (lastChunk) {
-                        out.close();
-                    }
-                    maybeExecuteAsync(() -> listener.onResponse(null));
-                } catch (Exception e) {
-                    listener.onFailure(e);
-                }
+                maybeExecuteAsync(() -> ActionListener.completeWith(listener, () -> {
+                    multiFileWriter.writeFileChunk(md, position, content, lastChunk);
+                    return null;
+                }));
             }
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request,

From 037cdc6aa0e15e42a281e77613f9ea19248256c5 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Mon, 11 Mar 2019 23:11:48 -0400
Subject: [PATCH 5/5] assert method is not called recursively

---
 .../recovery/RecoverySourceHandler.java       |  1 +
 .../elasticsearch/threadpool/ThreadPool.java  | 14 +++++++
 .../recovery/RecoverySourceHandlerTests.java  | 40 +++++++------------
 .../threadpool/ThreadPoolTests.java           | 14 +++++++
 4 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 2ddd5a14edf44..18d0fc9a9fa2f 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -733,6 +733,7 @@ private final class MultiFileSender implements Closeable {
          * and local_checkpoint is greater than {@code maxConcurrentFileChunks}.
          */
         void sendFileChunks(ActionListener<Void> listener) {
+            assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
             while (true) {
                 cancellableThreads.checkForCancel();
                 synchronized (this) {
diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 87c7997333e4e..0b4a6c39ea983 100644
--- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -786,4 +786,18 @@ public static boolean assertNotScheduleThread(String reason) {
             "Expected current thread [" + Thread.currentThread() + "] to not be the scheduler thread. Reason: [" + reason + "]";
         return true;
     }
+
+    public static boolean assertCurrentMethodIsNotCalledRecursively() {
+        final StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
+        assert stackTraceElements.length > 2;
+        assert stackTraceElements[0].getMethodName().equals("getStackTrace");
+        assert stackTraceElements[1].getMethodName().equals("assertCurrentMethodIsNotCalledRecursively");
+        final StackTraceElement testingMethod = stackTraceElements[2];
+        for (int i = 3; i < stackTraceElements.length; i++) {
+            assert stackTraceElements[i].getClassName().equals(testingMethod.getClassName()) == false
+                || stackTraceElements[i].getMethodName().equals(testingMethod.getMethodName()) == false :
+                testingMethod.getClassName() + "#" + testingMethod.getMethodName() + " is called recursively";
+        }
+        return true;
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 1d7c8415490ed..3493de8442786 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -94,7 +94,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntSupplier;
 import java.util.function.Supplier;
 import java.util.zip.CRC32;
@@ -104,7 +103,6 @@
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.core.IsNull.notNullValue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyObject;
@@ -160,7 +158,7 @@ public void testSendFiles() throws Throwable {
             @Override
             public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
                                        int totalTranslogOps, ActionListener<Void> listener) {
-                maybeExecuteAsync(() -> ActionListener.completeWith(listener, () -> {
+                threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> {
                     multiFileWriter.writeFileChunk(md, position, content, lastChunk);
                     return null;
                 }));
@@ -224,7 +222,7 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
                                                 RetentionLeases retentionLeases, ActionListener<Long> listener) {
                 shippedOps.addAll(operations);
                 checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
-                maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
+                threadPool.generic().execute(() -> listener.onResponse(checkpointOnTarget.get()));
             }
         };
@@ -258,9 +256,9 @@ public void testSendSnapshotStopOnError() throws Exception {
             public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
                                                 long msu, RetentionLeases retentionLeases, ActionListener<Long> listener) {
                 if (randomBoolean()) {
-                    maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
+                    threadPool.generic().execute(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
                 } else {
-                    maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index")));
+                    threadPool.generic().execute(() -> listener.onFailure(new RuntimeException("test - failed to index")));
                     wasFailed.set(true);
                 }
             }
@@ -347,9 +345,10 @@ public void close() throws IOException {
                     if (lastChunk) {
                         out.close();
                     }
-                    listener.onResponse(null);
+                    threadPool.generic().execute(() -> listener.onResponse(null));
                 } catch (Exception e) {
-                    IOUtils.closeWhileHandlingException(out, () -> listener.onFailure(e));
+                    IOUtils.closeWhileHandlingException(out);
+                    threadPool.generic().execute(() -> listener.onFailure(e));
                 }
             }
         };
@@ -572,7 +571,7 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
 
             int expectedSentChunks = sentChunks.get() + chunksToSend;
             int expectedUnrepliedChunks = unrepliedChunks.size() + chunksToSend;
-            chunksToAck.forEach(c -> maybeExecuteAsync(() -> c.listener.onResponse(null)));
+            chunksToAck.forEach(c -> c.listener.onResponse(null));
             assertBusy(() -> {
                 assertThat(sentChunks.get(), equalTo(expectedSentChunks));
                 assertThat(unrepliedChunks, hasSize(expectedUnrepliedChunks));
@@ -602,23 +601,20 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
         Store store = newStore(createTempDir(), false);
         List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
         int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
-        AtomicReference<Exception> error = new AtomicReference<>();
-        handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, ActionListener.wrap(r -> {}, error::set));
+        PlainActionFuture<Void> future = new PlainActionFuture<>();
+        handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, future);
         assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))));
         List<FileChunkResponse> failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks);
-        failedChunks.forEach(c -> c.listener.onFailure(new RuntimeException("test chunk exception")));
+        failedChunks.forEach(c -> threadPool.generic().execute(() -> c.listener.onFailure(new RuntimeException("test chunk exception"))));
         unrepliedChunks.removeAll(failedChunks);
-        unrepliedChunks.forEach(c -> {
+        unrepliedChunks.forEach(c -> threadPool.generic().execute(() -> {
             if (randomBoolean()) {
                 c.listener.onFailure(new RuntimeException("test chunk exception"));
             } else {
                 c.listener.onResponse(null);
             }
-        });
-        assertBusy(() -> {
-            assertThat(error.get(), notNullValue());
-            assertThat(error.get().getMessage(), containsString("test chunk exception"));
-        });
+        }));
+        assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), containsString("test chunk exception"));
         assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
         store.close();
     }
@@ -751,12 +747,4 @@ public void close() {
             }
         };
     }
-
-    private void maybeExecuteAsync(Runnable runnable) {
-        if (randomBoolean()) {
-            threadPool.generic().execute(runnable);
-        } else {
-            runnable.run();
-        }
-    }
 }
diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java
index 92a61cda9e98d..ff9d123bf522a 100644
--- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java
+++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java
@@ -23,6 +23,7 @@
 import org.elasticsearch.test.ESTestCase;
 
 import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
+import static org.elasticsearch.threadpool.ThreadPool.assertCurrentMethodIsNotCalledRecursively;
 import static org.hamcrest.CoreMatchers.equalTo;
 
 public class ThreadPoolTests extends ESTestCase {
@@ -67,4 +68,17 @@ public void testEstimatedTimeIntervalSettingAcceptsOnlyZeroAndPositiveTime() {
         Exception e = expectThrows(IllegalArgumentException.class, () -> ESTIMATED_TIME_INTERVAL_SETTING.get(settings));
         assertEquals("failed to parse value [-1] for setting [thread_pool.estimated_time_interval], must be >= [0ms]", e.getMessage());
     }
+
+    int recursiveFactorial(int n) {
+        assertCurrentMethodIsNotCalledRecursively();
+        if (n <= 1) {
+            return 1;
+        }
+        return n * recursiveFactorial(n - 1);
+    }
+
+    public void testAssertCurrentMethodIsNotCalledRecursively() {
+        recursiveFactorial(between(0, 1));
+        expectThrows(AssertionError.class, () -> recursiveFactorial(between(2, 10)));
+    }
 }
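The new assertion only does anything when JVM assertions are enabled (-ea, which the Elasticsearch test runner turns on); sendFileChunks uses it to prove that re-entry from the response listener never recurses on the same thread, since the semaphore makes the inner call bail out. A standalone copy of the stack-walking idea, runnable outside Elasticsearch:

    public class RecursionAssertSketch {
        static boolean assertNotRecursive() {
            StackTraceElement[] stack = Thread.currentThread().getStackTrace();
            // stack[0] = getStackTrace, stack[1] = assertNotRecursive, stack[2] = the method under test
            StackTraceElement caller = stack[2];
            for (int i = 3; i < stack.length; i++) {
                assert !(stack[i].getClassName().equals(caller.getClassName())
                        && stack[i].getMethodName().equals(caller.getMethodName()))
                    : caller.getMethodName() + " is called recursively";
            }
            return true;
        }

        static int factorial(int n) {
            assert assertNotRecursive();
            return n <= 1 ? 1 : n * factorial(n - 1);
        }

        public static void main(String[] args) {
            System.out.println(factorial(1)); // fine: no self-call on the stack
            try {
                factorial(3); // with -ea this trips the assertion on the first recursive call
            } catch (AssertionError expected) {
                System.out.println(expected.getMessage());
            }
        }
    }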