[Bug] [Segment Replication] Update store metadata recovery diff logic to ignore missing files causing exception (opensearch-project#4185)

* Update Store for segment replication diff

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* Update recoveryDiff logic to ignore missing files causing exception on replica during copy

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
Signed-off-by: Suraj Singh <surajrider@gmail.com>
Co-authored-by: Poojita Raj <poojiraj@amazon.com>
dreamer-89 and Poojita-Raj committed Aug 12, 2022
1 parent d344209 commit 5c74ca5
Showing 4 changed files with 259 additions and 20 deletions.
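At its core, the change is in how a file that exists on the source but is absent on the target gets classified. The sketch below is hypothetical (class and method names are ours, not from the commit) and only condenses the logic shown in Store.java further down:

final class DiffSketch {
    // Hypothetical condensation of how each diff variant treats a file that
    // exists in the source snapshot but is absent from the target snapshot.
    static boolean missingFileMakesGroupInconsistent(boolean isSegmentReplication) {
        // recoveryDiff (node recovery): a missing file marks the whole per-segment
        // group inconsistent, demoting even identical files to "different".
        // segmentReplicationDiff (this commit): replicas are expected to lag the
        // primary, so the file is recorded as missing and simply copied over.
        return isSegmentReplication == false;
    }
}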
@@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -354,6 +355,56 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
assertSegmentStats(REPLICA_COUNT);
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
// index at least one document so that the delete below always has a target
final int initialDocCount = scaledRandomIntBetween(1, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

// wait a short amount of time to give replication a chance to complete.
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);

Set<String> ids = indexer.getIds();
String id = ids.toArray()[0].toString();
client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

refresh(INDEX_NAME);
waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount - 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount - 1);
}
}

private void assertSegmentStats(int numberOfReplicas) throws IOException {
final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

88 changes: 71 additions & 17 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -110,8 +110,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1143,6 +1143,30 @@ public Map<String, StoreFileMetadata> asMap() {
private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
private static final String SEGMENT_INFO_EXTENSION = "si";

/**
* Helper method used to group store files according to segment and commit.
*
* @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
* @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
*/
private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
for (StoreFileMetadata meta : this) {
final String segmentId = IndexFileNames.parseSegmentName(meta.name());
final String extension = IndexFileNames.getExtension(meta.name());
if (IndexFileNames.SEGMENTS.equals(segmentId)
|| DEL_FILE_EXTENSION.equals(extension)
|| LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files; fnm files are generational but only for upgradable DV
perCommitStoreFiles.add(meta);
} else {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
}
}
return Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles));
}
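For illustration, a small standalone sketch of how the Lucene IndexFileNames utilities used above split store files into the two groups; the file names are hypothetical examples:

import org.apache.lucene.index.IndexFileNames;

public class GroupingSketch {
    public static void main(String[] args) {
        // "_0.cfe" and "_0.si" group under segment "_0"; "segments_3" (a commit point)
        // and the generational live-docs file "_0_1.liv" land in the per-commit group.
        for (String name : new String[] { "_0.cfe", "_0.si", "_0_1.liv", "segments_3" }) {
            String segmentId = IndexFileNames.parseSegmentName(name); // e.g. "_0", "segments"
            String extension = IndexFileNames.getExtension(name); // e.g. "cfe", "liv", null
            System.out.println(name + " -> segment=" + segmentId + ", extension=" + extension);
        }
    }
}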

/**
* Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the
* recovery target and this snapshot as the source. The returned diff will hold a list of files that are:
@@ -1180,23 +1204,8 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
@@ -1231,6 +1240,51 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
return recoveryDiff;
}

/**
 * Segment replication method.
 * Returns a diff between the two snapshots that can be used to get the list of files to copy over to a replica
 * for segment replication. The given snapshot is treated as the target and this snapshot as the source. The
 * returned diff will hold a list of files that are:
 * <ul>
 * <li>identical: they exist in both snapshots and can be considered the same, i.e. they don't need to be recovered</li>
 * <li>different: they exist in both snapshots but are not identical</li>
 * <li>missing: files that exist in the source but not in the target</li>
 * </ul>
 */
public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
// Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
// documents and generate new files specific to a segment
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
different.add(meta);
} else {
identicalFiles.add(meta);
}
}
if (consistent) {
identical.addAll(identicalFiles);
} else {
different.addAll(identicalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
return recoveryDiff;
}
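A minimal usage sketch, assuming two hypothetical snapshots in scope (primarySnapshot from the source, replicaSnapshot from the replica's local store); RecoveryDiff exposes its three lists as public final fields:

// Source is the primary's snapshot; target is the replica's local snapshot.
Store.RecoveryDiff diff = primarySnapshot.segmentReplicationDiff(replicaSnapshot);
// Files the replica lacks are expected while it lags the primary; just fetch them.
List<StoreFileMetadata> filesToFetch = new ArrayList<>(diff.missing);
// Same-named files with differing checksums still indicate real divergence.
boolean hasConflicts = diff.different.isEmpty() == false;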

/**
* Returns the number of files in this snapshot
*/
@@ -154,7 +154,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
throws IOException {
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata);
final Store.RecoveryDiff diff = snapshot.segmentReplicationDiff(localMetadata);
logger.debug("Replication diff {}", diff);
// Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
// snapshot from source, that means the local copy of the segment has been corrupted/changed in some way and we throw an
// IllegalStateException to fail the shard
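The remainder of getFiles is collapsed in this view; a hedged sketch of the check the comment above describes (assumed shape, not the verbatim source):

// Assumed sketch: a same-named file that differs from the incoming snapshot means
// the local copy is corrupted, since segments are immutable, so fail the shard.
if (diff.different.isEmpty() == false) {
    throw new IllegalStateException("Local store contains segment files that differ from the primary for shard " + indexShard.shardId());
}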
@@ -8,29 +8,51 @@

package org.opensearch.indices.replication;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.Version;
import org.junit.Assert;
import org.mockito.Mockito;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreTests;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -69,6 +91,11 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
0
);

private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
);

SegmentInfos testSegmentInfos;

@Override
@@ -361,6 +388,113 @@ public void onFailure(Exception e) {
});
}

/**
 * This test ensures that new files generated on the primary (due to a delete operation) are not considered missing on the replica.
 * @throws IOException
 */
public void test_MissingFiles_NotCausingFailure() throws IOException {
int docCount = 1 + random().nextInt(10);
// Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
// generated due to delete operations. These two snapshots are used in the test to mock the primary shard
// snapshot (2nd element, which contains delete operations) and the replica's existing snapshot (1st element).
List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);

SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
listener.onResponse(
new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))
);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
Store store,
ActionListener<GetSegmentFilesResponse> listener
) {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
}
};
SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
SegmentReplicationTargetService.SegmentReplicationListener.class
);

segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
when(segrepTarget.getMetadataSnapshot()).thenReturn(storeMetadataSnapshots.get(0));
segrepTarget.startReplication(new ActionListener<Void>() {
@Override
public void onResponse(Void replicationResponse) {
logger.info("No error processing checkpoint info");
}

@Override
public void onFailure(Exception e) {
assert (e instanceof IllegalStateException);
}
});
}

/**
 * Generates a list of Store.MetadataSnapshot with two elements, where the second snapshot has extra files
 * due to a delete operation. A list of snapshots is returned so that identical files have the same checksum.
 * @param docCount number of documents to index before taking the first snapshot
 * @return a list of two metadata snapshots
 * @throws IOException
 */
List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
List<Document> docList = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
Document document = new Document();
String text = new String(new char[] { (char) (97 + i), (char) (97 + i) });
document.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
document.add(new TextField("str", text, Field.Store.YES));
docList.add(document);
}
long seed = random().nextLong();
Random random = new Random(seed);
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(true);
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
for (Document d : docList) {
writer.addDocument(d);
}
writer.commit();
Store.MetadataSnapshot storeMetadata = store.getMetadata();
// Delete one document to generate .liv file
writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
writer.commit();
Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
deleteContent(store.directory());
writer.close();
store.close();
return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
}

public static void deleteContent(Directory directory) throws IOException {
final String[] files = directory.listAll();
final List<IOException> exceptions = new ArrayList<>();
for (String file : files) {
try {
directory.deleteFile(file);
} catch (NoSuchFileException | FileNotFoundException e) {
// ignore
} catch (IOException e) {
exceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

@Override
public void tearDown() throws Exception {
super.tearDown();