Fixing refresh flow
RS146BIJAY committed Oct 20, 2024
1 parent 8cab473 commit 134283c
Showing 5 changed files with 106 additions and 41 deletions.
18 changes: 9 additions & 9 deletions server/src/main/java/org/opensearch/bootstrap/Bootstrap.java
@@ -473,15 +473,15 @@ private static void closeSysError() {
}

private static void checkLucene() {
if (Version.CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) == false) {
throw new AssertionError(
"Lucene version mismatch this version of OpenSearch requires lucene version ["
+ Version.CURRENT.luceneVersion
+ "] but the current lucene version is ["
+ org.apache.lucene.util.Version.LATEST
+ "]"
);
}
// if (Version.CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) == false) {
// throw new AssertionError(
// "Lucene version mismatch this version of OpenSearch requires lucene version ["
// + Version.CURRENT.luceneVersion
// + "] but the current lucene version is ["
// + org.apache.lucene.util.Version.LATEST
// + "]"
// );
// }
}

}
45 changes: 38 additions & 7 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -742,13 +742,13 @@ protected final GetResult getFromSearcher(
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public final SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper) throws EngineException {
return acquireSearcherSupplier(wrapper, SearcherScope.EXTERNAL);
return acquireSearcherSupplier(wrapper, SearcherScope.EXTERNAL, getReferenceManager(SearcherScope.EXTERNAL));
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope, ReferenceManager<OpenSearchDirectoryReader> referenceManager) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
@@ -757,7 +757,11 @@ public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wra
}
Releasable releasable = store::decRef;
try {
ReferenceManager<OpenSearchDirectoryReader> referenceManager = getReferenceManager(scope);
if (referenceManager == null) {
referenceManager = getReferenceManager(scope);
}

ReferenceManager<OpenSearchDirectoryReader> referenceManagerInternal = referenceManager;
OpenSearchDirectoryReader acquire = referenceManager.acquire();
SearcherSupplier reader = new SearcherSupplier(wrapper) {
@Override
Expand All @@ -776,7 +780,7 @@ public Searcher acquireSearcherInternal(String source) {
@Override
protected void doClose() {
try {
referenceManager.release(acquire);
referenceManagerInternal.release(acquire);
} catch (IOException e) {
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
@@ -810,9 +814,13 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
}

public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
return acquireSearcher(source, scope, wrapper, getReferenceManager(scope));
}

public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper, ReferenceManager<OpenSearchDirectoryReader> referenceManager) throws EngineException {
SearcherSupplier releasable = null;
try {
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope);
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope, referenceManager);
Searcher searcher = reader.acquireSearcher(source);
releasable = null;
return new Searcher(
Expand All @@ -830,6 +838,11 @@ public Searcher acquireSearcher(String source, SearcherScope scope, Function<Sea

protected abstract ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(SearcherScope scope);

// TODO: Override this in InternalEngine
protected List<ReferenceManager<OpenSearchDirectoryReader>> getChildLevelReferenceManagerList() {
return null;
}

boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
return true;
}
@@ -1152,10 +1165,28 @@ public boolean refreshNeeded() {
store. this violates the assumption that all files are closed when
the store is closed so we need to make sure we increment it here
*/

List<ReferenceManager<OpenSearchDirectoryReader>> childDirectoryReaderReferenceList = getChildLevelReferenceManagerList();
try {
try (Searcher searcher = acquireSearcher("refresh_needed", SearcherScope.EXTERNAL)) {
return searcher.getDirectoryReader().isCurrent() == false;

// For the regular scenario (no criteria-based IndexWriters).
if (childDirectoryReaderReferenceList == null || childDirectoryReaderReferenceList.isEmpty()) {
try (Searcher searcher = acquireSearcher("refresh_needed", SearcherScope.EXTERNAL)) {
return searcher.getDirectoryReader().isCurrent() == false;
}
} else {
// With criteria-based IndexWriters.
for (ReferenceManager<OpenSearchDirectoryReader> childDirectoryReaderReference : childDirectoryReaderReferenceList) {
try (Searcher searcher = acquireSearcher("refresh_needed", SearcherScope.EXTERNAL, Function.identity(), childDirectoryReaderReference)) {
if (searcher.getDirectoryReader().isCurrent() == false) {
return true;
}
}
}

return false;
}

} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine("failed to access searcher manager", e);
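The new refreshNeeded() path above walks every child-level reference manager and reports a pending refresh as soon as any of them is stale. Below is a minimal stand-alone sketch of that check using plain Lucene ReaderManager instances rather than the engine's OpenSearchDirectoryReader managers; the class and method names are illustrative, not part of this commit.

import java.io.IOException;
import java.util.List;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.ReaderManager;

// Sketch: a refresh is pending if any per-group reader no longer reflects
// the latest changes of the writer it was opened against.
final class RefreshNeededSketch {

    static boolean refreshNeeded(List<ReaderManager> childManagers) throws IOException {
        for (ReaderManager manager : childManagers) {
            DirectoryReader reader = manager.acquire();
            try {
                if (reader.isCurrent() == false) {
                    return true;
                }
            } finally {
                manager.release(reader);
            }
        }
        return false;
    }
}

Acquiring and releasing each reader around the isCurrent() call mirrors what the try-with-resources use of acquireSearcher(...) does for the engine in the hunk above.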
server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -203,6 +203,10 @@ public class InternalEngine extends Engine {
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);

// This is required to prevent a cyclic dependency between reader refresh and parentIndexWriter updates:
// the parent IndexWriter is updated on refresh, and a refresh is triggered whenever there is an update.
private final List<ReferenceManager<OpenSearchDirectoryReader>> groupLevelExternalReaderManagers = new ArrayList<>();

/**
* If multiple writes passed {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)} but they haven't adjusted
* {@link IndexWriter#getPendingNumDocs()} yet, then IndexWriter can fail with too many documents. In this case, we have to fail
@@ -328,7 +332,11 @@ public void onFailure(String reason, Exception ex) {
throw e;
}
}
externalReaderManager = createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig));
externalReaderManager = createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig), parentIndexWriter);
for (IndexWriter groupLevelIndexWriter : criteriaBasedIndexWriters.values()) {
groupLevelExternalReaderManagers.add(createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig), groupLevelIndexWriter));
}

internalReaderManager = externalReaderManager.internalReaderManager;
this.internalReaderManager = internalReaderManager;
this.externalReaderManager = externalReaderManager;
@@ -360,10 +368,8 @@ public void onFailure(String reason, Exception ex) {
success = true;
} finally {
if (success == false) {
for (OpenSearchConcurrentMergeScheduler scheduler: mergeSchedulerCriteriaMap.values()) {
IOUtils.closeWhileHandlingException(writer, translogManagerRef, internalReaderManager, externalReaderManager, scheduler);
}

IOUtils.closeWhileHandlingException(writer, translogManagerRef, internalReaderManager, externalReaderManager);
IOUtils.closeWhileHandlingException(mergeSchedulerCriteriaMap.values());
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
@@ -579,13 +585,13 @@ private long getFlushingBytes() {
return flushingBytes;
}

private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener) throws EngineException {
private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener, IndexWriter internalIndexWriter) throws EngineException {
boolean success = false;
OpenSearchReaderManager internalReaderManager = null;
try {
try {
final OpenSearchDirectoryReader directoryReader = OpenSearchDirectoryReader.wrap(
DirectoryReader.open(parentIndexWriter),
DirectoryReader.open(internalIndexWriter),
shardId
);
internalReaderManager = new OpenSearchReaderManager(directoryReader);
Expand All @@ -596,21 +602,19 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
} catch (IOException e) {
maybeFailEngine("start", e);
try {
parentIndexWriter.rollback();
for (IndexWriter indexWriter: criteriaBasedIndexWriters.values()) {
indexWriter.rollback();
}
internalIndexWriter.rollback();
// for (IndexWriter indexWriter: criteriaBasedIndexWriters.values()) {
// indexWriter.rollback();
// }
} catch (IOException inner) { // iw is closed below
e.addSuppressed(inner);
}
throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
}
} finally {
if (success == false) { // release everything we created on a failure
IOUtils.closeWhileHandlingException(internalReaderManager, parentIndexWriter);
for (IndexWriter indexWriter: criteriaBasedIndexWriters.values()) {
IOUtils.closeWhileHandlingException(internalReaderManager, indexWriter);
}
IOUtils.closeWhileHandlingException(internalReaderManager, internalIndexWriter);
// IOUtils.closeWhileHandlingException(criteriaBasedIndexWriters.values());
}
}
}
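createReaderManager(...) now receives the IndexWriter it should read from, which lets the engine build one reader manager for the parent writer and one per criteria-based writer. A hedged sketch of that one-manager-per-writer pattern with stock Lucene follows; the helper class, method, and parameter names are invented for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.ReaderManager;

// Sketch: open a near-real-time ReaderManager for the parent writer and one
// for every group-level writer, mirroring the per-writer managers above.
final class PerWriterReaderManagers {

    static List<ReaderManager> open(IndexWriter parentWriter, Collection<IndexWriter> groupWriters) throws IOException {
        List<ReaderManager> managers = new ArrayList<>();
        managers.add(new ReaderManager(parentWriter));     // parent-level view
        for (IndexWriter groupWriter : groupWriters) {
            managers.add(new ReaderManager(groupWriter));  // one view per criteria group
        }
        return managers;
    }
}

The real engine additionally wraps each DirectoryReader in OpenSearchDirectoryReader and an ExternalReaderManager; this sketch keeps only the per-writer structure.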
@@ -1216,11 +1220,16 @@ private String getGroupingCriteriaForDoc(final Iterable<? extends Iterable<? ext
while (docIt.hasNext()) {
IndexableField field = docIt.next();
if (field.stringValue() != null && field.name().equals("status")) {
return field.stringValue();
int statusCode = Integer.parseInt(field.stringValue())/100;
if (statusCode <= 3) {
return "200";
} else {
return "400";
}
}
}

return "0";
return "200";
}
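The grouping criteria above buckets documents by the hundreds digit of their "status" field: anything up to 3xx lands in the "200" group, 4xx and above in the "400" group, and a document with no status falls back to "200". A small self-contained sketch of that bucketing follows; the helper class is invented for illustration.

// Sketch of the status-code bucketing used as the grouping criteria:
// 1xx-3xx -> "200" group, 4xx and higher -> "400" group, no status -> "200".
final class StatusGroupingSketch {

    static String groupFor(String statusValue) {
        if (statusValue == null) {
            return "200";
        }
        int statusCode = Integer.parseInt(statusValue) / 100;
        return statusCode <= 3 ? "200" : "400";
    }

    public static void main(String[] args) {
        System.out.println(groupFor("201")); // 200
        System.out.println(groupFor("304")); // 200
        System.out.println(groupFor("404")); // 400
        System.out.println(groupFor("503")); // 400
        System.out.println(groupFor(null));  // 200
    }
}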

private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
@@ -1834,16 +1843,26 @@ final boolean refresh(String source, SearcherScope scope, boolean block) throws
SegmentInfos successLogSegmentInfos = r2.getSegmentInfos();
addPrefixToSegmentInfoAttribute(clientErrorLogSegmentInfos, "400");
addPrefixToSegmentInfoAttribute(successLogSegmentInfos, "200");
parentIndexWriter.addIndexes(r1.getSegmentInfos(), r2.getSegmentInfos());
System.out.println("Segment Infos for directory " + store.directory() +
" state during refresh for 400: " + clientErrorLogSegmentInfos + " and 200: " + successLogSegmentInfos);
parentIndexWriter.addIndexes(clientErrorLogSegmentInfos, successLogSegmentInfos);
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
ReferenceManager<OpenSearchDirectoryReader> referenceManager = getReferenceManager(scope);
// it is intentional that we never refresh both internal / external together
// Refresh both the child and parent ReaderManagers together.
if (block) {
referenceManager.maybeRefreshBlocking();
for (ReferenceManager<OpenSearchDirectoryReader> groupLevelExternalReaderManager: groupLevelExternalReaderManagers) {
groupLevelExternalReaderManager.maybeRefreshBlocking();
}

refreshed = true;
} else {
refreshed = referenceManager.maybeRefresh();
for (ReferenceManager<OpenSearchDirectoryReader> groupLevelExternalReaderManager: groupLevelExternalReaderManagers) {
groupLevelExternalReaderManager.maybeRefresh();
}
}
} catch (Exception ex) {
throw new IOException("Failed refresh", ex);
@@ -1883,6 +1902,11 @@ final boolean refresh(String source, SearcherScope scope, boolean block) throws
return refreshed;
}

@Override
protected List<ReferenceManager<OpenSearchDirectoryReader>> getChildLevelReferenceManagerList() {
return groupLevelExternalReaderManagers;
}

private void addPrefixToSegmentInfoAttribute(SegmentInfos infos, String prefix) {
for (SegmentCommitInfo commitInfo: infos) {
commitInfo.info.putAttribute("segment_name_prefix", prefix);
@@ -2199,6 +2223,9 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
for (IndexWriter indexWriter:criteriaBasedIndexWriters.values()) {
indexWriter.deleteUnusedFiles();
}

// TODO: Is this needed?
parentIndexWriter.deleteUnusedFiles();
} catch (AlreadyClosedException ignored) {
// That's ok, we'll clean up unused files the next time it's opened.
}
@@ -2398,6 +2425,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
}
try {
IOUtils.close(externalReaderManager, internalReaderManager);
IOUtils.close(groupLevelExternalReaderManagers);
} catch (Exception e) {
logger.warn("Failed to close ReaderManager", e);
}
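The reworked refresh(...) above first folds the group-level readers' segments into the parent writer and then refreshes the parent reference manager together with every group-level manager, blocking or best-effort depending on the caller. The sketch below covers only the refresh step with plain Lucene reference managers; the segment hand-off into the parent writer uses a writer API specific to this branch and is left out. Names are illustrative.

import java.io.IOException;
import java.util.List;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.ReferenceManager;

// Sketch: refresh the parent-level manager and every group-level manager
// together, either blocking until done or as a best-effort attempt.
final class RefreshAllSketch {

    static boolean refresh(ReferenceManager<DirectoryReader> parentManager,
                           List<ReferenceManager<DirectoryReader>> groupManagers,
                           boolean block) throws IOException {
        boolean refreshed;
        if (block) {
            parentManager.maybeRefreshBlocking();
            for (ReferenceManager<DirectoryReader> groupManager : groupManagers) {
                groupManager.maybeRefreshBlocking();
            }
            refreshed = true;
        } else {
            refreshed = parentManager.maybeRefresh();
            for (ReferenceManager<DirectoryReader> groupManager : groupManagers) {
                groupManager.maybeRefresh();
            }
        }
        return refreshed;
    }
}

As in the commit, only the parent manager's result decides the returned refreshed flag; the group-level refreshes are fire-and-forget in the non-blocking case.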
14 changes: 9 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1910,7 +1910,7 @@ public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scop
readAllowed();
markSearcherAccessed();
final Engine engine = getEngine();
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
return engine.acquireSearcherSupplier(this::wrapSearcher, scope, null);
}

public Engine.Searcher acquireSearcher(String source) {
@@ -4691,14 +4691,18 @@ public final boolean isSearchIdle() {
* Primary shards push out new segments only
* after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
* a new set of segments.
*
* TODO: Remove this.
*/
public final boolean isSearchIdleSupported() {
// If the index is remote store backed, then search idle is not supported. This is to ensure that async refresh
// task continues to upload to remote store periodically.
if (isRemoteTranslogEnabled() || indexSettings.isAssignedOnRemoteNode()) {
return false;
}
return indexSettings.isSegRepEnabledOrRemoteNode() == false || indexSettings.getNumberOfReplicas() == 0;
// if (isRemoteTranslogEnabled() || indexSettings.isAssignedOnRemoteNode()) {
// return false;
// }
// return indexSettings.isSegRepEnabledOrRemoteNode() == false || indexSettings.getNumberOfReplicas() == 0;

return false;
}

/**
@@ -32,6 +32,8 @@
package org.opensearch.test.engine;

import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineException;
@@ -97,7 +99,7 @@ public Engine.Searcher acquireSearcher(String source, SearcherScope scope) {
}

@Override
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
return super.acquireSearcherSupplier(wrapper.andThen(s -> support().wrapSearcher(s)), scope);
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope, ReferenceManager<OpenSearchDirectoryReader> referenceManager) throws EngineException {
return super.acquireSearcherSupplier(wrapper.andThen(s -> support().wrapSearcher(s)), scope, referenceManager);
}
}
