Skip to content

Commit

Permalink
Add get file chunk timeouts with listener timeouts (#38758)
Browse files Browse the repository at this point in the history
This commit adds a `ListenerTimeouts` class that will wrap a
`ActionListener` in a listener with a timeout scheduled on the generic
thread pool. If the timeout expires before the listener is completed,
`onFailure` will be called with an `ElasticsearchTimeoutException`.

Timeouts for the get ccr file chunk action are implemented using this
functionality. Additionally, this commit attempts to fix #38027 by also
blocking proxied get ccr file chunk actions. This test being un-muted is
useful to verify the timeout functionality.
  • Loading branch information
Tim-Brooks committed Feb 16, 2019
1 parent 3b4b80a commit cf82864
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.support;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;

public class ListenerTimeouts {

/**
* Wraps a listener with a listener that can timeout. After the timeout period the
* {@link ActionListener#onFailure(Exception)} will be called with a
* {@link ElasticsearchTimeoutException} if the listener has not already been completed.
*
* @param threadPool used to schedule the timeout
* @param listener to that can timeout
* @param timeout period before listener failed
* @param executor to use for scheduling timeout
* @param listenerName name of the listener for timeout exception
* @return the wrapped listener that will timeout
*/
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, ActionListener<Response> listener,
TimeValue timeout, String executor, String listenerName) {
TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName);
wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor);
return wrappedListener;
}

private static class TimeoutableListener<Response> implements ActionListener<Response>, Runnable {

private final AtomicBoolean isDone = new AtomicBoolean(false);
private final ActionListener<Response> delegate;
private final TimeValue timeout;
private final String listenerName;
private volatile Scheduler.ScheduledCancellable cancellable;

private TimeoutableListener(ActionListener<Response> delegate, TimeValue timeout, String listenerName) {
this.delegate = delegate;
this.timeout = timeout;
this.listenerName = listenerName;
}

@Override
public void onResponse(Response response) {
if (isDone.compareAndSet(false, true)) {
cancellable.cancel();
delegate.onResponse(response);
}
}

@Override
public void onFailure(Exception e) {
if (isDone.compareAndSet(false, true)) {
cancellable.cancel();
delegate.onFailure(e);
}
}

@Override
public void run() {
if (isDone.compareAndSet(false, true)) {
String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]";
delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.support;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.core.IsInstanceOf.instanceOf;

public class ListenerTimeoutsTests extends ESTestCase {

private final TimeValue timeout = TimeValue.timeValueMillis(10);
private final String generic = ThreadPool.Names.GENERIC;
private DeterministicTaskQueue taskQueue;

@Before
public void setUp() throws Exception {
super.setUp();
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
taskQueue = new DeterministicTaskQueue(settings, random());
}

public void testListenerTimeout() {
AtomicBoolean success = new AtomicBoolean(false);
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<Void> listener = wrap(success, exception);

ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();

wrapped.onResponse(null);
wrapped.onFailure(new IOException("incorrect exception"));

assertFalse(success.get());
assertThat(exception.get(), instanceOf(ElasticsearchTimeoutException.class));
}

public void testFinishNormallyBeforeTimeout() {
AtomicBoolean success = new AtomicBoolean(false);
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<Void> listener = wrap(success, exception);

ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
wrapped.onResponse(null);
wrapped.onFailure(new IOException("boom"));
wrapped.onResponse(null);

assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();

assertTrue(success.get());
assertNull(exception.get());
}

public void testFinishExceptionallyBeforeTimeout() {
AtomicBoolean success = new AtomicBoolean(false);
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<Void> listener = wrap(success, exception);

ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
wrapped.onFailure(new IOException("boom"));

assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();

assertFalse(success.get());
assertThat(exception.get(), instanceOf(IOException.class));
}

private ActionListener<Void> wrap(AtomicBoolean success, AtomicReference<Exception> exception) {
return new ActionListener<Void>() {

private final AtomicBoolean completed = new AtomicBoolean();

@Override
public void onResponse(Void aVoid) {
assertTrue(completed.compareAndSet(false, true));
assertTrue(success.compareAndSet(false, true));
}

@Override
public void onFailure(Exception e) {
assertTrue(completed.compareAndSet(false, true));
assertTrue(exception.compareAndSet(null, e));
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -103,7 +104,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
private final ThreadPool threadPool;

private final CounterMetric throttledTime = new CounterMetric();

public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
CcrSettings ccrSettings, ThreadPool threadPool) {
this.metadata = metadata;
Expand Down Expand Up @@ -377,7 +378,8 @@ void restoreFiles() throws IOException {
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);

try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {})) {
try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
})) {
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();

Expand All @@ -403,8 +405,9 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
fileInfo.name(), offset, bytesRequested);

remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
ActionListener.wrap(
TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
r -> threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
Expand All @@ -428,7 +431,8 @@ protected void doRun() throws Exception {
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
));
), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
Expand Down Expand Up @@ -292,7 +293,6 @@ public void testRateLimitingIsEmployed() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38027")
public void testIndividualActionsTimeout() throws Exception {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
TimeValue timeValue = TimeValue.timeValueMillis(100);
Expand All @@ -315,7 +315,8 @@ public void testIndividualActionsTimeout() throws Exception {
MockTransportService mockTransportService = (MockTransportService) transportService;
transportServices.add(mockTransportService);
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false) {
if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false &&
action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.NAME)) == false) {
connection.sendRequest(requestId, action, request, options);
}
});
Expand All @@ -337,33 +338,34 @@ public void testIndividualActionsTimeout() throws Exception {
.renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
.indexSettings(settingsBuilder);

final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));

// Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
// metadata this will throw an exception. If it times-out when restoring a shard, the shard will
// be marked as failed. Either one is a success for the purpose of this test.
try {
RestoreInfo restoreInfo = future.actionGet();
assertThat(restoreInfo.failedShards(), greaterThan(0));
assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards()));
assertEquals(numberOfPrimaryShards, restoreInfo.totalShards());
} catch (Exception e) {
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
}
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));

// Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
// metadata this will throw an exception. If it times-out when restoring a shard, the shard will
// be marked as failed. Either one is a success for the purpose of this test.
try {
RestoreInfo restoreInfo = future.actionGet();
assertThat(restoreInfo.failedShards(), greaterThan(0));
assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards()));
assertEquals(numberOfPrimaryShards, restoreInfo.totalShards());
} catch (Exception e) {
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
}
} finally {
for (MockTransportService transportService : transportServices) {
transportService.clearAllRules();
}

for (MockTransportService transportService : transportServices) {
transportService.clearAllRules();
settingsRequest = new ClusterUpdateSettingsRequest();
TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
defaultValue));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
}

settingsRequest = new ClusterUpdateSettingsRequest();
TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
defaultValue));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
}

public void testFollowerMappingIsUpdated() throws IOException {
Expand Down

0 comments on commit cf82864

Please sign in to comment.