Skip to content

Commit

Permalink
Not blocking detector creation on unknown feature validation error (#…
Browse files Browse the repository at this point in the history
…1366)

* don't fail on unknown exception

Signed-off-by: Amit Galitzky <amgalitz@amazon.com>

* fixing test

Signed-off-by: Amit Galitzky <amgalitz@amazon.com>

* order of needed permissions is changed on latest security version or at least not always consistent now

Signed-off-by: Amit Galitzky <amgalitz@amazon.com>

* refactor customNodeclient

Signed-off-by: Amit Galitzky <amgalitz@amazon.com>

---------

Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
  • Loading branch information
amitgalitz authored Nov 17, 2024
1 parent 5ec87b2 commit 4c545ab
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.18.0"
bwcVersionShort = "2.19.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ protected void validateConfigFeatures(String id, boolean indexingDryRun, ActionL
feature.getId()
);
ssb.aggregation(internalAgg.getAggregatorFactories().iterator().next());
ssb.trackTotalHits(false);
SearchRequest searchRequest = new SearchRequest().indices(config.getIndices().toArray(new String[0])).source(ssb);
ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(response -> {
Optional<double[]> aggFeatureResult = searchFeatureDao.parseResponse(response, Arrays.asList(feature.getId()), false);
Expand All @@ -905,13 +906,19 @@ protected void validateConfigFeatures(String id, boolean indexingDryRun, ActionL
}
}, e -> {
String errorMessage;
if (isExceptionCausedByInvalidQuery(e)) {
if (isExceptionCausedByInvalidQuery(e) || e instanceof TimeSeriesException) {
errorMessage = CommonMessages.FEATURE_WITH_INVALID_QUERY_MSG + feature.getName();
logger.error(errorMessage, e);
multiFeatureQueriesResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, e));
} else {
errorMessage = CommonMessages.UNKNOWN_SEARCH_QUERY_EXCEPTION_MSG + feature.getName();
logger.error(errorMessage, e);
// If we see an unexpected error such as timeout or some task cancellation cause of search backpressure
// we don't want to block detector creation as this is unlikely an error due to wrong configs
// but we want to record what error was seen
multiFeatureQueriesResponseListener
.onResponse(new MergeableList<>(new ArrayList<>(Collections.singletonList(Optional.empty()))));
}
logger.error(errorMessage, e);
multiFeatureQueriesResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, e));
});
clientUtil.asyncRequestWithInjectedSecurity(searchRequest, client::search, user, client, context, searchResponseListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.RestRequest;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -207,7 +208,7 @@ public void testMoreThanTenThousandSingleEntityDetectors() throws IOException, I

// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = getCustomNodeClient(detectorResponse, userIndexResponse, detector, threadPool);
NodeClient client = getCustomNodeClient(detectorResponse, userIndexResponse, null, false, detector, threadPool);
NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
clientUtil = new SecurityClientUtil(nodeStateManager, settings);
Expand Down Expand Up @@ -546,10 +547,14 @@ public void testUpdateTextField() throws IOException, InterruptedException {
public static NodeClient getCustomNodeClient(
SearchResponse detectorResponse,
SearchResponse userIndexResponse,
SearchResponse configInputIndicesResponse,
boolean useConfigInputIndicesResponse,
AnomalyDetector detector,
ThreadPool pool
) {
return new NodeClient(Settings.EMPTY, pool) {
private int searchCallCount = 0;

@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Expand All @@ -560,8 +565,19 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
if (action.equals(SearchAction.INSTANCE)) {
assertTrue(request instanceof SearchRequest);
SearchRequest searchRequest = (SearchRequest) request;
searchCallCount++;
if (searchRequest.indices()[0].equals(CommonName.CONFIG_INDEX)) {
listener.onResponse((Response) detectorResponse);
} else if (useConfigInputIndicesResponse
&& Arrays.equals(searchRequest.indices(), detector.getIndices().toArray(new String[0]))
&& searchRequest.source().aggregations() == null) {
listener.onResponse((Response) configInputIndicesResponse);
// Call for feature validation occurs on the 3rd call and we want to make sure we supplied a response to the
// previous call.
} else if (searchCallCount == 3 && useConfigInputIndicesResponse) {
// This is the third search call, which should be for featureConfig and we want to replicate something like a
// timeout exception
listener.onFailure(new OpenSearchStatusException("timeout", RestStatus.BAD_REQUEST));
} else {
listener.onResponse((Response) userIndexResponse);
}
Expand Down Expand Up @@ -590,7 +606,7 @@ public void testMoreThanTenMultiEntityDetectors() throws IOException, Interrupte
when(userIndexResponse.getHits()).thenReturn(TestHelpers.createSearchHits(userIndexHits));
// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = getCustomNodeClient(detectorResponse, userIndexResponse, detector, threadPool);
NodeClient client = getCustomNodeClient(detectorResponse, userIndexResponse, null, false, detector, threadPool);
NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
clientUtil = new SecurityClientUtil(nodeStateManager, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
Expand All @@ -54,6 +55,7 @@
import org.opensearch.timeseries.AbstractTimeSeriesTest;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.common.exception.ValidationException;
import org.opensearch.timeseries.feature.SearchFeatureDao;
import org.opensearch.timeseries.model.ValidationAspect;
Expand Down Expand Up @@ -150,7 +152,7 @@ public void testValidateMoreThanThousandSingleEntityDetectorLimit() throws IOExc
// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = IndexAnomalyDetectorActionHandlerTests
.getCustomNodeClient(detectorResponse, userIndexResponse, singleEntityDetector, threadPool);
.getCustomNodeClient(detectorResponse, userIndexResponse, null, false, singleEntityDetector, threadPool);

NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
Expand Down Expand Up @@ -208,7 +210,7 @@ public void testValidateMoreThanTenMultiEntityDetectorsLimit() throws IOExceptio
// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = IndexAnomalyDetectorActionHandlerTests
.getCustomNodeClient(detectorResponse, userIndexResponse, detector, threadPool);
.getCustomNodeClient(detectorResponse, userIndexResponse, null, false, detector, threadPool);
NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
SecurityClientUtil clientUtil = new SecurityClientUtil(nodeStateManager, settings);
Expand Down Expand Up @@ -250,4 +252,60 @@ public void testValidateMoreThanTenMultiEntityDetectorsLimit() throws IOExceptio
assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS));
verify(clientSpy, never()).execute(eq(GetMappingsAction.INSTANCE), any(), any());
}

// This test also validates that if we get a non timeseries exception or not an invalid query that we will not completely block
// detector creation, this is applicable like things when we get timeout not cause of AD configuration errors but because cluster
// is momentarily under utilized.
public void testValidateMoreThanTenMultiEntityDetectorsLimitDuplicateNameFailure() throws IOException, InterruptedException {
SearchResponse mockResponse = mock(SearchResponse.class);
int totalHits = 1;
when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits));
SearchResponse detectorResponse = mock(SearchResponse.class);
when(detectorResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits));
SearchResponse userIndexResponse = mock(SearchResponse.class);
when(userIndexResponse.getHits()).thenReturn(TestHelpers.createSearchHits(0));
AnomalyDetector singleEntityDetector = TestHelpers.randomAnomalyDetector(TestHelpers.randomUiMetadata(), null, true);

SearchResponse configInputIndicesResponse = mock(SearchResponse.class);
when(configInputIndicesResponse.getHits()).thenReturn(TestHelpers.createSearchHits(2));

// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = IndexAnomalyDetectorActionHandlerTests
.getCustomNodeClient(detectorResponse, userIndexResponse, configInputIndicesResponse, true, singleEntityDetector, threadPool);

NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
SecurityClientUtil clientUtil = new SecurityClientUtil(nodeStateManager, settings);

handler = new ValidateAnomalyDetectorActionHandler(
clusterService,
clientSpy,
clientUtil,
anomalyDetectionIndices,
singleEntityDetector,
requestTimeout,
maxSingleEntityAnomalyDetectors,
maxMultiEntityAnomalyDetectors,
maxAnomalyFeatures,
maxCategoricalFields,
method,
xContentRegistry(),
null,
searchFeatureDao,
ValidationAspect.DETECTOR.getName(),
clock,
settings
);
PlainActionFuture<ValidateConfigResponse> future = PlainActionFuture.newFuture();
handler.start(future);
try {
future.actionGet(100, TimeUnit.SECONDS);
fail("should not reach here");
} catch (Exception e) {
assertTrue(e instanceof TimeSeriesException);
assertTrue(e.getMessage().contains("Cannot create anomaly detector with name"));
}
verify(clientSpy, never()).execute(eq(GetMappingsAction.INSTANCE), any(), any());
}
}
4 changes: 3 additions & 1 deletion src/test/java/org/opensearch/ad/rest/SecureADRestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@ public void testCreateAnomalyDetectorWithCustomResultIndex() throws IOException
Assert
.assertTrue(
"got " + exception.getMessage(),
exception.getMessage().contains("no permissions for [indices:admin/aliases, indices:admin/create]")
exception.getMessage().contains("indices:admin/aliases")
&& exception.getMessage().contains("indices:admin/create")
&& exception.getMessage().contains("no permissions for")
);

