Skip to content

Commit

Permalink
Add listener wrapper
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <zaniu@amazon.com>
  • Loading branch information
zane-neo committed Nov 24, 2023
1 parent 86e0a6e commit 347f08c
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.opensearch.OpenSearchSecurityException;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.ml.common.conversation.ActionConstants;
import org.opensearch.ml.common.transport.connector.MLCreateConnectorResponse;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
Expand All @@ -27,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
Expand Down Expand Up @@ -77,11 +80,14 @@ protected Set<String> responseParams() {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nodeClient) {
TransportPPLQueryRequest transportPPLQueryRequest =
new TransportPPLQueryRequest(PPLQueryRequestFactory.getPPLRequest(request));
LOG.info("request classloader: " + transportPPLQueryRequest.getClass().getClassLoader());
LOG.info("response classloader:" + TransportPPLQueryResponse.class.getClassLoader());

return channel ->
nodeClient.execute(
PPLQueryAction.INSTANCE,
transportPPLQueryRequest,
wrapActionListener(
new ActionListener<>() {
@Override
public void onResponse(TransportPPLQueryResponse response) {
Expand All @@ -107,7 +113,7 @@ public void onFailure(Exception e) {
reportError(channel, e, INTERNAL_SERVER_ERROR);
}
}
});
}));
}

private void sendResponse(RestChannel channel, RestStatus status, String content) {
Expand All @@ -117,4 +123,11 @@ private void sendResponse(RestChannel channel, RestStatus status, String content
private void reportError(final RestChannel channel, final Exception e, final RestStatus status) {
channel.sendResponse(new BytesRestResponse(status, e.getMessage()));
}
private ActionListener<TransportPPLQueryResponse> wrapActionListener(
final ActionListener<TransportPPLQueryResponse> listener) {
return ActionListener.wrap(r -> {
TransportPPLQueryResponse pplQueryResponse = TransportPPLQueryResponse.fromActionResponse(r);
listener.onResponse(pplQueryResponse);
}, listener::onFailure);
}
}

0 comments on commit 347f08c

Please sign in to comment.