[Bug] [Segment Replication] Update store metadata recovery diff logic to ignore missing files causing exception #4185

Merged (3 commits, Aug 12, 2022)
@@ -15,6 +15,7 @@
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
+import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
@@ -36,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -247,6 +249,56 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
assertSegmentStats(REPLICA_COUNT);
}

public void testDeleteOperations() throws Exception {

Comment from Contributor @Poojita-Raj:

I ran this integ test and it still gives an off-by-one error intermittently, so I think we should either hold it off for a separate PR or fix it before merging it in.

[screenshot of the intermittent test failure]

Reply from @dreamer-89 (Member, Author), Aug 11, 2022:

That's a good call-out @Poojita-Raj. Yes, I am also able to reproduce this by running the test in iterations (it fails 3 out of 10 runs). I think this is related to the replica lagging behind the primary and needs some deep-diving. This PR is about avoiding shard failures due to delete operations; I will file a new bug to track this.

./gradlew ':server:internalClusterTest' --tests "org.opensearch.indices.replication.SegmentReplicationIT.testDeleteOperations" -Dtests.seed=44620B458370EA29 -Dtests.iters=10

final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(0, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

// wait a short amount of time to give replication a chance to complete.
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);

Set<String> ids = indexer.getIds();
String id = ids.toArray()[0].toString();
client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

refresh(INDEX_NAME);
waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount - 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount - 1);
}
}

private void assertSegmentStats(int numberOfReplicas) throws IOException {
final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

server/src/main/java/org/opensearch/index/store/Store.java (88 changes: 71 additions, 17 deletions)
@@ -110,8 +110,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1102,6 +1102,30 @@ public Map<String, StoreFileMetadata> asMap() {
private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
private static final String SEGMENT_INFO_EXTENSION = "si";

/**
* Helper method used to group store files according to segment and commit.
*
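* For example, {@code _0.cfe} and {@code _0.si} fall into the per-segment group for segment {@code _0},
* while the {@code segments_N} file and {@code .liv}/{@code .del} files form the per-commit group.
*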
* @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
* @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
*/
private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
for (StoreFileMetadata meta : this) {
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
final String extension = IndexFileNames.getExtension(meta.name());
if (IndexFileNames.SEGMENTS.equals(segmentId)
|| DEL_FILE_EXTENSION.equals(extension)
|| LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files; fnm files are generational, but only for upgradable DV
perCommitStoreFiles.add(meta);
} else {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
}
}
return Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles));
}

/**
* Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the
* recovery target and this snapshot as the source. The returned diff will hold a list of files that are:
@@ -1139,23 +1163,8 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
-final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
-final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
-
-for (StoreFileMetadata meta : this) {
-final String segmentId = IndexFileNames.parseSegmentName(meta.name());
-final String extension = IndexFileNames.getExtension(meta.name());
-if (IndexFileNames.SEGMENTS.equals(segmentId)
-|| DEL_FILE_EXTENSION.equals(extension)
-|| LIV_FILE_EXTENSION.equals(extension)) {
-// only treat del files as per-commit files fnm files are generational but only for upgradable DV
-perCommitStoreFiles.add(meta);
-} else {
-perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
-}
-}
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
-for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
+for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
Expand Down Expand Up @@ -1190,6 +1199,51 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
return recoveryDiff;
}

/**
* Segment replication method.
* Returns a diff between the two snapshots that can be used to get the list of files to copy over to a
* replica for segment replication. The given snapshot is treated as the target and this snapshot as the
* source. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both snapshots and can be considered the same, i.e. they don't need to be recovered</li>
* <li>different: they exist in both snapshots but are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
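* <p>
* Unlike {@link #recoveryDiff(MetadataSnapshot)}, a file that exists only in the source (for example, a
* {@code .liv} file produced by a delete operation on the primary that has not yet reached the replica)
* is reported as missing without marking the rest of its segment as different.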
*/
public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {

Comment from Contributor @Poojita-Raj:

Nit: rename the snapshot parameter as well for consistency: segmentReplicationTargetSnapshot

Reply from @dreamer-89 (Member, Author):

Thanks @Poojita-Raj for the review.

This method evaluates the difference in segment files between different store metadata snapshots, which makes the current naming consistent. The name also aligns with the existing recoveryDiff used for evaluating snapshot diffs.

Comment from Member @kartg:

The code in this method seems identical to recoveryDiff except for how missing files are handled. Instead of duplicating most of the code, I would suggest gating the two pieces of logic with a boolean method parameter like:

public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot, boolean shouldCheckMissingFiles) {

That would also remove the need to extract the getGroupedFilesIterable method.

Reply from @dreamer-89 (Member, Author):

Thanks @kartg, this is a good point! Initially I thought of adding a flag for gating, but later realized that recoveryDiff is used heavily and the change would touch many call sites. I also feel that keeping the methods separate clearly calls out the distinct functionality, leaves room for future enhancements around store metadata comparison (e.g., identical/missing files are not used for segment comparison and could be removed altogether), and makes the code more readable.

final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
// Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
// documents and generate new files specific to a segment
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
different.add(meta);
} else {
identicalFiles.add(meta);
}
}
if (consistent) {
identical.addAll(identicalFiles);
} else {
different.addAll(identicalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
return recoveryDiff;
}

/**
* Returns the number of files in this snapshot
*/
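For reference, the single-method alternative that @kartg suggested in the review above would gate the missing-file handling behind a flag. A rough sketch of that variant, reassembled from the pieces shown in this diff (illustrative only; the merged change kept recoveryDiff and segmentReplicationDiff as separate methods):

public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot, boolean shouldCheckMissingFiles) {
    final List<StoreFileMetadata> identical = new ArrayList<>();
    final List<StoreFileMetadata> different = new ArrayList<>();
    final List<StoreFileMetadata> missing = new ArrayList<>();
    final List<StoreFileMetadata> identicalFiles = new ArrayList<>();
    for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
        identicalFiles.clear();
        boolean consistent = true;
        for (StoreFileMetadata meta : segmentFiles) {
            StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
            if (storeFileMetadata == null) {
                // Recovery treats a missing target file as making the whole segment inconsistent;
                // segment replication does not, because the replica may simply lag the primary.
                if (shouldCheckMissingFiles) {
                    consistent = false;
                }
                missing.add(meta);
            } else if (storeFileMetadata.isSame(meta) == false) {
                consistent = false;
                different.add(meta);
            } else {
                identicalFiles.add(meta);
            }
        }
        if (consistent) {
            identical.addAll(identicalFiles);
        } else {
            // all files of an inconsistent segment are copied again, even the identical ones
            different.addAll(identicalFiles);
        }
    }
    return new RecoveryDiff(
        Collections.unmodifiableList(identical),
        Collections.unmodifiableList(different),
        Collections.unmodifiableList(missing)
    );
}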
@@ -154,7 +154,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
throws IOException {
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
-final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata);
+final Store.RecoveryDiff diff = snapshot.segmentReplicationDiff(localMetadata);
logger.debug("Replication diff {}", diff);
// Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot
// from
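For orientation, the getFiles method above (in SegmentReplicationTarget) consumes this diff roughly as follows. This is a sketch based on the truncated context; the elided code may differ, and identifiers such as getFilesListener, getId(), and checkpointInfo.getCheckpoint() are assumed from the surrounding signatures:

// Segments are immutable, so a same-named file that differs from the incoming
// snapshot means the local copy conflicts with the primary's and replication fails.
if (diff.different.isEmpty() == false) {
    getFilesListener.onFailure(
        new IllegalStateException("Replica has segment files that differ from the primary: " + diff.different)
    );
    return;
}
// Only the files the replica does not yet have are fetched from the source.
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), diff.missing, store, getFilesListener);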
@@ -8,29 +8,51 @@

package org.opensearch.indices.replication;

-import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexFormatTooNewException;
+import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.Version;
import org.junit.Assert;
import org.mockito.Mockito;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreTests;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Random;
import java.util.Arrays;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -69,6 +91,11 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
0
);

private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
);

SegmentInfos testSegmentInfos;

@Override
@@ -361,6 +388,113 @@ public void onFailure(Exception e) {
});
}

/**
* This test ensures that new files generated on the primary (due to delete operations) are not considered missing on the replica
* @throws IOException
*/
public void test_MissingFiles_NotCausingFailure() throws IOException {
int docCount = 1 + random().nextInt(10);
// Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
// generated by delete operations. These two snapshots can then be used in the test to mock the primary
// shard snapshot (second element, which contains the delete operations) and the replica's existing
// snapshot (first element).
List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);

Comment from Member @kartg:

Nitpick: please add comments here so that it's clear what the subsequent get(0) and get(1) fetch.

Reply from @dreamer-89 (Member, Author):

Done


SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
listener.onResponse(
new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))
);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
Store store,
ActionListener<GetSegmentFilesResponse> listener
) {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
}
};
SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
SegmentReplicationTargetService.SegmentReplicationListener.class
);

segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
when(segrepTarget.getMetadataSnapshot()).thenReturn(storeMetadataSnapshots.get(0));
segrepTarget.startReplication(new ActionListener<Void>() {
@Override
public void onResponse(Void replicationResponse) {
logger.info("No error processing checkpoint info");
}

@Override
public void onFailure(Exception e) {
assert (e instanceof IllegalStateException);
}
});
}

/**
* Generates a list of Store.MetadataSnapshot with two elements, where the second snapshot has extra files
* due to a delete operation. A single list is returned so that identical files share the same checksum.
* @param docCount number of documents to index before the first snapshot is taken
* @return a list of two metadata snapshots: the first without and the second with the delete
* @throws IOException
*/
List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
List<Document> docList = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
Document document = new Document();
String text = new String(new char[] { (char) (97 + i), (char) (97 + i) });
document.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
document.add(new TextField("str", text, Field.Store.YES));
docList.add(document);
}
long seed = random().nextLong();
Random random = new Random(seed);
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(true);
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
for (Document d : docList) {
writer.addDocument(d);
}
writer.commit();
Store.MetadataSnapshot storeMetadata = store.getMetadata();
// Delete one document to generate .liv file
writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
writer.commit();
Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
deleteContent(store.directory());
writer.close();
store.close();
return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
}

public static void deleteContent(Directory directory) throws IOException {
final String[] files = directory.listAll();
final List<IOException> exceptions = new ArrayList<>();
for (String file : files) {
try {
directory.deleteFile(file);
} catch (NoSuchFileException | FileNotFoundException e) {
// ignore
} catch (IOException e) {
exceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

@Override
public void tearDown() throws Exception {
super.tearDown();