Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Fix the profile API returns prematurely. #340

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public void profile(String detectorId, ActionListener<DetectorProfile> listener,
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}

calculateTotalResponsesToWait(detectorId, profilesToCollect, listener);
}

Expand All @@ -118,10 +117,38 @@ private void calculateTotalResponsesToWait(
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser);
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);

prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
Copy link
Contributor

@ylwu-amzn ylwu-amzn Dec 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use AnomalyDetectionException here? Same question for other places

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do it. Most of the transport APIs use AnomalyDetectionException (except the recent ones added by Sarat). When I reviewed Sarat's PRs, I thought about pointing it out, but didn't, because changing to AnomalyDetectionException does not add much benefit beyond standardizing the exceptions we throw. Our public APIs do not standardize the exceptions they throw back to the user. Take the profile API as an example: sometimes it throws IOException, sometimes NullPointerException, and sometimes RuntimeException. Do you think we should change all of the public APIs and transport APIs to use AnomalyDetectionException? The benefit is that we would have a standard wrapper exception to throw. The drawback is that this might need to be a separate PR because of the large number of changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we only catch exceptions in the AD realtime job and count them in the failure stats. We may need to count all exceptions from other places as well; for that we would need to wrap the exceptions.
Not so urgent. We can fix it later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}

private void prepareProfile(
AnomalyDetector detector,
ActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isMultientityDetector();

int totalResponsesToWait = 0;

if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
totalResponsesToWait++;
}
Expand Down Expand Up @@ -158,50 +185,20 @@ private void calculateTotalResponsesToWait(
new MultiResponsesDelegateActionListener<DetectorProfile>(
listener,
totalResponsesToWait,
"Fail to fetch profile for " + detectorId,
CommonErrorMessages.FAIL_FETCH_ERR_MSG + detectorId,
false
);

prepareProfile(detector, delegateListener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}

private void prepareProfile(
AnomalyDetector detector,
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
GetRequest getStateRequest = new GetRequest(DetectorInternalState.DETECTOR_STATE_INDEX, detectorId);
client.get(getStateRequest, onGetDetectorState(listener, detectorId, enabledTimeMs));
client.get(getStateRequest, onGetDetectorState(delegateListener, detectorId, enabledTimeMs));
}

boolean isMultiEntityDetector = detector.isMultientityDetector();

// total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide
// when to consolidate results and return to users
if (isMultiEntityDetector) {
if (profilesToCollect.contains(DetectorProfileName.TOTAL_ENTITIES)) {
profileEntityStats(listener, detector);
profileEntityStats(delegateListener, detector);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
Expand All @@ -210,24 +207,24 @@ private void prepareProfile(
|| profilesToCollect.contains(DetectorProfileName.ACTIVE_ENTITIES)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(DetectorProfileName.STATE)) {
profileModels(detector, profilesToCollect, job, true, listener);
profileModels(detector, profilesToCollect, job, true, delegateListener);
}
} else {
if (profilesToCollect.contains(DetectorProfileName.STATE)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
profileStateRelated(detector, listener, job.isEnabled(), profilesToCollect);
profileStateRelated(detector, delegateListener, job.isEnabled(), profilesToCollect);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
|| profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES)
|| profilesToCollect.contains(DetectorProfileName.MODELS)) {
profileModels(detector, profilesToCollect, job, false, listener);
profileModels(detector, profilesToCollect, job, false, delegateListener);
}
}

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
onGetDetectorForPrepare(listener, profilesToCollect);
Expand Down Expand Up @@ -261,20 +258,19 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
DetectorProfile profile = profileBuilder.totalEntities(value).build();
listener.onResponse(profile);
}, searchException -> { listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId()); })
);
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
yizheliu-amazon marked this conversation as resolved.
Show resolved Hide resolved
listener.onFailure(searchException);
}));
}
}

