Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid sending back verbose error message and wrong 500 error to user; fix hard code query size of historical analysis #150

Merged
merged 2 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@

package org.opensearch.ad;

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_PARSE_DETECTOR_MSG;
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR;

import java.security.InvalidParameterException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -39,6 +42,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -109,7 +113,7 @@ public AnomalyDetectorProfileRunner(

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profilesToCollect) {
if (profilesToCollect.isEmpty()) {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}
calculateTotalResponsesToWait(detectorId, profilesToCollect, listener);
Expand All @@ -132,12 +136,16 @@ private void calculateTotalResponsesToWait(
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);
prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
logger.error(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, e);
listener.onFailure(new OpenSearchStatusException(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, BAD_REQUEST));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, BAD_REQUEST));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}, exception -> {
logger.error(FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception);
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, INTERNAL_SERVER_ERROR));
}));
}

private void prepareProfile(
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.security.InvalidParameterException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -77,6 +76,7 @@ public class EntityProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);

static final String NOT_HC_DETECTOR_ERR_MSG = "This is not a high cardinality detector";
static final String EMPTY_ENTITY_ATTRIBUTES = "Empty entity attributes";
static final String NO_ENTITY = "Cannot find entity";
private Client client;
private NamedXContentRegistry xContentRegistry;
Expand All @@ -102,7 +102,7 @@ public void profile(
ActionListener<EntityProfile> listener
) {
if (profilesToCollect == null || profilesToCollect.size() == 0) {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
Expand All @@ -119,18 +119,18 @@ public void profile(
List<String> categoryFields = detector.getCategoryField();
int maxCategoryFields = NumericSetting.maxCategoricalFields();
if (categoryFields == null || categoryFields.size() == 0) {
listener.onFailure(new InvalidParameterException(NOT_HC_DETECTOR_ERR_MSG));
listener.onFailure(new IllegalArgumentException(NOT_HC_DETECTOR_ERR_MSG));
} else if (categoryFields.size() > maxCategoryFields) {
listener
.onFailure(new InvalidParameterException(CommonErrorMessages.getTooManyCategoricalFieldErr(maxCategoryFields)));
.onFailure(new IllegalArgumentException(CommonErrorMessages.getTooManyCategoricalFieldErr(maxCategoryFields)));
} else {
validateEntity(entityValue, categoryFields, detectorId, profilesToCollect, detector, listener);
}
} catch (Exception t) {
listener.onFailure(t);
}
} else {
listener.onFailure(new InvalidParameterException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, listener::onFailure));
}
Expand Down Expand Up @@ -160,12 +160,12 @@ private void validateEntity(
) {
Map<String, String> attributes = entity.getAttributes();
if (attributes == null || attributes.size() != categoryFields.size()) {
listener.onFailure(new InvalidParameterException("Empty entity attributes"));
listener.onFailure(new IllegalArgumentException(EMPTY_ENTITY_ATTRIBUTES));
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
for (String field : categoryFields) {
if (false == attributes.containsKey(field)) {
listener.onFailure(new InvalidParameterException("Cannot find " + field));
listener.onFailure(new IllegalArgumentException("Cannot find " + field));
return;
}
}
Expand All @@ -184,15 +184,15 @@ private void validateEntity(
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
try {
if (searchResponse.getHits().getHits().length == 0) {
listener.onFailure(new InvalidParameterException(NO_ENTITY));
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
prepareEntityProfile(listener, detectorId, entity, profilesToCollect, detector, categoryFields.get(0));
} catch (Exception e) {
listener.onFailure(new InvalidParameterException(NO_ENTITY));
listener.onFailure(new IllegalArgumentException(NO_ENTITY));
return;
}
}, e -> listener.onFailure(new InvalidParameterException(NO_ENTITY))));
}, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY))));

}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/caching/PriorityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.ad.AnomalyDetectorPlugin;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.LimitExceededException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.ml.CheckpointDao;
Expand Down Expand Up @@ -637,7 +637,7 @@ public void maintenance() {
});
} catch (Exception e) {
// will be thrown to ES's transport broadcast handler
throw new OpenSearchException("Fail to maintain cache", e);
throw new AnomalyDetectionException("Fail to maintain cache", e);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ public class CommonErrorMessages {
public static final String ALL_FEATURES_DISABLED_ERR_MSG =
"Having trouble querying data because all of your features have been disabled.";
public static final String INVALID_TIMESTAMP_ERR_MSG = "timestamp is invalid";
public static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: ";
public static String FAIL_TO_PARSE_DETECTOR_MSG = "Fail to parse detector with id: ";
// change this error message to make it compatible with old version's integration(nexus) test
public static String FAIL_TO_FIND_DETECTOR_MSG = "Can't find detector with id: ";
public static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector ";
public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector ";
public static String FAIL_TO_GET_USER_INFO = "Unable to get user information from detector ";
public static String NO_PERMISSION_TO_ACCESS_DETECTOR = "User does not have permissions to access detector: ";
public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than ";
public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid";
public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for ";
Expand All @@ -71,4 +75,16 @@ public static String getTooManyCategoricalFieldErr(int limit) {
public static String EXCEED_HISTORICAL_ANALYSIS_LIMIT = "Exceed max historical analysis limit per node";
public static String NO_ELIGIBLE_NODE_TO_RUN_DETECTOR = "No eligible node to run detector ";
public static String EMPTY_STALE_RUNNING_ENTITIES = "Empty stale running entities";

public static String FAIL_TO_GET_DETECTOR = "Fail to get detector";
public static String FAIL_TO_GET_DETECTOR_INFO = "Fail to get detector info";
public static String FAIL_TO_CREATE_DETECTOR = "Fail to create detector";
public static String FAIL_TO_UPDATE_DETECTOR = "Fail to update detector";
public static String FAIL_TO_PREVIEW_DETECTOR = "Fail to preview detector";
public static String FAIL_TO_START_DETECTOR = "Fail to start detector";
public static String FAIL_TO_STOP_DETECTOR = "Fail to stop detector";
public static String FAIL_TO_DELETE_DETECTOR = "Fail to delete detector";
public static String FAIL_TO_DELETE_AD_RESULT = "Fail to delete anomaly result";
public static String FAIL_TO_GET_STATS = "Fail to get stats";
public static String FAIL_TO_SEARCH = "Fail to search";
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.dataprocessor.Interpolator;
import org.opensearch.ad.model.AnomalyDetector;
Expand Down Expand Up @@ -425,7 +426,7 @@ public void onResponse(SearchResponse response) {
listener.onResponse(topEntities);
} else if (expirationEpochMs < clock.millis()) {
if (topEntities.isEmpty()) {
listener.onFailure(new IllegalStateException("timeout to get preview results. Please retry later."));
listener.onFailure(new AnomalyDetectionException("timeout to get preview results. Please retry later."));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we switching this?

Copy link
Collaborator Author

@ylwu-amzn ylwu-amzn Jul 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In AD we have self-defined exceptions under class "AnomalyDetectionException" , this PR need to get concrete error message based on exception class, if it's "AnomalyDetectionException" (and several other exceptions, check RestHandlerUtils class), we will get message from exception directly, otherwise we use general error message to avoid returning verbose error message to user. Check RestHandlerUtils#wrapRestActionListener for details.

Security team has concern about returning verbose error .

} else {
logger.info("timeout to get preview results. Send whatever we have.");
listener.onResponse(topEntities);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.indices.ADIndex;
Expand Down Expand Up @@ -674,7 +675,7 @@ public void batchWrite(BulkRequest request, ActionListener<BulkResponse> listene
clientUtil.<BulkRequest, BulkResponse>execute(BulkAction.INSTANCE, request, listener);
} else {
// create index failure. Notify callers using listener.
listener.onFailure(new RuntimeException("Creating checkpoint with mappings call not acknowledged."));
listener.onFailure(new AnomalyDetectionException("Creating checkpoint with mappings call not acknowledged."));
}
}, exception -> {
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

package org.opensearch.ad.rest.handler;

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES;
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
Expand Down Expand Up @@ -229,7 +230,7 @@ private void updateAnomalyDetector(String detectorId) {

private void onGetAnomalyDetectorResponse(GetResponse response) {
if (!response.isExists()) {
listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found with id: " + detectorId, RestStatus.NOT_FOUND));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND));
return;
}
try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc
);
}

private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) throws IOException {
private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) {
if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) {
String errorMsg = getShardsFailure(response);
listener.onFailure(new OpenSearchStatusException(errorMsg, response.status()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,13 @@ private AnomalyDetectorSettings() {}
Setting.Property.Dynamic
);

public static final int MAX_BATCH_TASK_PIECE_SIZE = 10_000;
public static final Setting<Integer> BATCH_TASK_PIECE_SIZE = Setting
.intSetting(
"plugins.anomaly_detection.batch_task_piece_size",
LegacyOpenDistroAnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE,
1,
10_000,
MAX_BATCH_TASK_PIECE_SIZE,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.opensearch.ad.AnomalyDetectorPlugin.AD_BATCH_TASK_THREAD_POOL_NAME;
import static org.opensearch.ad.constant.CommonErrorMessages.DETECTOR_IS_RUNNING;
import static org.opensearch.ad.constant.CommonErrorMessages.EXCEED_HISTORICAL_ANALYSIS_LIMIT;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
import static org.opensearch.ad.constant.CommonErrorMessages.NO_ELIGIBLE_NODE_TO_RUN_DETECTOR;
import static org.opensearch.ad.indices.AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN;
import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD;
Expand Down Expand Up @@ -548,7 +549,7 @@ public <T> void getDetector(String detectorId, Consumer<AnomalyDetector> consume
GetRequest getRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found", RestStatus.NOT_FOUND));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND));
return;
}
try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
Expand All @@ -574,7 +575,7 @@ public <T> void getDetector(
GetRequest getRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found", RestStatus.NOT_FOUND));
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND));
return;
}
try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@

package org.opensearch.ad.transport;

import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_START_DETECTOR;
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_STOP_DETECTOR;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static org.opensearch.ad.util.ParseUtils.getUserContext;
import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute;
import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -88,14 +91,16 @@ public AnomalyDetectorJobTransportAction(
}

@Override
protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener<AnomalyDetectorJobResponse> listener) {
protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener<AnomalyDetectorJobResponse> actionListener) {
String detectorId = request.getDetectorID();
DetectionDateRange detectionDateRange = request.getDetectionDateRange();
boolean historical = request.isHistorical();
long seqNo = request.getSeqNo();
long primaryTerm = request.getPrimaryTerm();
String rawPath = request.getRawPath();
TimeValue requestTimeout = REQUEST_TIMEOUT.get(settings);
String errorMessage = rawPath.endsWith(RestHandlerUtils.START_JOB) ? FAIL_TO_START_DETECTOR : FAIL_TO_STOP_DETECTOR;
ActionListener<AnomalyDetectorJobResponse> listener = wrapRestActionListener(actionListener, errorMessage);

// By the time request reaches here, the user permissions are validated by Security plugin.
User user = getUserContext(client);
Expand Down
Loading