Skip to content

Commit 423b6d9

Browse files
kaushalmahi12Divyansh Sharma
authored andcommitted
[WLM] Introduce rule cardinality check (opensearch-project#18663)
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
1 parent 7c45235 commit 423b6d9

File tree

6 files changed

+92
-2
lines changed

6 files changed

+92
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- Add Semantic Version field type mapper and extensive unit tests([#18454](https://github.com/opensearch-project/OpenSearch/pull/18454))
2727
- Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708))
2828
- Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697))
29+
- Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663))
2930

3031
### Changed
3132
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

modules/autotagging-commons/common/src/main/java/org/opensearch/rule/RuleQueryMapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,11 @@ public interface RuleQueryMapper<T> {
2424
* @return
2525
*/
2626
T from(GetRuleRequest request);
27+
28+
/**
29+
* This method returns the cardinality query for the rule, this query should
30+
* be constructed in such a way that it can be used to calculate the cardinality of the rules
31+
* @return
32+
*/
33+
T getCardinalityQuery();
2734
}

modules/autotagging-commons/common/src/main/java/org/opensearch/rule/service/IndexStoredRulePersistenceService.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
2020
import org.opensearch.action.update.UpdateRequest;
2121
import org.opensearch.cluster.service.ClusterService;
22+
import org.opensearch.common.settings.Setting;
2223
import org.opensearch.common.util.concurrent.ThreadContext;
2324
import org.opensearch.common.xcontent.XContentFactory;
2425
import org.opensearch.core.action.ActionListener;
26+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
2527
import org.opensearch.core.xcontent.ToXContent;
2628
import org.opensearch.index.engine.DocumentMissingException;
2729
import org.opensearch.index.query.QueryBuilder;
@@ -53,8 +55,28 @@
5355
*/
5456
public class IndexStoredRulePersistenceService implements RulePersistenceService {
5557
/**
56-
* The system index name used for storing rules
58+
* Default value for max rules count
5759
*/
60+
public static final int DEFAULT_MAX_ALLOWED_RULE_COUNT = 200;
61+
62+
/**
63+
* max wlm rules setting name
64+
*/
65+
public static final String MAX_RULES_COUNT_SETTING_NAME = "wlm.autotagging.max_rules";
66+
67+
/**
68+
* max wlm rules setting
69+
*/
70+
public static final Setting<Integer> MAX_WLM_RULES_SETTING = Setting.intSetting(
71+
MAX_RULES_COUNT_SETTING_NAME,
72+
DEFAULT_MAX_ALLOWED_RULE_COUNT,
73+
10,
74+
500,
75+
Setting.Property.NodeScope,
76+
Setting.Property.Dynamic
77+
);
78+
79+
private int maxAllowedRulesCount;
5880
private final String indexName;
5981
private final Client client;
6082
private final ClusterService clusterService;
@@ -87,6 +109,12 @@ public IndexStoredRulePersistenceService(
87109
this.maxRulesPerPage = maxRulesPerPage;
88110
this.parser = parser;
89111
this.queryBuilder = queryBuilder;
112+
this.maxAllowedRulesCount = MAX_WLM_RULES_SETTING.get(clusterService.getSettings());
113+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WLM_RULES_SETTING, this::setMaxAllowedRules);
114+
}
115+
116+
private void setMaxAllowedRules(int maxAllowedRules) {
117+
this.maxAllowedRulesCount = maxAllowedRules;
90118
}
91119

