diff --git a/plugin/src/main/java/org/opensearch/ml/rest/MyRestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/ml/rest/MyRestPPLQueryAction.java index d6f08c0444..d186ad2bdd 100644 --- a/plugin/src/main/java/org/opensearch/ml/rest/MyRestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/ml/rest/MyRestPPLQueryAction.java @@ -11,8 +11,10 @@ import org.opensearch.OpenSearchSecurityException; import org.opensearch.client.node.NodeClient; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; import org.opensearch.core.rest.RestStatus; import org.opensearch.ml.common.conversation.ActionConstants; +import org.opensearch.ml.common.transport.connector.MLCreateConnectorResponse; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; @@ -27,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Function; import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; @@ -77,11 +80,14 @@ protected Set responseParams() { protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nodeClient) { TransportPPLQueryRequest transportPPLQueryRequest = new TransportPPLQueryRequest(PPLQueryRequestFactory.getPPLRequest(request)); + LOG.info("request classloader: " + transportPPLQueryRequest.getClass().getClassLoader()); + LOG.info("response classloader:" + TransportPPLQueryResponse.class.getClassLoader()); return channel -> nodeClient.execute( PPLQueryAction.INSTANCE, transportPPLQueryRequest, + wrapActionListener( new ActionListener<>() { @Override public void onResponse(TransportPPLQueryResponse response) { @@ -107,7 +113,7 @@ public void onFailure(Exception e) { reportError(channel, e, INTERNAL_SERVER_ERROR); } } - }); + })); } private void sendResponse(RestChannel channel, RestStatus status, String content) { @@ -117,4 +123,11 @@ private void sendResponse(RestChannel channel, RestStatus status, String content private void reportError(final RestChannel channel, final Exception e, final RestStatus status) { channel.sendResponse(new BytesRestResponse(status, e.getMessage())); } + private ActionListener wrapActionListener( + final ActionListener listener) { + return ActionListener.wrap(r -> { + TransportPPLQueryResponse pplQueryResponse = TransportPPLQueryResponse.fromActionResponse(r); + listener.onResponse(pplQueryResponse); + }, listener::onFailure); + } }