Skip to content

Commit

Permalink
avoid sending back verbose error message and wrong 500 error to user;…
Browse files Browse the repository at this point in the history
… fix hard code query size of historical analysis (#150)

* avoid sending back verbose error message and wrong 500 error to user

Signed-off-by: Yaliang Wu <ylwu@amazon.com>

* put general error message into constants
  • Loading branch information
ylwu-amzn authored Jul 30, 2021
1 parent db23563 commit 6843b29
Show file tree
Hide file tree
Showing 28 changed files with 175 additions and 66 deletions.
18 changes: 13 additions & 5 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@

package org.opensearch.ad;

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_PARSE_DETECTOR_MSG;
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR;

import java.security.InvalidParameterException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -39,6 +42,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -109,7 +113,7 @@ public AnomalyDetectorProfileRunner(

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profilesToCollect) {
if (profilesToCollect.isEmpty()) {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}
calculateTotalResponsesToWait(detectorId, profilesToCollect, listener);
Expand All @@ -132,12 +136,16 @@ private void calculateTotalResponsesToWait(
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);
prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
logger.error(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, e);
listener.onFailure(new OpenSearchStatusException(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, BAD_REQUEST));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, BAD_REQUEST));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}, exception -> {
logger.error(FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception);
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, INTERNAL_SERVER_ERROR));
}));
}

private void prepareProfile(
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.security.InvalidParameterException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -77,6 +76,7 @@ public class EntityProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);

