Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Refactor RemoteStoreReplicationSource #8767

Merged
merged 26 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c04190e
[Segment Replication] Refactor remote replication source
dreamer-89 Jul 18, 2023
2d57ddc
Unit test updates
dreamer-89 Jul 19, 2023
929d824
Self review
dreamer-89 Jul 19, 2023
eb29746
Self review
dreamer-89 Jul 19, 2023
60126e5
Segregate shard level tests for node to node and remote store segment…
dreamer-89 Jul 20, 2023
5d9063a
Fix failing unit tests
dreamer-89 Jul 24, 2023
7200dcb
Fix failing UT
dreamer-89 Jul 24, 2023
8c92bc3
Fix failing UT
dreamer-89 Jul 24, 2023
49cabde
Address review comments
dreamer-89 Jul 24, 2023
2ce7e63
Fix more unit tests
dreamer-89 Jul 25, 2023
04ad07c
Improve RemoteStoreReplicationSourceTests, remove unnecessary mocks a…
dreamer-89 Jul 26, 2023
d537a4e
Spotless check fix
dreamer-89 Jul 26, 2023
dfdc7cc
Address review comments
dreamer-89 Jul 27, 2023
b53b4b4
Ignore files already in store while computing segment file diff with …
dreamer-89 Jul 27, 2023
7b1b6ff
Spotless fix
dreamer-89 Jul 27, 2023
6ceaf1d
Fix failing UT
dreamer-89 Jul 27, 2023
90e4bc8
Spotless fix
dreamer-89 Jul 27, 2023
ac59af9
Move read/writes from IndexInput/Output to RemoteSegmentMetadata
dreamer-89 Jul 27, 2023
bc9bd8c
Address review commnt
dreamer-89 Jul 27, 2023
1689fd2
Update recovery flow to perform commits during recovery
dreamer-89 Aug 1, 2023
dc79593
Remove un-necessary char
dreamer-89 Aug 1, 2023
330e712
Address review comments
dreamer-89 Aug 1, 2023
0ac181a
Update comment nit-pick
dreamer-89 Aug 1, 2023
4337dcd
Remove deletion logic causing read issues due to deleted segments_N
dreamer-89 Aug 1, 2023
14ca8ed
Spotless fix
dreamer-89 Aug 1, 2023
753729c
Fix unit tests
dreamer-89 Aug 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -24,7 +23,6 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -134,24 +132,6 @@ protected void waitForSearchableDocs(long docCount, String... nodes) throws Exce
waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
}

protected void waitForSegmentReplication(String node) throws Exception {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client(node).admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
final SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats()
.get(INDEX_NAME)
.get(0);
assertEquals(
perGroupStats.getReplicaStats().stream().findFirst().get().getCurrentReplicationState().getStage(),
SegmentReplicationState.Stage.DONE
);
}, 1, TimeUnit.MINUTES);
}

