Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid sending back verbose error message and wrong 500 error to user; fix hard code query size of historical analysis #150

Merged
merged 2 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@

package org.opensearch.ad;

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_PARSE_DETECTOR_MSG;
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR;

import java.security.InvalidParameterException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -39,6 +42,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -109,7 +113,7 @@ public AnomalyDetectorProfileRunner(

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profilesToCollect) {
if (profilesToCollect.isEmpty()) {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}
calculateTotalResponsesToWait(detectorId, profilesToCollect, listener);
Expand All @@ -132,12 +136,16 @@ private void calculateTotalResponsesToWait(
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);
prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
logger.error(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, e);
listener.onFailure(new OpenSearchStatusException(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, BAD_REQUEST));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, BAD_REQUEST));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}, exception -> {
logger.error(FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception);
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, INTERNAL_SERVER_ERROR));
}));
}

private void prepareProfile(
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.security.InvalidParameterException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -102,7 +101,7 @@ public void profile(
ActionListener<EntityProfile> listener
) {
if (profilesToCollect == null || profilesToCollect.size() == 0) {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
Expand All @@ -119,18 +118,18 @@ public void profile(
List<String> categoryFields = detector.getCategoryField();
int maxCategoryFields = NumericSetting.maxCategoricalFields();
if (categoryFields == null || categoryFields.size() == 0) {
listener.onFailure(new InvalidParameterException(NOT_HC_DETECTOR_ERR_MSG));
listener.onFailure(new IllegalArgumentException(NOT_HC_DETECTOR_ERR_MSG));
} else if (categoryFields.size() > maxCategoryFields) {
listener
.onFailure(new InvalidParameterException(CommonErrorMessages.getTooManyCategoricalFieldErr(maxCategoryFields)));
.onFailure(new IllegalArgumentException(CommonErrorMessages.getTooManyCategoricalFieldErr(maxCategoryFields)));
} else {
validateEntity(entityValue, categoryFields, detectorId, profilesToCollect, detector, listener);
}
} catch (Exception t) {
listener.onFailure(t);
}
} else {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, listener::onFailure));
}
Expand Down Expand Up @@ -160,12 +159,12 @@ private void validateEntity(
) {
Map<String, String> attributes = entity.getAttributes();
if (attributes == null || attributes.size() != categoryFields.size()) {
listener.onFailure(new InvalidParameterException("Empty entity attributes"));
listener.onFailure(new IllegalArgumentException("Empty entity attributes"));
return;
}
for (String field : categoryFields) {
if (false == attributes.containsKey(field)) {
listener.onFailure(new InvalidParameterException("Cannot find " + field));
listener.onFailure(new IllegalArgumentException("Cannot find " + field));
return;
}
}
Expand All @@ -184,15 +183,15 @@ private void validateEntity(
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
try {
if (searchResponse.getHits().getHits().length == 0) {
listener.onFailure(new InvalidParameterException(NO_ENTITY));
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
prepareEntityProfile(listener, detectorId, entity, profilesToCollect, detector, categoryFields.get(0));
} catch (Exception e) {
listener.onFailure(new InvalidParameterException(NO_ENTITY));
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
}, e -> listener.onFailure(new InvalidParameterException(NO_ENTITY))));
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY))));

}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/caching/PriorityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.ad.AnomalyDetectorPlugin;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.LimitExceededException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.ml.CheckpointDao;
Expand Down Expand Up @@ -637,7 +637,7 @@ public void maintenance() {
});
} catch (Exception e) {
// will be thrown to ES's transport broadcast handler
throw new OpenSearchException("Fail to maintain cache", e);
throw new AnomalyDetectionException("Fail to maintain cache", e);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class CommonErrorMessages {
public static final String ALL_FEATURES_DISABLED_ERR_MSG =
"Having trouble querying data because all of your features have been disabled.";
public static final String INVALID_TIMESTAMP_ERR_MSG = "timestamp is invalid";
public static String FAIL_TO_PARSE_DETECTOR_MSG = "Fail to parse detector with id: ";
public static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: ";
public static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector ";
public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.dataprocessor.Interpolator;
import org.opensearch.ad.model.AnomalyDetector;
Expand Down Expand Up @@ -425,7 +426,7 @@ public void onResponse(SearchResponse response) {
listener.onResponse(topEntities);
} else if (expirationEpochMs < clock.millis()) {
if (topEntities.isEmpty()) {
listener.onFailure(new IllegalStateException("timeout to get preview results. Please retry later."));
listener.onFailure(new AnomalyDetectionException("timeout to get preview results. Please retry later."));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we switching this?

Copy link
Collaborator Author

@ylwu-amzn ylwu-amzn Jul 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In AD we have self-defined exceptions under class "AnomalyDetectionException" , this PR need to get concrete error message based on exception class, if it's "AnomalyDetectionException" (and several other exceptions, check RestHandlerUtils class), we will get message from exception directly, otherwise we use general error message to avoid returning verbose error message to user. Check RestHandlerUtils#wrapRestActionListener for details.

Security team has concern about returning verbose error .

} else {
logger.info("timeout to get preview results. Send whatever we have.");
listener.onResponse(topEntities);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.indices.ADIndex;
Expand Down Expand Up @@ -674,7 +675,7 @@ public void batchWrite(BulkRequest request, ActionListener<BulkResponse> listene
clientUtil.<BulkRequest, BulkResponse>execute(BulkAction.INSTANCE, request, listener);
} else {
// create index failure. Notify callers using listener.
listener.onFailure(new RuntimeException("Creating checkpoint with mappings call not acknowledged."));
listener.onFailure(new AnomalyDetectionException("Creating checkpoint with mappings call not acknowledged."));
}
}, exception -> {
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc
);
}

private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) throws IOException {
private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) {
if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) {
String errorMsg = getShardsFailure(response);
listener.onFailure(new OpenSearchStatusException(errorMsg, response.status()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,13 @@ private AnomalyDetectorSettings() {}
Setting.Property.Dynamic
);

public static final int MAX_BATCH_TASK_PIECE_SIZE = 10_000;
public static final Setting<Integer> BATCH_TASK_PIECE_SIZE = Setting
.intSetting(
"plugins.anomaly_detection.batch_task_piece_size",
LegacyOpenDistroAnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE,
1,
10_000,
MAX_BATCH_TASK_PIECE_SIZE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static org.opensearch.ad.util.ParseUtils.getUserContext;
import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -88,14 +89,16 @@ public AnomalyDetectorJobTransportAction(
}

@Override
protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener<AnomalyDetectorJobResponse> listener) {
protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener<AnomalyDetectorJobResponse> actionListener) {
String detectorId = request.getDetectorID();
DetectionDateRange detectionDateRange = request.getDetectionDateRange();
boolean historical = request.isHistorical();
long seqNo = request.getSeqNo();
long primaryTerm = request.getPrimaryTerm();
String rawPath = request.getRawPath();
TimeValue requestTimeout = REQUEST_TIMEOUT.get(settings);
String action = rawPath.endsWith(RestHandlerUtils.START_JOB) ? "start" : "stop";
ActionListener<AnomalyDetectorJobResponse> listener = wrapRestActionListener(actionListener, "Failed to " + action + " detector");

// By the time request reaches here, the user permissions are validated by Security plugin.
User user = getUserContext(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.ad.util.ParseUtils.getUserContext;
import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
Expand Down Expand Up @@ -98,10 +99,11 @@ public DeleteAnomalyDetectorTransportAction(
}

@Override
protected void doExecute(Task task, DeleteAnomalyDetectorRequest request, ActionListener<DeleteResponse> listener) {
protected void doExecute(Task task, DeleteAnomalyDetectorRequest request, ActionListener<DeleteResponse> actionListener) {
String detectorId = request.getDetectorID();
LOG.info("Delete anomaly detector job {}", detectorId);
User user = getUserContext(client);
ActionListener<DeleteResponse> listener = wrapRestActionListener(actionListener, "Failed to delete detector");
// By the time request reaches here, the user permissions are validated by Security plugin.
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
resolveUserAndExecute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.ad.util.ParseUtils.addUserBackendRolesFilter;
import static org.opensearch.ad.util.ParseUtils.getUserContext;
import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -53,7 +54,8 @@ public DeleteAnomalyResultsTransportAction(
}

@Override
protected void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
protected void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> actionListener) {
ActionListener<BulkByScrollResponse> listener = wrapRestActionListener(actionListener, "Failed to delete anomaly result");
delete(request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.opensearch.ad.util.ParseUtils.getUserContext;
import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.ad.util.RestHandlerUtils.PROFILE;
import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.util.Arrays;
Expand Down Expand Up @@ -140,9 +141,10 @@ public GetAnomalyDetectorTransportAction(
}

@Override
protected void doExecute(Task task, GetAnomalyDetectorRequest request, ActionListener<GetAnomalyDetectorResponse> listener) {
protected void doExecute(Task task, GetAnomalyDetectorRequest request, ActionListener<GetAnomalyDetectorResponse> actionListener) {
String detectorID = request.getDetectorID();
User user = getUserContext(client);
ActionListener<GetAnomalyDetectorResponse> listener = wrapRestActionListener(actionListener, "Failed to get detector");
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
resolveUserAndExecute(
user,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.opensearch.ad.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.ad.util.ParseUtils.getDetector;
import static org.opensearch.ad.util.ParseUtils.getUserContext;
import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -95,10 +96,12 @@ public IndexAnomalyDetectorTransportAction(
}

@Override
protected void doExecute(Task task, IndexAnomalyDetectorRequest request, ActionListener<IndexAnomalyDetectorResponse> listener) {
protected void doExecute(Task task, IndexAnomalyDetectorRequest request, ActionListener<IndexAnomalyDetectorResponse> actionListener) {
User user = getUserContext(client);
String detectorId = request.getDetectorID();
RestRequest.Method method = request.getMethod();
String action = method == RestRequest.Method.PUT ? "update" : "create";
ActionListener<IndexAnomalyDetectorResponse> listener = wrapRestActionListener(actionListener, "Failed to " + action + " detector");
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
resolveUserAndExecute(user, detectorId, method, listener, (detector) -> adExecute(request, user, detector, context, listener));
} catch (Exception e) {
Expand Down
Loading