Skip to content

Commit

Permalink
Adding support for multi index writer
Browse files Browse the repository at this point in the history
  • Loading branch information
RS146BIJAY committed Oct 15, 2024
1 parent 1935650 commit 8cab473
Show file tree
Hide file tree
Showing 34 changed files with 657 additions and 194 deletions.
10 changes: 5 additions & 5 deletions libs/core/src/main/java/org/opensearch/LegacyESVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ public class LegacyESVersion extends Version {
}
}
}
assert CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) : "Version must be upgraded to ["
+ org.apache.lucene.util.Version.LATEST
+ "] is still set to ["
+ CURRENT.luceneVersion
+ "]";
// assert CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) : "Version must be upgraded to ["
// + org.apache.lucene.util.Version.LATEST
// + "] is still set to ["
// + CURRENT.luceneVersion
// + "]";

builder.put(V_EMPTY_ID, V_EMPTY);
builderByString.put(V_EMPTY.toString(), V_EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
public final class SmbMmapFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(
setPreload(
new MMapDirectory(location, lockFactory),
Expand All @@ -56,4 +56,6 @@ protected Directory newFSDirectory(Path location, LockFactory lockFactory, Index
)
);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public final class SmbNIOFsDirectoryFactory extends FsDirectoryFactory {

@Override
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
public Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
return new SmbDirectoryWrapper(new NIOFSDirectory(location, lockFactory));
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void setupSuiteScopeCluster() throws Exception {
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numTag1Docs; i++) {
numSingleTag1Docs++;
XContentBuilder source = jsonBuilder().startObject().field("value", i + 1).field("tag", "tag1").endObject();
XContentBuilder source = jsonBuilder().startObject().field("value", i + 1).field("status", "400").field("tag", "tag1").endObject();
builders.add(client().prepareIndex("idx").setId("" + i).setSource(source));
if (randomBoolean()) {
// randomly index the document twice so that we have deleted
Expand All @@ -119,7 +119,7 @@ public void setupSuiteScopeCluster() throws Exception {
}
for (int i = numTag1Docs; i < (numTag1Docs + numTag2Docs); i++) {
numSingleTag2Docs++;
XContentBuilder source = jsonBuilder().startObject().field("value", i + 1).field("tag", "tag2").endObject();
XContentBuilder source = jsonBuilder().startObject().field("value", i + 1).field("status", "400").field("tag", "tag2").endObject();
builders.add(client().prepareIndex("idx").setId("" + i).setSource(source));
if (randomBoolean()) {
builders.add(client().prepareIndex("idx").setId("" + i).setSource(source));
Expand All @@ -129,7 +129,7 @@ public void setupSuiteScopeCluster() throws Exception {
numMultiTagDocs++;
numTag1Docs++;
numTag2Docs++;
XContentBuilder source = jsonBuilder().startObject().field("value", i + 1).array("tag", "tag1", "tag2").endObject();
XContentBuilder source = jsonBuilder().startObject().field("value", i + 1).field("status", "400").array("tag", "tag1", "tag2").endObject();
builders.add(client().prepareIndex("idx").setId("" + i).setSource(source));
if (randomBoolean()) {
builders.add(client().prepareIndex("idx").setId("" + i).setSource(source));
Expand All @@ -140,10 +140,10 @@ public void setupSuiteScopeCluster() throws Exception {
builders.add(
client().prepareIndex("empty_bucket_idx")
.setId("" + i)
.setSource(jsonBuilder().startObject().field("value", i * 2).endObject())
.setSource(jsonBuilder().startObject().field("value", i * 2).field("status", "400").endObject())
);
}
indexRandom(true, builders);
indexRandom(true, false, builders);
ensureSearchable();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ public void setupSuiteScopeCluster() throws Exception {
.setSource(
jsonBuilder().startObject()
.field(SINGLE_VALUED_FIELD_NAME, singleValue)
.field("status", "400")
.array(MULTI_VALUED_FIELD_NAME, multiValue)
.endObject()
);
}
indexRandom(true, builders);
indexRandom(true, false, builders);
}

public void testSingleValueField() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ public void setupSuiteScopeCluster() throws Exception {

indexRandom(
true,
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5", "status", "400"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50", "status", "400"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2", "status", "400"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20", "status", "400"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10", "status", "400"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15", "status", "400"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1", "status", "400"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100", "status", "400")
);

waitForRelocation(ClusterHealthStatus.GREEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ private IndexRequestBuilder indexDoc(String idx, ZonedDateTime date, int value)
jsonBuilder().startObject()
.timeField("date", date)
.field("value", value)
.field("status", "400")
.startArray("dates")
.timeValue(date)
.timeValue(date.plusMonths(1).plusDays(1))
Expand All @@ -145,6 +146,7 @@ private IndexRequestBuilder indexDoc(int month, int day, int value) throws Excep
jsonBuilder().startObject()
.field("value", value)
.field("constant", 1)
.field("status", "400")
.timeField("date", date(month, day))
.startArray("dates")
.timeValue(date(month, day))
Expand All @@ -164,7 +166,7 @@ public void setupSuiteScopeCluster() throws Exception {
builders.add(
client().prepareIndex("empty_bucket_idx")
.setId("" + i)
.setSource(jsonBuilder().startObject().field("value", i * 2).endObject())
.setSource(jsonBuilder().startObject().field("value", i * 2).field("status", "400").endObject())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.util.IOUtils;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchResponse;
Expand All @@ -48,6 +49,12 @@
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.junit.After;

import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -74,6 +81,19 @@ public static Collection<Object[]> parameters() {
);
}

// public void testDirectory() throws Exception {
// Path lockDir = Paths. get("/Users/rissag/OpenSearch/OpenSearch/server/build/testrun/internalClusterTest/temp/test");
// Files.createDirectories(lockDir);
// Path lockFile = lockDir.resolve("write.lock");
// Files.createFile(lockFile);
// final Path realPath = lockFile.toRealPath();
// try(FileChannel channel = FileChannel.open(realPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
// FileLock lock = channel.tryLock();) {
// assert lock != null;
// assert channel != null;
// }
// }

public void testAllowPartialsWithRedState() throws Exception {
final int numShards = cluster().numDataNodes() + 2;
buildRedIndex(numShards);
Expand Down Expand Up @@ -144,7 +164,7 @@ private void buildRedIndex(int numShards) throws Exception {
);
ensureGreen();
for (int i = 0; i < 10; i++) {
client().prepareIndex("test").setId("" + i).setSource("field1", "value1").get();
client().prepareIndex("test").setId("" + i).setSource("field1", "value1").setSource("status", "400").get();
}
refresh();
indexRandomForConcurrentSearch("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);
System.out.println("Cluster state published from coordinator " + clusterState);
coordinationState.get().handlePrePublish(clusterState);
publication.start(followersChecker.getFaultyNodes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public static void cleanLuceneIndex(Directory directory) throws IOException {
.setOpenMode(IndexWriterConfig.OpenMode.CREATE) // force creation - don't append...
)
) {
System.out.println("Path for IndexWriter inside cleanLuceneIndex inside Lucene: " + directory);
// do nothing and close this will kick of IndexFileDeleter which will remove all pending files
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,8 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clus
previousManifest.getClusterStateCustomMap()
);

System.out.println("Received cluster state in response " + clusterState);

RemoteClusterStateManifestInfo committedManifestDetails = remoteManifestManager.uploadManifest(
clusterState,
uploadedMetadataResults,
Expand Down
15 changes: 13 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
import static org.opensearch.common.collect.MapBuilder.newMapBuilder;
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING;
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;
import static org.opensearch.index.store.FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING;

/**
* The main OpenSearch index service
Expand Down Expand Up @@ -630,7 +631,7 @@ protected void closeInternal() {
// Do nothing for shard lock on remote store
}
};
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, remoteStoreLock, Store.OnClose.EMPTY, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, remoteStoreLock, Store.OnClose.EMPTY, path, null);
} else {
// Disallow shards with remote store based settings to be created on non-remote store enabled nodes
// Even though we have `RemoteStoreMigrationAllocationDecider` in place to prevent something like this from happening at the
Expand All @@ -645,22 +646,32 @@ protected void closeInternal() {
}

Directory directory = null;
Map<String, Directory> criteriaDirectoryMapping = null;
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING) &&
// TODO : Need to remove this check after support for hot indices is added in Composite Directory
this.indexSettings.isStoreLocalityPartial()) {
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
directory = new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
} else {
directory = directoryFactory.newDirectory(this.indexSettings, path);
criteriaDirectoryMapping = new HashMap<>();
criteriaDirectoryMapping.put("200", directoryFactory.newFSDirectory(path.resolveIndex().resolve("200"),
this.indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING), this.indexSettings));
criteriaDirectoryMapping.put("400", directoryFactory.newFSDirectory(path.resolveIndex().resolve("400"),
this.indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING), this.indexSettings));
}
store = new Store(
shardId,
this.indexSettings,
directory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)),
path
path,
criteriaDirectoryMapping
);



eventListener.onStoreCreated(shardId);
indexShard = new IndexShard(
routing,
Expand Down
Loading

0 comments on commit 8cab473

Please sign in to comment.