Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary refresh listeners from NRTReplicationReaderManager. #8859

Merged
merged 2 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testPrimaryRelocation() throws Exception {
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final int initialDocCount = scaledRandomIntBetween(10, 100);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final int initialDocCount = scaledRandomIntBetween(10, 100);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
this.readerManager.addListener(listener);
}
// NRT Replicas do not have a concept of Internal vs External reader managers.
// We also do not want to wire up refresh listeners for waitFor & pending refresh location.
// which are the current external listeners set from IndexShard.
// Only wire up the internal listeners.
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.readerManager.addListener(listener);
}
Expand Down Expand Up @@ -322,22 +323,12 @@ public List<Segment> segments(boolean verbose) {

@Override
public void refresh(String source) throws EngineException {
maybeRefresh(source);
// Refresh on this engine should only ever happen in the reader after new segments arrive.
}
mch2 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public boolean maybeRefresh(String source) throws EngineException {
ensureOpen();
try {
return readerManager.maybeRefresh();
} catch (IOException e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void updateSegments(SegmentInfos infos) throws IOException {
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefreshBlocking();
maybeRefresh();
}

public SegmentInfos getSegmentInfos() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4423,7 +4423,8 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
readAllowed = isReadAllowed();
}
}
if (readAllowed) {
// NRT Replicas will not accept refresh listeners.
if (readAllowed && isSegmentReplicationAllowed() == false) {
refreshListeners.addOrNotify(location, listener);
} else {
// we're not yet ready fo ready for reads, just ignore refresh cycles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

public void testNRTReplicasDoNotAcceptRefreshListeners() throws IOException {
final IndexShard indexShard = newStartedShard(false, settings, new NRTReplicationEngineFactory());
indexShard.addRefreshListener(mock(Translog.Location.class), Assert::assertFalse);
closeShards(indexShard);
}

public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down