Skip to content

Commit efe4877

Browse files
peteralfonsi, Peter Alfonsi, and jainankitk
authored
Remove unnecessary iteration in request cache clear (#19263)
Signed-off-by: Peter Alfonsi <petealft@amazon.com> Signed-off-by: Peter Alfonsi <peter.alfonsi@gmail.com> Signed-off-by: Ankit Jain <jainankitk@apache.org> Co-authored-by: Peter Alfonsi <petealft@amazon.com> Co-authored-by: Ankit Jain <jainankitk@apache.org>
1 parent 17b762a commit efe4877

File tree

6 files changed

+80
-0
lines changed

6 files changed

+80
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
9797
- Cache serialised cluster state based on cluster state version and node version.([#19307](https://github.com/opensearch-project/OpenSearch/pull/19307))
9898
- Fix stats API in store-subdirectory module's SubdirectoryAwareStore ([#19470](https://github.com/opensearch-project/OpenSearch/pull/19470))
9999
- Handle negative search request nodes stats ([#19340](https://github.com/opensearch-project/OpenSearch/pull/19340))
100+
- Remove unnecessary per-shard iteration in request cache cleanup ([#19263](https://github.com/opensearch-project/OpenSearch/pull/19263))
100101

101102
### Dependencies
102103
- Bump `com.gradleup.shadow:shadow-gradle-plugin` from 8.3.5 to 8.3.9 ([#19400](https://github.com/opensearch-project/OpenSearch/pull/19400))

server/src/main/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@
3232

3333
package org.opensearch.action.admin.indices.cache.clear;
3434

35+
import org.apache.logging.log4j.LogManager;
36+
import org.apache.logging.log4j.Logger;
3537
import org.opensearch.action.support.ActionFilters;
38+
import org.opensearch.action.support.broadcast.BroadcastShardOperationFailedException;
3639
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
3740
import org.opensearch.cluster.ClusterState;
3841
import org.opensearch.cluster.block.ClusterBlockException;
@@ -68,6 +71,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc
6871
private final IndicesService indicesService;
6972

7073
private final Node node;
74+
private final Logger clearActionLogger = LogManager.getLogger(getClass());
7175

7276
@Inject
7377
public TransportClearIndicesCacheAction(
@@ -135,6 +139,15 @@ protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRout
135139
return EmptyResult.INSTANCE;
136140
}
137141

142+
@Override
143+
protected void nodeOperation(List<EmptyResult> results, List<BroadcastShardOperationFailedException> accumulatedExceptions) {
144+
try {
145+
indicesService.forceClearNodewideCaches();
146+
} catch (Exception e) {
147+
clearActionLogger.warn("Node-wide force cache clear failed; marked keys will be cleaned at next scheduled cache cleanup", e);
148+
}
149+
}
150+
138151
/**
139152
* The clear indices cache request works against *all* shards.
140153
*/

server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,13 @@ protected abstract Response newResponse(
238238
*/
239239
protected abstract ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices);
240240

241+
/**
242+
* Executes a node-level operation. This method is called one time per node, after all shard-level operations have completed.
243+
* @param results List of results from the completed shard-level operations.
244+
* @param accumulatedExceptions List of any exceptions thrown by the shard-level operations.
245+
*/
246+
protected void nodeOperation(List<ShardOperationResult> results, List<BroadcastShardOperationFailedException> accumulatedExceptions) {}
247+
241248
/**
242249
* Executes a global block check before polling the cluster state.
243250
*
@@ -479,6 +486,8 @@ public void messageReceived(final NodeRequest request, TransportChannel channel,
479486
}
480487
}
481488

489+
nodeOperation(results, accumulatedExceptions);
490+
482491
channel.sendResponse(new NodeResponse(request.getNodeId(), totalShards, results, accumulatedExceptions));
483492
}
484493

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,9 @@ void setStalenessThreshold(String threshold) {
266266

267267
void clear(CacheEntity entity) {
268268
cacheCleanupManager.enqueueCleanupKey(new CleanupKey(entity, null));
269+
}
270+
271+
void forceCleanCache() {
269272
cacheCleanupManager.forceCleanCache();
270273
}
271274

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2117,6 +2117,8 @@ private QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, boolean
21172117

21182118
/**
21192119
* Clears the caches for the given shard id if the shard is still allocated on this node.
2120+
* Query cache and field data cache keys are cleared immediately by this method; request cache keys are only marked for cleanup.
2121+
* TODO: In a future PR field data cache keys will also only be marked for cleanup.
21202122
*/
21212123
public void clearIndexShardCache(ShardId shardId, boolean queryCache, boolean fieldDataCache, boolean requestCache, String... fields) {
21222124
final IndexService service = indexService(shardId.getIndex());
@@ -2129,6 +2131,14 @@ public void clearIndexShardCache(ShardId shardId, boolean queryCache, boolean fi
21292131
}
21302132
}
21312133

2134+
/**
2135+
* Force-clears the node-wide request cache, removing any keys that were previously marked for cleanup.
2136+
*/
2137+
public void forceClearNodewideCaches() {
2138+
indicesRequestCache.forceCleanCache();
2139+
// TODO: Field data cache will also be cleared here in a future PR.
2140+
}
2141+
21322142
/**
21332143
* Returns a function which given an index name, returns a predicate which fields must match in order to be returned by get mappings,
21342144
* get index, get field mappings and field capabilities API. Useful to filter the fields that such API return.

server/src/test/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,18 @@
5959
import org.opensearch.cluster.routing.ShardsIterator;
6060
import org.opensearch.cluster.routing.TestShardRouting;
6161
import org.opensearch.cluster.service.ClusterService;
62+
import org.opensearch.common.metrics.CounterMetric;
6263
import org.opensearch.common.settings.Settings;
6364
import org.opensearch.common.util.concurrent.ThreadContext;
65+
import org.opensearch.core.action.ActionListener;
6466
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
6567
import org.opensearch.core.common.io.stream.StreamInput;
6668
import org.opensearch.core.common.io.stream.Writeable;
6769
import org.opensearch.core.index.Index;
6870
import org.opensearch.core.index.shard.ShardId;
6971
import org.opensearch.core.rest.RestStatus;
7072
import org.opensearch.core.transport.TransportResponse;
73+
import org.opensearch.tasks.Task;
7174
import org.opensearch.telemetry.tracing.noop.NoopTracer;
7275
import org.opensearch.test.OpenSearchTestCase;
7376
import org.opensearch.test.transport.CapturingTransport;
@@ -99,6 +102,7 @@
99102
import static org.hamcrest.CoreMatchers.containsString;
100103
import static org.hamcrest.CoreMatchers.is;
101104
import static org.hamcrest.object.HasToString.hasToString;
105+
import static org.mockito.Mockito.mock;
102106

103107
public class TransportBroadcastByNodeActionTests extends OpenSearchTestCase {
104108

@@ -137,6 +141,7 @@ class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<
137141
Response,
138142
TransportBroadcastByNodeAction.EmptyResult> {
139143
private final Map<ShardRouting, Object> shards = new HashMap<>();
144+
private final CounterMetric nodeOperationCount = new CounterMetric();
140145

141146
TestTransportBroadcastByNodeAction(
142147
TransportService transportService,
@@ -206,6 +211,18 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, Request re
206211
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices);
207212
}
208213

214+
@Override
215+
protected void nodeOperation(
216+
List<TransportBroadcastByNodeAction.EmptyResult> results,
217+
List<BroadcastShardOperationFailedException> accumulatedExceptions
218+
) {
219+
nodeOperationCount.inc();
220+
}
221+
222+
public long getNodeOperationCount() {
223+
return nodeOperationCount.count();
224+
}
225+
209226
public Map<ShardRouting, Object> getResults() {
210227
return shards;
211228
}
@@ -452,6 +469,33 @@ public void testOperationExecution() throws Exception {
452469
assertThat(exception.getMessage(), is("operation indices:admin/test failed"));
453470
assertThat(exception, hasToString(containsString("operation failed")));
454471
}
472+
assertEquals(1, action.getNodeOperationCount());
473+
}
474+
475+
public void testNodeLevelHookForMultiNode() throws Exception {
476+
// Manually deliver the coordinator's captured requests to the node-level handler; in this test setup, action.doExecute() does not dispatch them to the target nodes.
477+
478+
Request request = new Request(new String[] { TEST_INDEX });
479+
final PlainActionFuture<Response> listener = PlainActionFuture.newFuture();
480+
action.doExecute(mock(Task.class), request, ActionListener.wrap(listener::onResponse, listener::onFailure));
481+
482+
// Get captured NodeRequest from coordinator for each target node
483+
Map<String, List<CapturingTransport.CapturedRequest>> captured = transport.getCapturedRequestsByTargetNodeAndClear();
484+
assertFalse("expected at least one target node", captured.isEmpty());
485+
486+
final TransportBroadcastByNodeAction<?, ?, ?>.BroadcastByNodeTransportRequestHandler handler =
487+
action.new BroadcastByNodeTransportRequestHandler();
488+
489+
for (Map.Entry<String, List<CapturingTransport.CapturedRequest>> e : captured.entrySet()) {
490+
CapturingTransport.CapturedRequest cr = e.getValue().get(0);
491+
assertEquals(action.transportNodeBroadcastAction, cr.action);
492+
TransportBroadcastByNodeAction.NodeRequest nodeReq = (TransportBroadcastByNodeAction.NodeRequest) cr.request;
493+
494+
final PlainActionFuture<TransportResponse> future = PlainActionFuture.newFuture();
495+
TestTransportChannel channel = new TestTransportChannel(future);
496+
handler.messageReceived(nodeReq, channel, null);
497+
}
498+
assertEquals("nodeOperation should run once per target node", captured.size(), action.getNodeOperationCount());
455499
}
456500

457501
public void testResultAggregation() throws ExecutionException, InterruptedException {

0 commit comments

Comments
 (0)