Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async query get result bug fix #2443

Merged
merged 12 commits into from
Dec 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.execution.statement.StatementState;

/** Process async query request. */
public abstract class AsyncQueryHandler {
Expand All @@ -33,10 +34,20 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata)
result.put(ERROR_FIELD, error);
return result;
} else {
return getResponseFromExecutor(asyncQueryJobMetadata);
JSONObject statement = getResponseFromExecutor(asyncQueryJobMetadata);

// Consider statement still running if state is success but query result unavailable
if (isSuccessState(statement)) {
statement.put(STATUS_FIELD, StatementState.RUNNING.getState());
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
}
return statement;
}
}

/**
 * Checks whether the given statement reports a successful state.
 *
 * @param statement JSON response whose {@code STATUS_FIELD} entry is inspected; read via {@code
 *     optString}, so a missing entry does not throw
 * @return {@code true} if the status string equals {@code StatementState.SUCCESS}'s state,
 *     ignoring case
 */
private boolean isSuccessState(JSONObject statement) {
  String reportedStatus = statement.optString(STATUS_FIELD);
  String successStatus = StatementState.SUCCESS.getState();
  return successStatus.equalsIgnoreCase(reportedStatus);
}

protected abstract JSONObject getResponseFromResultIndex(
AsyncQueryJobMetadata asyncQueryJobMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.spark.dispatcher;

import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

import com.amazonaws.services.emrserverless.model.JobRunState;
Expand All @@ -24,6 +26,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
Expand Down Expand Up @@ -106,7 +109,11 @@ protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQuery

@Override
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
throw new IllegalStateException("[BUG] can't fetch result of index DML query form server");
// Consider statement still running if result doc created in submit() is not available yet
JSONObject result = new JSONObject();
result.put(STATUS_FIELD, StatementState.RUNNING.getState());
result.put(ERROR_FIELD, "");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the error field mandatory even if there is no error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getAsyncQueryResults() API code always fetches the status and error fields from what is returned here. Although it uses optString, I thought it might be clearer to pass the error field explicitly here.

return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import java.util.List;
import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.node.NodeClient;
Expand All @@ -41,6 +43,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand All @@ -63,6 +66,9 @@
import org.opensearch.sql.spark.execution.session.SessionState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexType;
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.storage.DataSourceFactory;
Expand Down Expand Up @@ -189,10 +195,17 @@ private DataSourceServiceImpl createDataSourceService() {

/**
 * Creates the async query executor service with a default {@link JobExecutionResponseReader}
 * built on this test's {@code client}.
 *
 * @param emrServerlessClient EMR Serverless client handed to the two-argument overload
 * @return the service produced by the two-argument overload
 */
protected AsyncQueryExecutorService createAsyncQueryExecutorService(
    EMRServerlessClient emrServerlessClient) {
  JobExecutionResponseReader defaultResponseReader = new JobExecutionResponseReader(client);
  return createAsyncQueryExecutorService(emrServerlessClient, defaultResponseReader);
}

/** Pass a custom response reader which can mock interaction between PPL plugin and EMR-S job. */
protected AsyncQueryExecutorService createAsyncQueryExecutorService(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader) {
StateStore stateStore = new StateStore(client, clusterService);
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(stateStore);
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
emrServerlessClient,
Expand All @@ -215,6 +228,7 @@ public static class LocalEMRSClient implements EMRServerlessClient {
private int startJobRunCalled = 0;
private int cancelJobRunCalled = 0;
private int getJobResult = 0;
private String jobState = "RUNNING";
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

@Getter private StartJobRequest jobRequest;

Expand All @@ -229,7 +243,7 @@ public String startJobRun(StartJobRequest startJobRequest) {
public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
  // Count every invocation so tests can verify it through getJobRunResultCalled().
  getJobResult++;
  JobRun jobRun = new JobRun();
  // Report the configurable job state only. A preceding hard-coded setState("RUNNING") was
  // always overwritten by this call, so it has been dropped as a dead store.
  jobRun.setState(jobState);
  return new GetJobRunResult().withJobRun(jobRun);
}

Expand All @@ -250,6 +264,10 @@ public void cancelJobRunCalled(int expectedTimes) {
/**
 * Asserts that the {@code getJobResult} call counter equals the expected value.
 *
 * @param expectedTimes number of calls the test expects to have been recorded
 */
public void getJobRunResultCalled(int expectedTimes) {
  final int actualTimes = getJobResult;
  assertEquals(expectedTimes, actualTimes);
}

/**
 * Sets the job state string that {@code getJobRunResult} puts on the returned job run.
 *
 * @param jobState state value to report on subsequent calls
 */
public void setJobState(String jobState) {
this.jobState = jobState;
}
}

public SparkExecutionEngineConfig sparkExecutionEngineConfig() {
Expand Down Expand Up @@ -306,6 +324,111 @@ public String loadResultIndexMappings() {
return Resources.toString(url, Charsets.UTF_8);
}

/**
 * Test helper that creates a Flint index state document through {@link StateStore} and lets a
 * test move it between {@link FlintIndexState} values and assert on the stored state.
 */
public class MockFlintSparkJob {

  /** Latest state model returned by the state store after each create/update. */
  private FlintIndexStateModel stateModel;

  /**
   * Creates a state model in {@link FlintIndexState#EMPTY} and persists it via the state store.
   *
   * @param latestId value passed through to the state model; must not be null
   */
  public MockFlintSparkJob(String latestId) {
    assertNotNull(latestId);
    FlintIndexStateModel initialModel =
        new FlintIndexStateModel(
            FlintIndexState.EMPTY,
            "mockAppId",
            "mockJobId",
            latestId,
            DATASOURCE,
            System.currentTimeMillis(),
            "",
            SequenceNumbers.UNASSIGNED_SEQ_NO,
            SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
    stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(initialModel);
  }

  /** Updates the stored state to {@link FlintIndexState#REFRESHING}. */
  public void refreshing() {
    transitionTo(FlintIndexState.REFRESHING);
  }

  /** Updates the stored state to {@link FlintIndexState#CANCELLING}. */
  public void cancelling() {
    transitionTo(FlintIndexState.CANCELLING);
  }

  /** Updates the stored state to {@link FlintIndexState#ACTIVE}. */
  public void active() {
    transitionTo(FlintIndexState.ACTIVE);
  }

  /** Updates the stored state to {@link FlintIndexState#DELETING}. */
  public void deleting() {
    transitionTo(FlintIndexState.DELETING);
  }

  /** Updates the stored state to {@link FlintIndexState#DELETED}. */
  public void deleted() {
    transitionTo(FlintIndexState.DELETED);
  }

  /**
   * Asserts that a state document with this job's id exists in the state store and carries the
   * expected index state.
   *
   * @param expected state the stored document should have
   */
  void assertState(FlintIndexState expected) {
    Optional<FlintIndexStateModel> storedModel =
        StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId());
    assertTrue(storedModel.isPresent());
    assertEquals(expected, storedModel.get().getIndexState());
  }

  /** Applies a state update through the state store and keeps the returned model. */
  private void transitionTo(FlintIndexState targetState) {
    stateModel =
        StateStore.updateFlintIndexState(stateStore, DATASOURCE).apply(stateModel, targetState);
  }
}

/**
 * Test fixture bundling a query, a Flint index type and an index name, with helpers to create and
 * delete the corresponding index. The three {@code final} fields are supplied through the
 * Lombok-generated constructor in declaration order.
 */
@RequiredArgsConstructor
public class FlintDatasetMock {
  final String query;
  final FlintIndexType indexType;
  final String indexName;
  // Selects which mapping directory createIndex() reads from.
  boolean isLegacy = false;
  String latestId;

  /** Fluent setter for the legacy flag; returns this mock for chaining. */
  FlintDatasetMock isLegacy(boolean isLegacy) {
    this.isLegacy = isLegacy;
    return this;
  }

  /** Fluent setter for the latest id; returns this mock for chaining. */
  FlintDatasetMock latestId(String latestId) {
    this.latestId = latestId;
    return this;
  }

  /**
   * Creates the index named {@code indexName} using the mapping file that matches {@code
   * indexType}. Does nothing for an index type without a matching case.
   */
  public void createIndex() {
    final String mappingFile;
    switch (indexType) {
      case SKIPPING:
        mappingFile = "flint_skipping_index.json";
        break;
      case COVERING:
        mappingFile = "flint_covering_index.json";
        break;
      case MATERIALIZED_VIEW:
        mappingFile = "flint_mv.json";
        break;
      default:
        // No mapping is defined for other types; leave the cluster untouched.
        return;
    }
    String mappingDir = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1";
    createIndexWithMappings(indexName, loadMappings(mappingDir + "/" + mappingFile));
  }

  /** Deletes the index named {@code indexName} and waits for the request to complete. */
  @SneakyThrows
  public void deleteIndex() {
    DeleteIndexRequest deleteRequest = new DeleteIndexRequest().indices(indexName);
    client().admin().indices().delete(deleteRequest).get();
  }
}

/**
 * Reads a classpath resource into a string using UTF-8.
 *
 * @param path resource path resolved through {@code Resources.getResource}
 * @return the full resource content
 */
@SneakyThrows
public static String loadMappings(String path) {
  return Resources.toString(Resources.getResource(path), Charsets.UTF_8);
}

public void createIndexWithMappings(String indexName, String metadata) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.mapping(metadata, XContentType.JSON);
Expand Down
Loading
Loading