// User cat has permission to create index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testValidateAnomalyDetectorWithInvalidFeatureField() throws IOExcept
}

@Test
public void testValidateAnomalyDetectorWithUnknownFeatureField() throws IOException {
public void testValidateAnomalyDetectorWithInvalidFeatureDueToTimeSeriesException() throws IOException {
AggregationBuilder aggregationBuilder = TestHelpers.parseAggregation("{\"test\":{\"terms\":{\"field\":\"type\"}}}");
AnomalyDetector anomalyDetector = TestHelpers
.randomAnomalyDetector(
Expand All @@ -245,7 +245,7 @@ public void testValidateAnomalyDetectorWithUnknownFeatureField() throws IOExcept
assertNotNull(response.getIssue());
assertEquals(ValidationIssueType.FEATURE_ATTRIBUTES, response.getIssue().getType());
assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect());
assertTrue(response.getIssue().getMessage().contains(CommonMessages.UNKNOWN_SEARCH_QUERY_EXCEPTION_MSG));
assertTrue(response.getIssue().getMessage().contains(CommonMessages.FEATURE_WITH_INVALID_QUERY_MSG));
assertTrue(response.getIssue().getSubIssues().containsKey(nameField));
}

Expand Down

0 comments on commit 4c545ab

Please sign in to comment.