Skip to content

Commit

Permalink
TEST: Retry synced-flush if ongoing ops on primary (#30978)
Browse files Browse the repository at this point in the history
When the last indexing operation is completed, we will fire a global
checkpoint sync. Since a global checkpoint sync request is a replication
request, it will acquire an index shard permit on the primary when
executing. If this happens at the same time while we are issuing the
synced-flush, the synced-flush request will fail as it thinks there are
in-flight operations. We can avoid such situation by retrying another
synced-flush if the current request fails due to ongoing operations on
the primary.

Closes #29392
  • Loading branch information
dnhatn committed Jun 10, 2018
1 parent 827bb58 commit 49695b0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.indices.flush;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -502,18 +501,7 @@ private InFlightOpsResponse performInFlightOps(InFlightOpsRequest request) {
if (indexShard.routingEntry().primary() == false) {
throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard");
}
if (Assertions.ENABLED) {
if (logger.isTraceEnabled()) {
logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations());
}
}
int opCount = indexShard.getActiveOperationsCount();
// Need to snapshot the debug info twice as it's updated concurrently with the permit count.
if (Assertions.ENABLED) {
if (logger.isTraceEnabled()) {
logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations());
}
}
return new InFlightOpsResponse(opCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,13 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -103,7 +100,7 @@ public void onFailure(Exception e) {
}
}

public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException {
public void testSyncedFlush() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)).get();
ensureGreen();
Expand Down Expand Up @@ -246,16 +243,6 @@ private void indexDoc(Engine engine, String id) throws IOException {
assertThat(indexResult.getFailure(), nullValue());
}

private String syncedFlushDescription(ShardsSyncedFlushResult result) {
String detail = result.shardResponses().entrySet().stream()
.map(e -> "Shard [" + e.getKey() + "], result [" + e.getValue() + "]")
.collect(Collectors.joining(","));
return String.format(Locale.ROOT, "Total shards: [%d], failed: [%s], reason: [%s], detail: [%s]",
result.totalShards(), result.failed(), result.failureReason(), detail);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
@TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE")
public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
Expand All @@ -281,7 +268,6 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
}
final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Partial seal: {}", syncedFlushDescription(partialResult));
assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
Expand All @@ -297,8 +283,6 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
@TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE")
public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
Expand All @@ -315,11 +299,9 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("First seal: {}", syncedFlushDescription(firstSeal));
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
// Do not renew synced-flush
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Second seal: {}", syncedFlushDescription(secondSeal));
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
// Shards were updated, renew synced flush.
Expand All @@ -328,7 +310,6 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Third seal: {}", syncedFlushDescription(thirdSeal));
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
// Manually remove or change sync-id, renew synced flush.
Expand All @@ -344,7 +325,6 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
assertThat(shard.commitStats().syncId(), nullValue());
}
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Forth seal: {}", syncedFlushDescription(forthSeal));
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.test.ESTestCase.assertBusy;

/** Utils for SyncedFlush */
public class SyncedFlushUtil {
Expand All @@ -40,21 +43,31 @@ private SyncedFlushUtil() {
/**
* Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
*/
public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) {
public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) throws Exception {
/*
* When the last indexing operation is completed, we will fire a global checkpoint sync.
* Since a global checkpoint sync request is a replication request, it will acquire an index
* shard permit on the primary when executing. If this happens at the same time while we are
* issuing the synced-flush, the synced-flush request will fail as it thinks there are
* in-flight operations. We can avoid such situation by continuing issuing another synced-flush
* if the synced-flush failed due to the ongoing operations on the primary.
*/
SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
logger.debug("Issue synced-flush on node [{}], shard [{}], cluster state [{}]",
service.nodeName(), shardId, cluster.clusterService(service.nodeName()).state());
LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
service.attemptSyncedFlush(shardId, listener);
try {
AtomicReference<LatchedListener<ShardsSyncedFlushResult>> listenerHolder = new AtomicReference<>();
assertBusy(() -> {
LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
listenerHolder.set(listener);
service.attemptSyncedFlush(shardId, listener);
listener.latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (listener.result != null && listener.result.failureReason() != null
&& listener.result.failureReason().contains("ongoing operations on primary")) {
throw new AssertionError(listener.result.failureReason()); // cause the assert busy to retry
}
});
if (listenerHolder.get().error != null) {
throw ExceptionsHelper.convertToElastic(listenerHolder.get().error);
}
if (listener.error != null) {
throw ExceptionsHelper.convertToElastic(listener.error);
}
return listener.result;
return listenerHolder.get().result;
}

public static final class LatchedListener<T> implements ActionListener<T> {
Expand Down

0 comments on commit 49695b0

Please sign in to comment.