Skip to content

Commit

Permalink
Change error code to honor remote service error code
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <zaniu@amazon.com>
  • Loading branch information
zane-neo committed Jan 31, 2024
1 parent dee7989 commit 7702408
Showing 1 changed file with 39 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,23 @@ public void onStream(Publisher<ByteBuffer> stream) {
@Override
public void onError(Throwable error) {
log.error(error.getMessage(), error);
actionListener
.onFailure(
new OpenSearchStatusException(
"Error on communication with remote model: " + error.getMessage(),
RestStatus.INTERNAL_SERVER_ERROR
)
);
if (statusCode == null) {
actionListener
.onFailure(
new OpenSearchStatusException(
"Error on communication with remote model: " + error.getMessage(),
RestStatus.INTERNAL_SERVER_ERROR
)
);
} else {
actionListener
.onFailure(
new OpenSearchStatusException(
"Error on communication with remote model: " + error.getMessage(),
RestStatus.fromCode(statusCode)
)
);
}
}

private void processResponse(Integer statusCode, String body, Map<String, String> parameters, Map<Integer, ModelTensors> tensorOutputs)
Expand Down Expand Up @@ -152,13 +162,17 @@ public void onError(Throwable t) {
+ (t instanceof NullPointerException ? "NullPointerException" : t.getMessage())
);
if (countDownLatch.getCountDownLatch().getCount() == 0) {
actionListener
.onFailure(
new OpenSearchStatusException(
"Error on receiving response body from remote: " + String.join(",", errorMsg),
RestStatus.INTERNAL_SERVER_ERROR
)
);
if (t instanceof OpenSearchStatusException) {
actionListener.onFailure((OpenSearchStatusException) t);
} else {
actionListener
.onFailure(
new OpenSearchStatusException(
"Error on receiving response body from remote: " + String.join(",", errorMsg),
RestStatus.INTERNAL_SERVER_ERROR
)
);
}
} else {
log.debug("Not all responses received, left response count is: " + countDownLatch.getCountDownLatch().getCount());
}
Expand Down Expand Up @@ -188,13 +202,17 @@ public void onComplete() {
+ (e instanceof NullPointerException ? "NullPointerException" : e.getMessage())
);
if (countDownLatch.getCountDownLatch().getCount() == 0) {
actionListener
.onFailure(
new OpenSearchStatusException(
"Error on receiving response from remote: " + String.join(",", errorMsg),
RestStatus.INTERNAL_SERVER_ERROR
)
);
if (e instanceof OpenSearchStatusException) {
actionListener.onFailure((OpenSearchStatusException) e);
} else {
actionListener
.onFailure(
new OpenSearchStatusException(
"Error on receiving response from remote: " + String.join(",", errorMsg),
RestStatus.INTERNAL_SERVER_ERROR
)
);
}
} else {
log.debug("Not all responses received, left response count is: " + countDownLatch.getCountDownLatch().getCount());
}
Expand Down

0 comments on commit 7702408

Please sign in to comment.