private void onGetDetectorForPrepare(
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profiles
) {
private void onGetDetectorForPrepare(ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profiles) {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (profiles.contains(DetectorProfileName.STATE)) {
profileBuilder.state(DetectorState.DISABLED);
}
listener.respondImmediately(profileBuilder.build());
listener.onResponse(profileBuilder.build());
}

/**
Expand Down Expand Up @@ -340,8 +336,8 @@ private ActionListener<GetResponse> onGetDetectorState(
listener.onResponse(profileBuilder.build());

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
// detector state for this detector does not exist
Expand Down Expand Up @@ -475,7 +471,7 @@ private ActionListener<SearchResponse> onInittedEver(
"Fail to find any anomaly result with anomaly score larger than 0 after AD job enabled time for detector {}",
detector.getDetectorId()
);
listener.failImmediately(new RuntimeException("Fail to find detector state: " + detector.getDetectorId(), exception));
listener.onFailure(exception);
}
});
}
Expand Down Expand Up @@ -523,7 +519,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
new ParameterizedMessage("Fail to get init progress through messaging for {}", detector.getDetectorId()),
exception
);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detector.getDetectorId(), exception);
listener.onFailure(exception);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.CATEGORY_FIELD_LIMIT;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.List;
import java.util.Optional;
Expand All @@ -35,7 +34,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -113,25 +111,7 @@ public void profile(
new InvalidParameterException(CommonErrorMessages.CATEGORICAL_FIELD_NUMBER_SURPASSED + CATEGORY_FIELD_LIMIT)
);
} else {
int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(EntityProfileName.STATE)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.MODELS)) {
totalResponsesToWait++;
}
MultiResponsesDelegateActionListener<EntityProfile> delegateListener =
new MultiResponsesDelegateActionListener<EntityProfile>(
listener,
totalResponsesToWait,
"Fail to fetch profile for " + entityValue + " of detector " + detectorId,
false
);
prepareEntityProfile(delegateListener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0));
prepareEntityProfile(listener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0));
}
} catch (Exception t) {
listener.onFailure(t);
Expand All @@ -143,7 +123,7 @@ public void profile(
}

private void prepareEntityProfile(
MultiResponsesDelegateActionListener<EntityProfile> delegateListener,
ActionListener<EntityProfile> listener,
String detectorId,
String entityValue,
Set<EntityProfileName> profilesToCollect,
Expand All @@ -158,8 +138,8 @@ private void prepareEntityProfile(
request,
ActionListener
.wrap(
r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, delegateListener),
delegateListener::failImmediately
r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, listener),
listener::onFailure
)
);
}
Expand All @@ -171,7 +151,7 @@ private void getJob(
Set<EntityProfileName> profilesToCollect,
AnomalyDetector detector,
EntityProfileResponse entityProfileResponse,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
ActionListener<EntityProfile> listener
) {
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
Expand All @@ -184,6 +164,25 @@ private void getJob(
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);

int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(EntityProfileName.STATE)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.MODELS)) {
totalResponsesToWait++;
}
MultiResponsesDelegateActionListener<EntityProfile> delegateListener =
new MultiResponsesDelegateActionListener<EntityProfile>(
listener,
totalResponsesToWait,
CommonErrorMessages.FAIL_FETCH_ERR_MSG + entityValue + " of detector " + detectorId,
false
);

if (profilesToCollect.contains(EntityProfileName.MODELS)) {
EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue);
if (false == job.isEnabled()) {
Expand Down Expand Up @@ -233,20 +232,20 @@ private void getJob(
delegateListener.onResponse(builder.build());
}));
}
} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
delegateListener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener);
sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
logger.info(exception.getMessage());
sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener);
sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener);
} else {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception);
delegateListener.failImmediately(exception);
listener.onFailure(exception);
}
}));
}
Expand Down Expand Up @@ -285,14 +284,14 @@ private void sendUnknownState(
String categoryField,
String entityValue,
boolean immediate,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
ActionListener<EntityProfile> delegateListener
) {
EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue);
if (profilesToCollect.contains(EntityProfileName.STATE)) {
builder.state(EntityState.UNKNOWN);
}
if (immediate) {
delegateListener.respondImmediately(builder.build());
delegateListener.onResponse(builder.build());
} else {
delegateListener.onResponse(builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ public class CommonErrorMessages {
public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector ";
public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than ";
public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid";
public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for ";
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (modelProfile != null) {
builder.field(CommonName.MODEL, modelProfile);
}
if (state != null) {
if (state != null && state != EntityState.UNKNOWN) {
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
builder.field(CommonName.STATE, state);
}
builder.endObject();
Expand Down Expand Up @@ -263,7 +263,7 @@ public String toString() {
if (modelProfile != null) {
builder.append(CommonName.MODELS, modelProfile);
}
if (state != null) {
if (state != null && state != EntityState.UNKNOWN) {
builder.append(CommonName.STATE, state);
}
return builder.toString();
Expand Down Expand Up @@ -330,7 +330,7 @@ public void merge(Mergeable other) {
if (otherProfile.modelProfile != null) {
this.modelProfile = otherProfile.modelProfile;
}
if (otherProfile.getState() != null) {
if (otherProfile.getState() != null && otherProfile.getState() != EntityState.UNKNOWN) {
this.state = otherProfile.getState();
}
}
Expand Down
Loading