Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Avoid indexing documents as stale in non primary mode #8578

Merged
merged 1 commit into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ public class IndexPrimaryRelocationIT extends OpenSearchIntegTestCase {

private static final int RELOCATION_COUNT = 15;

// No-op setup hook invoked by testPrimaryRelocationWhileIndexing() before index
// creation; subclasses (e.g. the remote-store variant) override it to register
// repositories or perform other per-test preparation.
public void setup() {}

public Settings indexSettings() {
    // Single shard with no replicas so the test exercises exactly one primary.
    final Settings.Builder settings = Settings.builder();
    settings.put("index.number_of_shards", 1);
    settings.put("index.number_of_replicas", 0);
    return settings.build();
}

public void testPrimaryRelocationWhileIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
client().admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.setMapping("field", "type=text")
.get();
setup();
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");
AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.recovery.IndexPrimaryRelocationIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
 * Runs the parent primary-relocation test against a remote-store backed,
 * segment-replication enabled index. Uses {@code Scope.TEST} so each test
 * method gets its own cluster.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class RemoteIndexPrimaryRelocationIT extends IndexPrimaryRelocationIT {

    protected static final String REPOSITORY_NAME = "test-remote-store-repo";

    // Filesystem path backing the "fs" repository registered in setup().
    protected Path absolutePath;

    // Registers the remote-store repository; the parent test calls this hook
    // before creating its index.
    @Override
    public void setup() {
        absolutePath = randomRepoPath().toAbsolutePath();
        assertAcked(
            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
        );
    }

    // Points every node at the remote-store repository for both segments and translog.
    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal))
            .put(remoteStoreClusterSettings(REPOSITORY_NAME, REPOSITORY_NAME, false))
            .build();
    }

    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    // Layers segment replication and a single-shard, zero-replica layout over
    // the parent's index settings.
    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .build();
    }

    // Remote store and experimental segment replication are gated behind feature flags.
    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder()
            .put(super.featureFlagSettings())
            .put(FeatureFlags.REMOTE_STORE, "true")
            .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
            .build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.Before;
import org.opensearch.action.admin.indices.close.CloseIndexResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -116,4 +121,63 @@ public void testPromoteReplicaToPrimary() throws Exception {
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
}