static final String NOT_HC_DETECTOR_ERR_MSG = "This is not a high cardinality detector";
static final String EMPTY_ENTITY_ATTRIBUTES = "Empty entity attributes";
static final String NO_ENTITY = "Cannot find entity";
private Client client;
private NamedXContentRegistry xContentRegistry;
Expand All @@ -102,7 +102,7 @@ public void profile(
ActionListener<EntityProfile> listener
) {
if (profilesToCollect == null || profilesToCollect.size() == 0) {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
Expand All @@ -119,18 +119,18 @@ public void profile(
List<String> categoryFields = detector.getCategoryField();
int maxCategoryFields = NumericSetting.maxCategoricalFields();
if (categoryFields == null || categoryFields.size() == 0) {
listener.onFailure(new InvalidParameterException(NOT_HC_DETECTOR_ERR_MSG));
listener.onFailure(new IllegalArgumentException(NOT_HC_DETECTOR_ERR_MSG));
} else if (categoryFields.size() > maxCategoryFields) {
listener
.onFailure(new InvalidParameterException(CommonErrorMessages.getTooManyCategoricalFieldErr(maxCategoryFields)));
.onFailure(new IllegalArgumentException(CommonErrorMessages.getTooManyCategoricalFieldErr(maxCategoryFields)));
} else {
validateEntity(entityValue, categoryFields, detectorId, profilesToCollect, detector, listener);
}
} catch (Exception t) {
listener.onFailure(t);
}
} else {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, listener::onFailure));
}
Expand Down Expand Up @@ -160,12 +160,12 @@ private void validateEntity(
) {
Map<String, String> attributes = entity.getAttributes();
if (attributes == null || attributes.size() != categoryFields.size()) {
listener.onFailure(new InvalidParameterException("Empty entity attributes"));
listener.onFailure(new IllegalArgumentException(EMPTY_ENTITY_ATTRIBUTES));
return;
}
for (String field : categoryFields) {
if (false == attributes.containsKey(field)) {
listener.onFailure(new InvalidParameterException("Cannot find " + field));
listener.onFailure(new IllegalArgumentException("Cannot find " + field));
return;
}
}
Expand All @@ -184,15 +184,15 @@ private void validateEntity(
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
try {
if (searchResponse.getHits().getHits().length == 0) {
listener.onFailure(new InvalidParameterException(NO_ENTITY));
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
prepareEntityProfile(listener, detectorId, entity, profilesToCollect, detector, categoryFields.get(0));
} catch (Exception e) {
listener.onFailure(new InvalidParameterException(NO_ENTITY));
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
}, e -> listener.onFailure(new InvalidParameterException(NO_ENTITY))));
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY))));

}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/caching/PriorityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.ad.AnomalyDetectorPlugin;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.LimitExceededException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.ml.CheckpointDao;
Expand Down Expand Up @@ -637,7 +637,7 @@ public void maintenance() {
});
} catch (Exception e) {
// will be thrown to ES's transport broadcast handler
throw new OpenSearchException("Fail to maintain cache", e);
throw new AnomalyDetectionException("Fail to maintain cache", e);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ public class CommonErrorMessages {
public static final String ALL_FEATURES_DISABLED_ERR_MSG =
"Having trouble querying data because all of your features have been disabled.";
public static final String INVALID_TIMESTAMP_ERR_MSG = "timestamp is invalid";
public static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: ";
public static String FAIL_TO_PARSE_DETECTOR_MSG = "Fail to parse detector with id: ";
// change this error message to make it compatible with old version's integration(nexus) test
public static String FAIL_TO_FIND_DETECTOR_MSG = "Can't find detector with id: ";
public static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector ";
public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector ";
public static String FAIL_TO_GET_USER_INFO = "Unable to get user information from detector ";
public static String NO_PERMISSION_TO_ACCESS_DETECTOR = "User does not have permissions to access detector: ";
public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than ";
public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid";
public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for ";
Expand All @@ -71,4 +75,16 @@ public static String getTooManyCategoricalFieldErr(int limit) {
public static String EXCEED_HISTORICAL_ANALYSIS_LIMIT = "Exceed max historical analysis limit per node";
public static String NO_ELIGIBLE_NODE_TO_RUN_DETECTOR = "No eligible node to run detector ";
public static String EMPTY_STALE_RUNNING_ENTITIES = "Empty stale running entities";

public static String FAIL_TO_GET_DETECTOR = "Fail to get detector";
public static String FAIL_TO_GET_DETECTOR_INFO = "Fail to get detector info";
public static String FAIL_TO_CREATE_DETECTOR = "Fail to create detector";
public static String FAIL_TO_UPDATE_DETECTOR = "Fail to update detector";
public static String FAIL_TO_PREVIEW_DETECTOR = "Fail to preview detector";
public static String FAIL_TO_START_DETECTOR = "Fail to start detector";
public static String FAIL_TO_STOP_DETECTOR = "Fail to stop detector";
public static String FAIL_TO_DELETE_DETECTOR = "Fail to delete detector";
public static String FAIL_TO_DELETE_AD_RESULT = "Fail to delete anomaly result";
public static String FAIL_TO_GET_STATS = "Fail to get stats";
public static String FAIL_TO_SEARCH = "Fail to search";
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.dataprocessor.Interpolator;
import org.opensearch.ad.model.AnomalyDetector;
Expand Down Expand Up @@ -425,7 +426,7 @@ public void onResponse(SearchResponse response) {
listener.onResponse(topEntities);
} else if (expirationEpochMs < clock.millis()) {
if (topEntities.isEmpty()) {
listener.onFailure(new IllegalStateException("timeout to get preview results. Please retry later."));
listener.onFailure(new AnomalyDetectionException("timeout to get preview results. Please retry later."));
} else {
logger.info("timeout to get preview results. Send whatever we have.");
listener.onResponse(topEntities);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.indices.ADIndex;
Expand Down Expand Up @@ -674,7 +675,7 @@ public void batchWrite(BulkRequest request, ActionListener<BulkResponse> listene
clientUtil.<BulkRequest, BulkResponse>execute(BulkAction.INSTANCE, request, listener);
} else {
// create index failure. Notify callers using listener.
listener.onFailure(new RuntimeException("Creating checkpoint with mappings call not acknowledged."));
listener.onFailure(new AnomalyDetectionException("Creating checkpoint with mappings call not acknowledged."));
}
}, exception -> {
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

package org.opensearch.ad.rest.handler;

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES;
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
Expand Down Expand Up @@ -229,7 +230,7 @@ private void updateAnomalyDetector(String detectorId) {

private void onGetAnomalyDetectorResponse(GetResponse response) {
if (!response.isExists()) {
listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found with id: " + detectorId, RestStatus.NOT_FOUND));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND));
return;
}
try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc
);
}

private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) throws IOException {
private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) {
if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) {
String errorMsg = getShardsFailure(response);
listener.onFailure(new OpenSearchStatusException(errorMsg, response.status()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,13 @@ private AnomalyDetectorSettings() {}
Setting.Property.Dynamic
);

public static final int MAX_BATCH_TASK_PIECE_SIZE = 10_000;
public static final Setting<Integer> BATCH_TASK_PIECE_SIZE = Setting
.intSetting(
"plugins.anomaly_detection.batch_task_piece_size",
LegacyOpenDistroAnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE,
1,
10_000,
MAX_BATCH_TASK_PIECE_SIZE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.opensearch.ad.AnomalyDetectorPlugin.AD_BATCH_TASK_THREAD_POOL_NAME;
import static org.opensearch.ad.constant.CommonErrorMessages.DETECTOR_IS_RUNNING;
import static org.opensearch.ad.constant.CommonErrorMessages.EXCEED_HISTORICAL_ANALYSIS_LIMIT;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.constant.CommonErrorMessages.NO_ELIGIBLE_NODE_TO_RUN_DETECTOR;
import static org.opensearch.ad.indices.AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN;
import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD;
Expand Down Expand Up @@ -548,7 +549,7 @@ public <T> void getDetector(String detectorId, Consumer<AnomalyDetector> consume
GetRequest getRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found", RestStatus.NOT_FOUND));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND));
return;
}
try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
Expand All @@ -574,7 +575,7 @@ public <T> void getDetector(
GetRequest getRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found", RestStatus.NOT_FOUND));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND));
return;
}
try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@

package org.opensearch.ad.transport;

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_START_DETECTOR;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_STOP_DETECTOR;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static org.opensearch.ad.util.ParseUtils.getUserContext;
import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -88,14 +91,16 @@ public AnomalyDetectorJobTransportAction(
}

@Override
protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener<AnomalyDetectorJobResponse> listener) {
protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener<AnomalyDetectorJobResponse> actionListener) {
String detectorId = request.getDetectorID();
DetectionDateRange detectionDateRange = request.getDetectionDateRange();
boolean historical = request.isHistorical();
long seqNo = request.getSeqNo();
long primaryTerm = request.getPrimaryTerm();
String rawPath = request.getRawPath();
TimeValue requestTimeout = REQUEST_TIMEOUT.get(settings);
String errorMessage = rawPath.endsWith(RestHandlerUtils.START_JOB) ? FAIL_TO_START_DETECTOR : FAIL_TO_STOP_DETECTOR;
ActionListener<AnomalyDetectorJobResponse> listener = wrapRestActionListener(actionListener, errorMessage);

// By the time request reaches here, the user permissions are validated by Security plugin.
User user = getUserContext(client);
Expand Down
Loading

0 comments on commit 6843b29

Please sign in to comment.