Skip to content

Commit

Permalink
Initial commit of new job APIs
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Sep 6, 2023
1 parent 24e01d6 commit 90b0eb1
Show file tree
Hide file tree
Showing 8 changed files with 431 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.rest;

import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.POST;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.spark.rest.model.SubmitJobRequest;
import org.opensearch.sql.spark.transport.TransportGetQueryResultRequest;
import org.opensearch.sql.spark.transport.TransportSubmitJobRequest;
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest;
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse;
import org.opensearch.sql.spark.transport.model.SubmitJobActionRequest;
import org.opensearch.sql.spark.transport.model.SubmitJobActionResponse;

public class RestJobManagementAction extends BaseRestHandler {

public static final String JOB_ACTIONS = "job_actions";
public static final String BASE_JOB_ACTION_URL = "/_plugins/_query/_jobs";

private static final Logger LOG = LogManager.getLogger(RestJobManagementAction.class);

@Override
public String getName() {
return JOB_ACTIONS;
}

@Override
public List<Route> routes() {
return ImmutableList.of(

/*
*
* Create a new job with spark execution engine.
* Request URL: POST
* Request body:
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest]
* Response body:
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse]
*/
new Route(POST, BASE_JOB_ACTION_URL),

/*
* GET query result from job {{jobId}} execution.
* Request URL: GET
* Request body:
* Ref [org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest]
* Response body:
* Ref [org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse]
*/
new Route(
GET,
String.format(Locale.ROOT, "%s/{%s}/result", BASE_JOB_ACTION_URL, "jobId")));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
switch (restRequest.method()) {
case POST:
return executePostRequest(restRequest, nodeClient);
case GET:
return executeGetRequest(restRequest, nodeClient);
default:
return restChannel ->
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.METHOD_NOT_ALLOWED, String.valueOf(restRequest.method())));
}
}

private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {

SubmitJobRequest submitJobRequest
= SubmitJobRequest.fromXContentParser(restRequest.contentParser());
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportSubmitJobRequest.ACTION_TYPE,
new SubmitJobActionRequest(submitJobRequest),
new ActionListener<>() {
@Override
public void onResponse(
SubmitJobActionResponse submitJobActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.CREATED,
"application/json; charset=UTF-8",
submitJobRequest.getQuery()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private RestChannelConsumer executeGetRequest(RestRequest restRequest, NodeClient nodeClient) {
String jobId = restRequest.param("jobId");
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportGetQueryResultRequest.ACTION_TYPE,
new GetJobQueryResultActionRequest(jobId),
new ActionListener<>() {
@Override
public void onResponse(
GetJobQueryResultActionResponse getJobQueryResultActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
getJobQueryResultActionResponse.getResult()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}


private void handleException(Exception e, RestChannel restChannel) {
if (e instanceof OpenSearchException) {
OpenSearchException exception = (OpenSearchException) e;
reportError(restChannel, exception, exception.status());
} else {
LOG.error("Error happened during request handling", e);
if (isClientError(e)) {
reportError(restChannel, e, BAD_REQUEST);
} else {
reportError(restChannel, e, SERVICE_UNAVAILABLE);
}
}
}

private void reportError(final RestChannel channel, final Exception e, final RestStatus status) {
channel.sendResponse(
new BytesRestResponse(status, new ErrorMessage(e, status.getStatus()).toString()));
}

private static boolean isClientError(Exception e) {
return e instanceof IllegalArgumentException
|| e instanceof IllegalStateException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.rest.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.opensearch.core.xcontent.XContentParser;

@Data
@AllArgsConstructor
public class SubmitJobRequest {

private String query;

public static SubmitJobRequest fromXContentParser(XContentParser parser) throws IOException {
String query = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if (fieldName.equals("query")) {
query = parser.textOrNull();
} else {
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
return new SubmitJobRequest(query);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.spark.transport;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest;
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

public class TransportGetQueryResultRequest
extends HandledTransportAction<GetJobQueryResultActionRequest, GetJobQueryResultActionResponse> {

public static final String NAME = "cluster:admin/opensearch/ql/jobs/submit";
public static final ActionType<GetJobQueryResultActionResponse> ACTION_TYPE =
new ActionType<>(NAME, GetJobQueryResultActionResponse::new);



protected TransportGetQueryResultRequest(TransportService transportService,
ActionFilters actionFilters) {
super(NAME, transportService, actionFilters, GetJobQueryResultActionRequest::new);
}

@Override
protected void doExecute(Task task, GetJobQueryResultActionRequest request,
ActionListener<GetJobQueryResultActionResponse> listener) {
try {
String responseContent = "successful_result";
listener.onResponse(new GetJobQueryResultActionResponse(responseContent));
} catch (Exception e) {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.spark.transport;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.spark.transport.model.SubmitJobActionRequest;
import org.opensearch.sql.spark.transport.model.SubmitJobActionResponse;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

public class TransportSubmitJobRequest
extends HandledTransportAction<SubmitJobActionRequest, SubmitJobActionResponse> {

public static final String NAME = "cluster:admin/opensearch/ql/jobs/submit";
public static final ActionType<SubmitJobActionResponse> ACTION_TYPE =
new ActionType<>(NAME, SubmitJobActionResponse::new);



protected TransportSubmitJobRequest(TransportService transportService,
ActionFilters actionFilters) {
super(NAME, transportService, actionFilters, SubmitJobActionRequest::new);
}

@Override
protected void doExecute(Task task, SubmitJobActionRequest request,
ActionListener<SubmitJobActionResponse> listener) {
try {
String responseContent = "submitted_job";
listener.onResponse(new SubmitJobActionResponse(responseContent));
} catch (Exception e) {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.spark.transport.model;

import java.io.IOException;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;

public class GetJobQueryResultActionRequest extends ActionRequest {

@Getter
private String jobId;

/** Constructor of SubmitJobActionRequest from StreamInput. */
public GetJobQueryResultActionRequest(StreamInput in) throws IOException {
super(in);
}

public GetJobQueryResultActionRequest(String jobId) {
this.jobId = jobId;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.spark.transport.model;

import java.io.IOException;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

@RequiredArgsConstructor
public class GetJobQueryResultActionResponse extends ActionResponse {

@Getter private final String result;

public GetJobQueryResultActionResponse(StreamInput in) throws IOException {
super(in);
result = in.readString();
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
streamOutput.writeString(result);
}
}
Loading

0 comments on commit 90b0eb1

Please sign in to comment.