// Verifies that writes acknowledged around a primary failover remain searchable:
// half of docCount docs are indexed, the node holding a primary is stopped, and
// indexing continues against the promoted primary before the final hit count check.
public void testFailoverWhileIndexing() throws Exception {
    internalCluster().startNode();
    internalCluster().startNode();
    final String indexName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
    shard_count = scaledRandomIntBetween(1, 5);
    createIndex(indexName);
    ensureGreen(indexName);
    int docCount = scaledRandomIntBetween(20, 50);
    final int indexDocAfterFailover = scaledRandomIntBetween(20, 50);
    AtomicInteger numAutoGenDocs = new AtomicInteger();
    // Counted down once half of docCount is indexed; the main thread waits on
    // this before stopping the primary's node.
    CountDownLatch latch = new CountDownLatch(1);
    final AtomicBoolean finished = new AtomicBoolean(false);
    Thread indexingThread = new Thread(() -> {
        int docsAfterFailover = 0;
        while (finished.get() == false && numAutoGenDocs.get() < docCount) {
            // Indexes through the cluster-manager node's client — presumably so
            // requests are unaffected by the data node being stopped; confirm.
            IndexResponse indexResponse = internalCluster().clusterManagerClient()
                .prepareIndex(indexName)
                .setSource("field", numAutoGenDocs.get())
                .get();

            if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) {
                numAutoGenDocs.incrementAndGet();
                if (numAutoGenDocs.get() == docCount / 2) {
                    // Randomly refresh, flush, or do neither before the failover point.
                    if (random().nextInt(3) == 0) {
                        refresh(indexName);
                    } else if (random().nextInt(2) == 0) {
                        flush(indexName);
                    }
                    // Node is killed on this
                    latch.countDown();
                } else if (numAutoGenDocs.get() > docCount / 2) {
                    // Track docs indexed past the failover point; stop early once
                    // the post-failover quota is reached (otherwise the loop ends
                    // when docCount is hit).
                    docsAfterFailover++;
                    if (docsAfterFailover == indexDocAfterFailover) {
                        finished.set(true);
                    }
                }
            }
        }
        logger.debug("Done indexing");
    });
    indexingThread.start();
    // Block until half the docs are indexed, then pick a primary to fail.
    latch.await();

    ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
    final int numShards = state.metadata().index(indexName).getNumberOfShards();
    final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard();
    final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId());

    // stop the random data node, all remaining shards are promoted to primaries
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName()));
    ensureYellowAndNoInitializingShards(indexName);
    indexingThread.join();
    refresh(indexName);
    // Every acknowledged write must be visible after promotion.
    assertHitCount(
        client(internalCluster().getClusterManagerName()).prepareSearch(indexName).setSize(0).setTrackTotalHits(true).get(),
        numAutoGenDocs.get()
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
final OpVsLuceneDocStatus status;
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
if (versionValue != null) {
status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
} else {
Expand All @@ -722,10 +723,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.hasProcessed(op.seqNo()) : "local checkpoint tracker is not updated seq_no="
+ op.seqNo()
+ " id="
+ op.id();
assert localCheckpointTracker.hasProcessed(op.seqNo()) || segRepEnabled
: "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
Expand Down Expand Up @@ -927,6 +926,7 @@ public IndexResult index(Index index) throws IOException {
plan.currentNotFoundOrDeleted
);
}

}
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
Expand Down Expand Up @@ -1005,10 +1005,18 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes;
plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0);
} else {
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
versionMap.enforceSafeAccess();
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processAsStaleOp(index.version());
if (segRepEnabled) {
// For segrep based indices, we can't completely rely on localCheckpointTracker
// as the preserved checkpoint may not have all the operations present in lucene
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
// we don't need to index it again as stale op as it would create multiple documents for same seq no
plan = IndexingStrategy.processButSkipLucene(false, index.version());
} else {
plan = IndexingStrategy.processAsStaleOp(index.version());
}
} else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0);
}
Expand Down Expand Up @@ -1442,9 +1450,17 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
} else {
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(delete.version());
if (segRepEnabled) {
// For segrep based indices, we can't completely rely on localCheckpointTracker
// as the preserved checkpoint may not have all the operations present in lucene
// we don't need to index it again as stale op as it would create multiple documents for same seq no
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
} else {
plan = DeletionStrategy.processAsStaleOp(delete.version());
}
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
Expand All @@ -50,8 +50,6 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants;
import org.junit.Assert;
import org.opensearch.common.io.PathUtils;
import org.opensearch.core.Assertions;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
Expand All @@ -77,11 +75,12 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.Strings;
import org.opensearch.common.UUIDs;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -90,8 +89,9 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.core.Assertions;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -106,8 +106,8 @@
import org.opensearch.index.engine.EngineTestCase;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.fielddata.IndexFieldData;
Expand Down Expand Up @@ -168,6 +168,7 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -192,7 +193,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.Collection;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand All @@ -217,8 +218,8 @@
import static org.mockito.Mockito.mock;
import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.test.hamcrest.RegexMatcher.matches;

Expand Down Expand Up @@ -2886,13 +2887,14 @@ public void testCommitLevelRestoreShardFromRemoteStore() throws IOException {
}

public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException {
String remoteStorePath = createTempDir().toString();
IndexShard target = newStartedShard(
true,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "temp-fs")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "temp-fs")
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__test")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__test")
.build(),
new InternalEngineFactory()
);
Expand Down Expand Up @@ -2957,7 +2959,6 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(future);
target.remoteStore().decRef();

assertTrue(future.actionGet());
assertDocs(target, "1", "2");

Expand Down
Loading