
Smarter CCR concurrent file chunk fetching #38841

Merged
ywelsch merged 4 commits into elastic:master on Feb 15, 2019

Conversation

ywelsch
Contributor

@ywelsch ywelsch commented Feb 13, 2019

The previous logic for concurrent file chunk fetching did not allow multiple chunks from the same file to be fetched in parallel; it only parallelized fetches across different files. This required complex logic on the follower to track which file it was already fetching from, in order to ensure that chunks of the same file were fetched in sequential order. During benchmarking, this exhibited throughput issues towards the end of recovery, when only chunks of the single largest segment file remained: fetching became purely sequential, and throughput dropped considerably on a high-latency network because there was no parallelism left.

The new logic here follows the peer recovery model more closely: it sends multiple requests for the same file in parallel and then reorders the results as necessary. Benchmarks show that this yields better overall throughput, and the implementation is also simpler.
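To illustrate the model the PR moves to, here is a minimal sketch of bounded-window concurrent chunk fetching. This is not the actual Elasticsearch code (which bounds concurrency with a LocalCheckpointTracker and writes through a MultiFileWriter); in this hypothetical sketch a plain Semaphore bounds the number of in-flight requests, and because every response carries its offset, chunks of the same file can complete out of order and still be written to the right position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

// Hypothetical sketch, not the Elasticsearch implementation: fetch chunks of a
// single "file" in parallel with a bounded window, writing each chunk at its
// own offset so out-of-order completion is harmless.
class ChunkFetchSketch {
    static final int CHUNK_SIZE = 4;
    static final int MAX_CONCURRENT_CHUNKS = 3;

    // Stand-in for the remote fetch: returns the bytes at [offset, offset + len).
    static byte[] fetchChunk(byte[] remote, int offset, int len) {
        byte[] chunk = new byte[len];
        System.arraycopy(remote, offset, chunk, 0, len);
        return chunk;
    }

    static byte[] restoreFile(byte[] remote) throws Exception {
        byte[] local = new byte[remote.length];
        ExecutorService pool = Executors.newFixedThreadPool(MAX_CONCURRENT_CHUNKS);
        Semaphore window = new Semaphore(MAX_CONCURRENT_CHUNKS); // bounds in-flight requests
        List<Future<?>> inFlight = new ArrayList<>();
        for (int offset = 0; offset < remote.length; offset += CHUNK_SIZE) {
            final int off = offset;
            final int len = Math.min(CHUNK_SIZE, remote.length - offset);
            window.acquire(); // wait for a free slot in the window
            inFlight.add(pool.submit(() -> {
                try {
                    byte[] chunk = fetchChunk(remote, off, len);
                    // the response carries its offset, so chunks may complete
                    // in any order: just write at the right position
                    System.arraycopy(chunk, 0, local, off, len);
                } finally {
                    window.release();
                }
            }));
        }
        for (Future<?> f : inFlight) {
            f.get(); // drain remaining in-flight requests before finishing
        }
        pool.shutdown();
        return local;
    }
}
```

The key design point mirrored from the PR: the follower no longer needs per-file bookkeeping to keep same-file chunks sequential, because ordering is recovered from the offsets in the responses.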

@ywelsch ywelsch added >non-issue v7.0.0 :Distributed Indexing/CCR Issues around the Cross Cluster State Replication features v6.7.0 v8.0.0 v7.2.0 labels Feb 13, 2019
@ywelsch ywelsch requested a review from Tim-Brooks February 13, 2019 11:57
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

@ywelsch ywelsch requested a review from dnhatn February 13, 2019 15:20
Member

@dnhatn dnhatn left a comment


LGTM. I left a minor comment.

if (error.get() != null) {
break;
break outer;
Member


I think we can remove this check, since the check inside the while loop is good enough.

Copy link
Contributor

@Tim-Brooks Tim-Brooks left a comment


I find the outer loop pretty difficult to reason about. This mostly LGTM, but here is a suggestion for structuring the method differently (in a way that makes more sense to me).

Additionally, I think we should check the error after waiting, to avoid sending unnecessary requests.

        @Override
        protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
            logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);

            try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
            })) {
                final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
                final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();

                for (FileInfo fileInfo : filesToRecover) {
                    final long fileLength = fileInfo.length();
                    long offset = 0;
                    while (offset < fileLength && error.get() == null) { // == null: stop scheduling once an error is recorded
                        final long requestSeqId = requestSeqIdTracker.generateSeqNo();
                        try {
                            requestSeqIdTracker.waitForOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());

                            // Tim: Suggestion - Do not fire off more requests if one failed. It is possible one failed while we were waiting
                            if (error.get() != null) {
                                break;
                            }

                            final int bytesRequested = Math.toIntExact(
                                Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
                            offset += bytesRequested;

                            final GetCcrRestoreFileChunkRequest request =
                                new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
                            logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
                                fileInfo.name(), offset, bytesRequested);

                            TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
                            ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
                                ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
                                    r -> threadPool.generic().execute(new AbstractRunnable() {
                                        @Override
                                        public void onFailure(Exception e) {
                                            error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
                                            requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                                        }

                                        @Override
                                        protected void doRun() throws Exception {
                                            final int actualChunkSize = r.getChunk().length();
                                            logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
                                                snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
                                            final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
                                            throttleListener.accept(nanosPaused);
                                            final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
                                            multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
                                            requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                                        }
                                    }),
                                    e -> {
                                        error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
                                        requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                                    }
                                ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
                            remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
                        } catch (Exception e) {
                            error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
                            requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                            // Tim suggestion: We don't really need to break here as we check at the start of the loop. But do whatever makes sense to you
                            break;
                        }
                    }
                }

                try {
                    requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new ElasticsearchException(e);
                }
                if (error.get() != null) {
                    handleError(store, error.get().v2());
                }
            }

            logger.trace("[{}] completed CCR restore", shardId);
        }

Finally, at some point we should maybe consider the implications of recovering inside the same data center, which is a valid scenario (I think). Contending on the same file lock for our concurrency probably has no impact when the latency is ~100ms, but it might make a difference when the latency is 0-1ms.

@ywelsch
Contributor Author

ywelsch commented Feb 14, 2019

@tbrooks8 the suggested change here has a flaw: it no longer guarantees that every request id that is generated will have a corresponding completion event (we generate a fresh seq no and then possibly exit via the `if (error.get() != null) { break; }` condition). I've fixed this but otherwise taken on your suggestion, as I don't have a strong preference here.
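The invariant ywelsch is pointing at can be shown with a toy model (this Tracker is a hypothetical stand-in, not Elasticsearch's LocalCheckpointTracker): if a sequence number is generated and the loop then bails out without marking it completed, the final wait for all operations can never finish. Checking the error before generating the sequence number preserves the invariant.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the seq-no completion invariant: every generated sequence
// number must eventually be marked completed, or waiting for all ops to
// complete would hang. Not the real LocalCheckpointTracker.
class SeqNoInvariant {
    static class Tracker {
        private final AtomicLong nextSeqNo = new AtomicLong();
        private final AtomicLong completed = new AtomicLong();
        long generateSeqNo() { return nextSeqNo.getAndIncrement(); }
        void markCompleted() { completed.incrementAndGet(); }
        boolean allComplete() { return completed.get() == nextSeqNo.get(); }
    }

    // Flawed ordering: generate first, then maybe bail out without completing.
    static boolean brokenLoopIteration(Tracker tracker, boolean errorSeen) {
        long seqNo = tracker.generateSeqNo();
        if (errorSeen) {
            return tracker.allComplete(); // seqNo leaked: never marked completed
        }
        tracker.markCompleted();
        return tracker.allComplete();
    }

    // Fixed ordering: check the error before generating a sequence number.
    static boolean fixedLoopIteration(Tracker tracker, boolean errorSeen) {
        if (errorSeen) {
            return tracker.allComplete(); // nothing generated, nothing leaks
        }
        long seqNo = tracker.generateSeqNo();
        tracker.markCompleted();
        return tracker.allComplete();
    }
}
```

In the flawed ordering, an early exit on error leaves one generated sequence number permanently uncompleted, which is exactly why the final `waitForOpsToComplete(maxSeqNo)` would block forever.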

Contributor

@Tim-Brooks Tim-Brooks left a comment


LGTM

@ywelsch ywelsch merged commit 5f2ddf2 into elastic:master Feb 15, 2019
ywelsch added a commit that referenced this pull request Feb 15, 2019
ywelsch added a commit that referenced this pull request Feb 15, 2019
ywelsch added a commit that referenced this pull request Feb 15, 2019
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Feb 15, 2019
* elastic/master:
  Avoid double term construction in DfsPhase (elastic#38716)
  Fix typo in DateRange docs (yyy → yyyy) (elastic#38883)
  Introduced class reuses follow parameter code between ShardFollowTasks (elastic#38910)
  Ensure random timestamps are within search boundary (elastic#38753)
  [CI] Muting  method testFollowIndex in IndexFollowingIT
  Update Lucene snapshot repo for 7.0.0-beta1 (elastic#38946)
  SQL: Doc on syntax (identifiers in particular) (elastic#38662)
  Upgrade to Gradle 5.2.1 (elastic#38880)
  Tie break search shard iterator comparisons on cluster alias (elastic#38853)
  Also mmap cfs files for hybridfs (elastic#38940)
  Build: Fix issue with test status logging (elastic#38799)
  Adapt FullClusterRestartIT on master (elastic#38856)
  Fix testAutoFollowing test to use createLeaderIndex() helper method.
  Migrate muted auto follow rolling upgrade test and unmute this test (elastic#38900)
  ShardBulkAction ignore primary response on primary (elastic#38901)
  Recover peers from translog, ignoring soft deletes (elastic#38904)
  Fix NPE on Stale Index in IndicesService (elastic#38891)
  Smarter CCR concurrent file chunk fetching (elastic#38841)
  Fix intermittent failure in ApiKeyIntegTests (elastic#38627)
  re-enable SmokeTestWatcherWithSecurityIT (elastic#38814)