protected void verifyStoreContent() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = getClusterState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
assertTrue(
replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0
&& primaryStats.uploadBytesStarted
- zeroStatePrimaryStats.uploadBytesStarted == replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted
- zeroStatePrimaryStats.uploadBytesStarted >= replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted
);
assertTrue(
replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0
&& primaryStats.uploadBytesSucceeded
- zeroStatePrimaryStats.uploadBytesSucceeded == replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded
- zeroStatePrimaryStats.uploadBytesSucceeded >= replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded
);
// Assert zero failures
assertEquals(0, primaryStats.uploadBytesFailed);
Expand Down Expand Up @@ -369,8 +369,8 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
assertEquals(0, uploadsFailed);
assertEquals(0, uploadBytesFailed);
for (int j = 0; j < response.getSuccessfulShards() - 1; j++) {
assertEquals(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted, (long) downloadBytesStarted.get(j));
assertEquals(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded, (long) downloadBytesSucceeded.get(j));
assertTrue(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted > downloadBytesStarted.get(j));
assertTrue(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded > downloadBytesSucceeded.get(j));
assertEquals(0, (long) downloadBytesFailed.get(j));
}
}, 60, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(super.nodeSettings(nodeOrdinal))
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that check by-timestamp order
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(remoteStoreClusterSettings("remote-store-repo-name"))
.build();
}
Expand Down
54 changes: 10 additions & 44 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -1988,7 +1986,7 @@ private long recoverLocallyUpToGlobalCheckpoint() {
final Optional<SequenceNumbers.CommitInfo> safeCommit;
final long globalCheckpoint;
try {
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(TRANSLOG_UUID_KEY);
globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
safeCommit = store.findSafeIndexCommit(globalCheckpoint);
} catch (org.apache.lucene.index.IndexNotFoundException e) {
Expand Down Expand Up @@ -2088,7 +2086,7 @@ private long recoverLocallyUptoLastCommit() {
try {
seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO));
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.error("skip local recovery as no index commit found", e);
logger.error("skip local recovery as no index commit found");
return UNASSIGNED_SEQ_NO;
} catch (Exception e) {
logger.error("skip local recovery as failed to find the safe commit", e);
Expand Down Expand Up @@ -2242,7 +2240,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
// we have to set it before we open an engine and recover from the translog because
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
}
Expand Down Expand Up @@ -2326,7 +2324,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
syncSegmentsFromRemoteSegmentStore(false, true, true);
syncSegmentsFromRemoteSegmentStore(false, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
if (syncFromRemote) {
Expand Down Expand Up @@ -4555,7 +4553,7 @@ public void close() throws IOException {
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false, true, true);
syncSegmentsFromRemoteSegmentStore(false, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
Expand Down Expand Up @@ -4616,13 +4614,11 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @param shouldCommit if the shard requires committing the changes after sync from remote.
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
throws IOException {
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
logger.trace("Downloading segments from remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
// We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
// are uploaded to the remote segment store.
Expand All @@ -4647,7 +4643,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
} else {
storeDirectory = store.directory();
}
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);

if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
Expand All @@ -4661,37 +4656,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
indexInput,
remoteSegmentMetadata.getGeneration()
);
// Replicas never need a local commit
if (shouldCommit) {
if (this.shardRouting.primary()) {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the
// latest commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
} else {
finalizeReplication(infosSnapshot);
mch2 marked this conversation as resolved.
Show resolved Hide resolved
}
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}
} catch (IOException e) {
Expand All @@ -4716,7 +4682,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
long primaryTerm,
long commitGeneration
) throws IOException {
logger.info("Downloading segments from given remote segment store");
logger.trace("Downloading segments from given remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = null;
if (remoteStore != null) {
remoteDirectory = getRemoteDirectory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private synchronized boolean syncSegments() {
public void onResponse(Void unused) {
try {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(
refreshTimeMs,
Expand Down Expand Up @@ -327,7 +327,8 @@ private boolean isRefreshAfterCommit() throws IOException {
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
throws IOException {
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
Map<String, String> userData = segmentInfosSnapshot.getUserData();
Expand All @@ -344,8 +345,8 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
localSegmentsPostRefresh,
segmentInfosSnapshot,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
translogFileGeneration
translogFileGeneration,
replicationCheckpoint
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
remoteStore.incRef();
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
indexShard.syncSegmentsFromRemoteSegmentStore(true, true);

if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -603,19 +604,20 @@ public boolean containsFile(String localFilename, String checksum) {
* @param segmentFiles segment files that are part of the shard at the time of the latest refresh
* @param segmentInfosSnapshot SegmentInfos bytes to store as part of metadata file
* @param storeDirectory instance of local directory to temporarily create metadata file before upload
* @param primaryTerm primary term to be used in the name of metadata file
* @param translogGeneration translog generation
* @param replicationCheckpoint ReplicationCheckpoint of primary shard
* @throws IOException in case of I/O error while uploading the metadata file
*/
public void uploadMetadata(
Collection<String> segmentFiles,
SegmentInfos segmentInfosSnapshot,
Directory storeDirectory,
long primaryTerm,
long translogGeneration
long translogGeneration,
ReplicationCheckpoint replicationCheckpoint
) throws IOException {
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(
primaryTerm,
replicationCheckpoint.getPrimaryTerm(),
segmentInfosSnapshot.getGeneration(),
translogGeneration,
metadataUploadCounter.incrementAndGet(),
Expand Down Expand Up @@ -646,8 +648,7 @@ public void uploadMetadata(
new RemoteSegmentMetadata(
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
segmentInfoSnapshotByteArray,
primaryTerm,
segmentInfosSnapshot.getGeneration()
replicationCheckpoint
)
);
}
Expand Down
10 changes: 6 additions & 4 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,22 +845,24 @@ private void cleanupFiles(Collection<String> filesToConsiderForCleanup, String r
* @param tmpToFileName Map of temporary replication file to actual file name
* @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file
* @param segmentsGen segment generation number
* @param consumer consumer for generated SegmentInfos
* @param finalizeConsumer consumer for action on passed in SegmentInfos
* @param renameConsumer consumer for action on temporary copied over files
* @throws IOException Exception while reading store and building segment infos
*/
public void buildInfosFromBytes(
Map<String, String> tmpToFileName,
byte[] infosBytes,
long segmentsGen,
CheckedConsumer<SegmentInfos, IOException> consumer
CheckedConsumer<SegmentInfos, IOException> finalizeConsumer,
CheckedConsumer<Map<String, String>, IOException> renameConsumer
) throws IOException {
metadataLock.writeLock().lock();
try {
final List<String> values = new ArrayList<>(tmpToFileName.values());
incRefFileDeleter(values);
try {
renameTempFilesSafe(tmpToFileName);
consumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
renameConsumer.accept(tmpToFileName);
finalizeConsumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
} finally {
decRefFileDeleter(values);
}
Expand Down
Loading