Skip to content

Commit

Permalink
Remove unnecessary refresh listeners from NRTReplicationReaderManager. (
Browse files Browse the repository at this point in the history
#8859)

* Remove unnecessary refresh listeners from NRTReplicationReaderManager.

This change removes RefreshListeners used by InternalEngine to provide waitFor functionality.
These listeners were previously registered onto NRT replicas only to be force released on the next refresh cycle without actually refreshing the reader.

This change also removes the unnecessary blocking refresh from NRTReaderManager because we no longer have conflicting refresh invocations from scheduledRefresh.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Reduce the amount of docs ingested with testPrimaryRelocation and testPrimaryRelocationWithSegRepFailure.

These tests were ingesting 100-1k docs and randomly selecting a refresh policy. Wtih the IMMEDIATE refresh policy a blocking refresh is performed that increase the time required for the primary to block operations for relocation.  On my machine this change reduces the test time with max docs from 1m to 5-6s.

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Marc Handalian <handalm@amazon.com>
(cherry picked from commit 4319f2b)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Jul 26, 2023
1 parent 4c131e6 commit 85d1660
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testPrimaryRelocation() throws Exception {
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final int initialDocCount = scaledRandomIntBetween(10, 100);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final int initialDocCount = scaledRandomIntBetween(10, 100);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
this.readerManager.addListener(listener);
}
// NRT Replicas do not have a concept of Internal vs External reader managers.
// We also do not want to wire up refresh listeners for waitFor & pending refresh location.
// which are the current external listeners set from IndexShard.
// Only wire up the internal listeners.
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.readerManager.addListener(listener);
}
Expand Down Expand Up @@ -355,22 +356,12 @@ public List<Segment> segments(boolean verbose) {

@Override
public void refresh(String source) throws EngineException {
maybeRefresh(source);
// Refresh on this engine should only ever happen in the reader after new segments arrive.
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
ensureOpen();
try {
return readerManager.maybeRefresh();
} catch (IOException e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void updateSegments(SegmentInfos infos) throws IOException {
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefreshBlocking();
maybeRefresh();
}

public SegmentInfos getSegmentInfos() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4434,7 +4434,8 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
readAllowed = isReadAllowed();
}
}
if (readAllowed) {
// NRT Replicas will not accept refresh listeners.
if (readAllowed && isSegmentReplicationAllowed() == false) {
refreshListeners.addOrNotify(location, listener);
} else {
// we're not yet ready fo ready for reads, just ignore refresh cycles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

public void testNRTReplicasDoNotAcceptRefreshListeners() throws IOException {
final IndexShard indexShard = newStartedShard(false, settings, new NRTReplicationEngineFactory());
indexShard.addRefreshListener(mock(Translog.Location.class), Assert::assertFalse);
closeShards(indexShard);
}

public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down

0 comments on commit 85d1660

Please sign in to comment.