92120
/**
@@ -101,12 +129,27 @@ public void createRule(CreateRuleRequest request, ActionListener<CreateRuleRespo
101129
logger.error("Index {} does not exist", indexName);
102130
listener.onFailure(new IllegalStateException("Index" + indexName + " does not exist"));
103131
} else {
132+
performCardinalityCheck(listener);
104133
Rule rule = request.getRule();
105134
validateNoDuplicateRule(rule, ActionListener.wrap(unused -> persistRule(rule, listener), listener::onFailure));
106135
}
107136
}
108137
}
109138

139+
private void performCardinalityCheck(ActionListener<CreateRuleResponse> listener) {
140+
SearchResponse searchResponse = client.prepareSearch(indexName).setQuery(queryBuilder.getCardinalityQuery()).get();
141+
if (searchResponse.getHits().getTotalHits() != null && searchResponse.getHits().getTotalHits().value() >= maxAllowedRulesCount) {
142+
listener.onFailure(
143+
new OpenSearchRejectedExecutionException(
144+
"This create operation will violate"
145+
+ " the cardinality limit of "
146+
+ DEFAULT_MAX_ALLOWED_RULE_COUNT
147+
+ ". Please delete some stale or redundant rules first"
148+
)
149+
);
150+
}
151+
}
152+
110153
/**
111154
* Validates that no existing rule has the same attribute map as the given rule.
112155
* This validation must be performed one at a time to prevent writing duplicate rules.

modules/autotagging-commons/common/src/main/java/org/opensearch/rule/storage/IndexBasedRuleQueryMapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,9 @@ public QueryBuilder from(GetRuleRequest request) {
5353
}
5454
return boolQuery;
5555
}
56+
57+
@Override
58+
public QueryBuilder getCardinalityQuery() {
59+
return QueryBuilders.matchAllQuery();
60+
}
5661
}

modules/autotagging-commons/common/src/test/java/org/opensearch/rule/service/IndexStoredRulePersistenceServiceTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import org.opensearch.cluster.metadata.Metadata;
2222
import org.opensearch.cluster.service.ClusterService;
2323
import org.opensearch.common.action.ActionFuture;
24+
import org.opensearch.common.settings.ClusterSettings;
2425
import org.opensearch.common.settings.Settings;
2526
import org.opensearch.common.util.concurrent.ThreadContext;
2627
import org.opensearch.core.action.ActionListener;
2728
import org.opensearch.core.common.bytes.BytesArray;
29+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
2830
import org.opensearch.core.index.shard.ShardId;
2931
import org.opensearch.index.engine.DocumentMissingException;
3032
import org.opensearch.index.query.QueryBuilder;
@@ -50,6 +52,7 @@
5052
import java.util.ArrayList;
5153
import java.util.Collections;
5254
import java.util.HashMap;
55+
import java.util.HashSet;
5356
import java.util.List;
5457
import java.util.Map;
5558
import java.util.Set;
@@ -99,6 +102,11 @@ public void setUp() throws Exception {
99102
client = setUpMockClient(searchRequestBuilder);
100103
rule = mock(Rule.class);
101104
clusterService = mock(ClusterService.class);
105+
Settings testSettings = Settings.EMPTY;
106+
ClusterSettings clusterSettings = new ClusterSettings(testSettings, new HashSet<>());
107+
when(clusterService.getSettings()).thenReturn(testSettings);
108+
clusterSettings.registerSetting(IndexStoredRulePersistenceService.MAX_WLM_RULES_SETTING);
109+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
102110
ClusterState clusterState = mock(ClusterState.class);
103111
Metadata metadata = mock(Metadata.class);
104112
when(clusterService.state()).thenReturn(clusterState);
@@ -109,6 +117,7 @@ public void setUp() throws Exception {
109117
queryBuilder = mock(QueryBuilder.class);
110118
when(queryBuilder.filter(any())).thenReturn(queryBuilder);
111119
when(ruleQueryMapper.from(any(GetRuleRequest.class))).thenReturn(queryBuilder);
120+
when(ruleQueryMapper.getCardinalityQuery()).thenReturn(mock(QueryBuilder.class));
112121
when(ruleEntityParser.parse(anyString())).thenReturn(rule);
113122

114123
rulePersistenceService = new IndexStoredRulePersistenceService(
@@ -144,6 +153,25 @@ public void testCreateRuleOnExistingIndex() throws Exception {
144153
assertNotNull(responseCaptor.getValue().getRule());
145154
}
146155

156+
public void testCardinalityCheckBasedFailure() throws Exception {
157+
CreateRuleRequest createRuleRequest = mock(CreateRuleRequest.class);
158+
when(createRuleRequest.getRule()).thenReturn(rule);
159+
when(rule.toXContent(any(), any())).thenAnswer(invocation -> invocation.getArgument(0));
160+
161+
SearchResponse searchResponse = mock(SearchResponse.class);
162+
when(searchResponse.getHits()).thenReturn(
163+
new SearchHits(new SearchHit[] {}, new TotalHits(10000, TotalHits.Relation.EQUAL_TO), 1.0f)
164+
);
165+
when(searchRequestBuilder.get()).thenReturn(searchResponse);
166+
167+
ActionListener<CreateRuleResponse> listener = mock(ActionListener.class);
168+
rulePersistenceService.createRule(createRuleRequest, listener);
169+
170+
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(OpenSearchRejectedExecutionException.class);
171+
verify(listener).onFailure(exceptionCaptor.capture());
172+
assertNotNull(exceptionCaptor.getValue());
173+
}
174+
147175
public void testConcurrentCreateDuplicateRules() throws InterruptedException {
148176
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
149177
int threadCount = 10;

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
import java.util.Map;
7676
import java.util.function.Supplier;
7777

78+
import static org.opensearch.rule.service.IndexStoredRulePersistenceService.MAX_WLM_RULES_SETTING;
79+
7880
/**
7981
* Plugin class for WorkloadManagement
8082
*/
@@ -190,7 +192,11 @@ public List<RestHandler> getRestHandlers(
190192

191193
@Override
192194
public List<Setting<?>> getSettings() {
193-
return List.of(WorkloadGroupPersistenceService.MAX_QUERY_GROUP_COUNT, RefreshBasedSyncMechanism.RULE_SYNC_REFRESH_INTERVAL_SETTING);
195+
return List.of(
196+
WorkloadGroupPersistenceService.MAX_QUERY_GROUP_COUNT,
197+
RefreshBasedSyncMechanism.RULE_SYNC_REFRESH_INTERVAL_SETTING,
198+
MAX_WLM_RULES_SETTING
199+
);
194200
}
195201

196202
@Override

0 commit comments

Comments
 (0)