diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java
index b3d2cdd289..d61ac17aa3 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java
@@ -15,6 +15,7 @@
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
+import org.opensearch.sql.spark.execution.statement.StatementState;
 
 /** Process async query request. */
 public abstract class AsyncQueryHandler {
@@ -33,10 +34,20 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata)
       result.put(ERROR_FIELD, error);
       return result;
     } else {
-      return getResponseFromExecutor(asyncQueryJobMetadata);
+      JSONObject statement = getResponseFromExecutor(asyncQueryJobMetadata);
+
+      // Consider statement still running if state is success but query result unavailable
+      if (isSuccessState(statement)) {
+        statement.put(STATUS_FIELD, StatementState.RUNNING.getState());
+      }
+      return statement;
     }
   }
 
+  private boolean isSuccessState(JSONObject statement) {
+    return StatementState.SUCCESS.getState().equalsIgnoreCase(statement.optString(STATUS_FIELD));
+  }
+
   protected abstract JSONObject getResponseFromResultIndex(
       AsyncQueryJobMetadata asyncQueryJobMetadata);
 
diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java
index 3ab5439ad5..a03cd64986 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java
@@ -5,6 +5,8 @@
 
 package org.opensearch.sql.spark.dispatcher;
 
+import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
 import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;
 
 import com.amazonaws.services.emrserverless.model.JobRunState;
@@ -24,6 +26,7 @@
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
 import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
 import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
+import org.opensearch.sql.spark.execution.statement.StatementState;
 import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadata;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
@@ -106,7 +109,11 @@ protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQuery
 
   @Override
   protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
-    throw new IllegalStateException("[BUG] can't fetch result of index DML query form server");
+    // Consider statement still running if result doc created in submit() is not available yet
+    JSONObject result = new JSONObject();
+    result.put(STATUS_FIELD, StatementState.RUNNING.getState());
+    result.put(ERROR_FIELD, "");
+    return result;
   }
 
   @Override
diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java
index 663e5db852..c7054dd200 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java
@@ -15,6 +15,7 @@
 import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
 import com.amazonaws.services.emrserverless.model.GetJobRunResult;
 import com.amazonaws.services.emrserverless.model.JobRun;
+import com.amazonaws.services.emrserverless.model.JobRunState;
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -27,11 +28,13 @@
 import java.util.List;
 import java.util.Optional;
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.opensearch.action.admin.indices.create.CreateIndexRequest;
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.action.search.SearchRequest;
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.client.node.NodeClient;
@@ -41,6 +44,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.index.query.QueryBuilder;
+import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.search.builder.SearchSourceBuilder;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
@@ -63,6 +67,9 @@
 import org.opensearch.sql.spark.execution.session.SessionState;
 import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
+import org.opensearch.sql.spark.flint.FlintIndexState;
+import org.opensearch.sql.spark.flint.FlintIndexStateModel;
+import org.opensearch.sql.spark.flint.FlintIndexType;
 import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;
 import org.opensearch.sql.storage.DataSourceFactory;
@@ -189,10 +196,17 @@ private DataSourceServiceImpl createDataSourceService() {
 
   protected AsyncQueryExecutorService createAsyncQueryExecutorService(
       EMRServerlessClient emrServerlessClient) {
+    return createAsyncQueryExecutorService(
+        emrServerlessClient, new JobExecutionResponseReader(client));
+  }
+
+  /** Pass a custom response reader which can mock interaction between PPL plugin and EMR-S job. */
+  protected AsyncQueryExecutorService createAsyncQueryExecutorService(
+      EMRServerlessClient emrServerlessClient,
+      JobExecutionResponseReader jobExecutionResponseReader) {
     StateStore stateStore = new StateStore(client, clusterService);
     AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
         new OpensearchAsyncQueryJobMetadataStorageService(stateStore);
-    JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
     SparkQueryDispatcher sparkQueryDispatcher =
         new SparkQueryDispatcher(
             emrServerlessClient,
@@ -215,6 +229,7 @@ public static class LocalEMRSClient implements EMRServerlessClient {
     private int startJobRunCalled = 0;
     private int cancelJobRunCalled = 0;
     private int getJobResult = 0;
+    private JobRunState jobState = JobRunState.RUNNING;
 
     @Getter private StartJobRequest jobRequest;
 
@@ -229,7 +244,7 @@ public String startJobRun(StartJobRequest startJobRequest) {
     public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
       getJobResult++;
       JobRun jobRun = new JobRun();
-      jobRun.setState("RUNNING");
+      jobRun.setState(jobState.toString());
       return new GetJobRunResult().withJobRun(jobRun);
     }
 
@@ -250,6 +265,10 @@ public void cancelJobRunCalled(int expectedTimes) {
     public void getJobRunResultCalled(int expectedTimes) {
       assertEquals(expectedTimes, getJobResult);
     }
+
+    public void setJobState(JobRunState jobState) {
+      this.jobState = jobState;
+    }
   }
 
   public SparkExecutionEngineConfig sparkExecutionEngineConfig() {
@@ -306,6 +325,111 @@ public String loadResultIndexMappings() {
     return Resources.toString(url, Charsets.UTF_8);
   }
 
+  public class MockFlintSparkJob {
+
+    private FlintIndexStateModel stateModel;
+
+    public MockFlintSparkJob(String latestId) {
+      assertNotNull(latestId);
+      stateModel =
+          new FlintIndexStateModel(
+              FlintIndexState.EMPTY,
+              "mockAppId",
+              "mockJobId",
+              latestId,
+              DATASOURCE,
+              System.currentTimeMillis(),
+              "",
+              SequenceNumbers.UNASSIGNED_SEQ_NO,
+              SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
+      stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel);
+    }
+
+    public void refreshing() {
+      stateModel =
+          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
+              .apply(stateModel, FlintIndexState.REFRESHING);
+    }
+
+    public void cancelling() {
+      stateModel =
+          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
+              .apply(stateModel, FlintIndexState.CANCELLING);
+    }
+
+    public void active() {
+      stateModel =
+          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
+              .apply(stateModel, FlintIndexState.ACTIVE);
+    }
+
+    public void deleting() {
+      stateModel =
+          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
+              .apply(stateModel, FlintIndexState.DELETING);
+    }
+
+    public void deleted() {
+      stateModel =
+          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
+              .apply(stateModel, FlintIndexState.DELETED);
+    }
+
+    void assertState(FlintIndexState expected) {
+      Optional<FlintIndexStateModel> stateModelOpt =
+          StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId());
+      assertTrue(stateModelOpt.isPresent());
+      assertEquals(expected, stateModelOpt.get().getIndexState());
+    }
+  }
+
+  @RequiredArgsConstructor
+  public class FlintDatasetMock {
+    final String query;
+    final FlintIndexType indexType;
+    final String indexName;
+    boolean isLegacy = false;
+    String latestId;
+
+    FlintDatasetMock isLegacy(boolean isLegacy) {
+      this.isLegacy = isLegacy;
+      return this;
+    }
+
+    FlintDatasetMock latestId(String latestId) {
+      this.latestId = latestId;
+      return this;
+    }
+
+    public void createIndex() {
+      String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1";
+      switch (indexType) {
+        case SKIPPING:
+          createIndexWithMappings(
+              indexName, loadMappings(pathPrefix + "/" + "flint_skipping_index.json"));
+          break;
+        case COVERING:
+          createIndexWithMappings(
+              indexName, loadMappings(pathPrefix + "/" + "flint_covering_index.json"));
+          break;
+        case MATERIALIZED_VIEW:
+          createIndexWithMappings(indexName, loadMappings(pathPrefix + "/" + "flint_mv.json"));
+          break;
+      }
+    }
+
+    @SneakyThrows
+    public void deleteIndex() {
+      client().admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
+    }
+  }
+
+  @SneakyThrows
+  public static String loadMappings(String path) {
+    URL url = Resources.getResource(path);
+    return Resources.toString(url, Charsets.UTF_8);
+  }
+
   public void createIndexWithMappings(String indexName, String metadata) {
     CreateIndexRequest request = new CreateIndexRequest(indexName);
     request.mapping(metadata, XContentType.JSON);
diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java
new file mode 100644
index 0000000000..bba38693cd
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java
@@ -0,0 +1,317 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.asyncquery;
+
+import static org.opensearch.action.support.WriteRequest.RefreshPolicy.WAIT_UNTIL;
+import static org.opensearch.sql.data.model.ExprValueUtils.tupleValue;
+import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX;
+import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement;
+
+import com.amazonaws.services.emrserverless.model.JobRunState;
+import com.google.common.collect.ImmutableList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.sql.data.model.ExprValue;
+import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
+import org.opensearch.sql.spark.execution.statement.StatementModel;
+import org.opensearch.sql.spark.execution.statement.StatementState;
+import org.opensearch.sql.spark.execution.statestore.StateStore;
+import org.opensearch.sql.spark.flint.FlintIndexType;
+import org.opensearch.sql.spark.response.JobExecutionResponseReader;
+import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
+import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
+import org.opensearch.sql.spark.rest.model.LangType;
+
+public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec {
+
+  /** Mock Flint index and index state */
+  private final FlintDatasetMock mockIndex =
+      new FlintDatasetMock(
+              "DROP SKIPPING INDEX ON mys3.default.http_logs",
+              FlintIndexType.SKIPPING,
+              "flint_mys3_default_http_logs_skipping_index")
+          .latestId("skippingindexid");
+
+  private MockFlintSparkJob mockIndexState;
+
+  @Before
+  public void doSetUp() {
+    mockIndexState = new MockFlintSparkJob(mockIndex.latestId);
+  }
+
+  @Test
+  public void testInteractiveQueryGetResult() {
+    createAsyncQuery("SELECT 1")
+        .withInteraction(InteractionStep::pluginSearchQueryResult)
+        .assertQueryResults("waiting", null)
+        .withInteraction(
+            interaction -> {
+              interaction.emrJobWriteResultDoc(createResultDoc(interaction.queryId));
+              interaction.emrJobUpdateStatementState(StatementState.SUCCESS);
+              return interaction.pluginSearchQueryResult();
+            })
+        .assertQueryResults("SUCCESS", ImmutableList.of(tupleValue(Map.of("1", 1))));
+  }
+
+  @Test
+  public void testInteractiveQueryGetResultWithConcurrentEmrJobUpdate() {
+    createAsyncQuery("SELECT 1")
+        .withInteraction(InteractionStep::pluginSearchQueryResult)
+        .assertQueryResults("waiting", null)
+        .withInteraction(
+            interaction -> {
+              JSONObject result = interaction.pluginSearchQueryResult();
+              interaction.emrJobWriteResultDoc(createResultDoc(interaction.queryId));
+              interaction.emrJobUpdateStatementState(StatementState.SUCCESS);
+              return result;
+            })
+        .assertQueryResults("running", null)
+        .withInteraction(InteractionStep::pluginSearchQueryResult)
+        .assertQueryResults("SUCCESS", ImmutableList.of(tupleValue(Map.of("1", 1))));
+  }
+
+  @Test
+  public void testBatchQueryGetResult() {
+    createAsyncQuery("REFRESH SKIPPING INDEX ON test")
+        .withInteraction(
+            interaction -> {
+              interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId));
+              interaction.emrJobUpdateJobState(JobRunState.SUCCESS);
+              return interaction.pluginSearchQueryResult();
+            })
+        .assertQueryResults("SUCCESS", ImmutableList.of());
+  }
+
+  @Test
+  public void testBatchQueryGetResultWithConcurrentEmrJobUpdate() {
+    createAsyncQuery("REFRESH SKIPPING INDEX ON test")
+        .withInteraction(
+            interaction -> {
+              JSONObject result = interaction.pluginSearchQueryResult();
+              interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId));
+              interaction.emrJobUpdateJobState(JobRunState.SUCCESS);
+              return result;
+            })
+        .assertQueryResults("running", null)
+        .withInteraction(InteractionStep::pluginSearchQueryResult)
+        .assertQueryResults("SUCCESS", ImmutableList.of());
+  }
+
+  @Test
+  public void testStreamingQueryGetResult() {
+    // Create mock index with index state refreshing
+    mockIndex.createIndex();
+    mockIndexState.refreshing();
+    try {
+      createAsyncQuery(
+              "CREATE SKIPPING INDEX ON mys3.default.http_logs "
+                  + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)")
+          .withInteraction(
+              interaction -> {
+                interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId));
+                interaction.emrJobUpdateJobState(JobRunState.SUCCESS);
+                return interaction.pluginSearchQueryResult();
+              })
+          .assertQueryResults("SUCCESS", ImmutableList.of());
+    } finally {
+      mockIndex.deleteIndex();
+      mockIndexState.deleted();
+    }
+  }
+
+  @Test
+  public void testStreamingQueryGetResultWithConcurrentEmrJobUpdate() {
+    // Create mock index with index state refreshing
+    mockIndex.createIndex();
+    mockIndexState.refreshing();
+    try {
+      createAsyncQuery(
+              "CREATE SKIPPING INDEX ON mys3.default.http_logs "
+                  + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)")
+          .withInteraction(
+              interaction -> {
+                JSONObject result = interaction.pluginSearchQueryResult();
+                interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId));
+                interaction.emrJobUpdateJobState(JobRunState.SUCCESS);
+                return result;
+              })
+          .assertQueryResults("running", null)
+          .withInteraction(InteractionStep::pluginSearchQueryResult)
+          .assertQueryResults("SUCCESS", ImmutableList.of());
+    } finally {
+      mockIndex.deleteIndex();
+      mockIndexState.deleted();
+    }
+  }
+
+  @Test
+  public void testDropIndexQueryGetResult() {
+    // Create mock index with index state refreshing
+    mockIndex.createIndex();
+    mockIndexState.refreshing();
+
+    LocalEMRSClient emrClient = new LocalEMRSClient();
+    emrClient.setJobState(JobRunState.CANCELLED);
+    createAsyncQuery(mockIndex.query, emrClient)
+        .withInteraction(InteractionStep::pluginSearchQueryResult)
+        .assertQueryResults("SUCCESS", ImmutableList.of());
+  }
+
+  @Test
+  public void testDropIndexQueryGetResultWithResultDocRefreshDelay() {
+    // Create mock index with index state refreshing
+    mockIndex.createIndex();
+    mockIndexState.refreshing();
+
+    LocalEMRSClient emrClient = new LocalEMRSClient();
+    emrClient.setJobState(JobRunState.CANCELLED);
+    createAsyncQuery(mockIndex.query, emrClient)
+        .withInteraction(interaction -> new JSONObject()) // simulate result index refresh delay
+        .assertQueryResults("running", null)
+        .withInteraction(InteractionStep::pluginSearchQueryResult)
+        .assertQueryResults("SUCCESS", ImmutableList.of());
+  }
+
+  private AssertionHelper createAsyncQuery(String query) {
+    return new AssertionHelper(query, new LocalEMRSClient());
+  }
+
+  private AssertionHelper createAsyncQuery(String query, LocalEMRSClient emrClient) {
+    return new AssertionHelper(query, emrClient);
+  }
+
+  private class AssertionHelper {
+    private final AsyncQueryExecutorService queryService;
+    private final CreateAsyncQueryResponse createQueryResponse;
+    private Interaction interaction;
+
+    AssertionHelper(String query, LocalEMRSClient emrClient) {
+      this.queryService =
+          createAsyncQueryExecutorService(
+              emrClient,
+              /*
+               * Custom reader that intercepts get results calls and injects extra steps defined
+               * in the current interaction. Both get methods are intercepted because a query
+               * handler will only call one of them.
+               */
+              new JobExecutionResponseReader(client) {
+                @Override
+                public JSONObject getResultFromOpensearchIndex(String jobId, String resultIndex) {
+                  return interaction.interact(new InteractionStep(emrClient, jobId, resultIndex));
+                }
+
+                @Override
+                public JSONObject getResultWithQueryId(String queryId, String resultIndex) {
+                  return interaction.interact(new InteractionStep(emrClient, queryId, resultIndex));
+                }
+              });
+      this.createQueryResponse =
+          queryService.createAsyncQuery(
+              new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null));
+    }
+
+    AssertionHelper withInteraction(Interaction interaction) {
+      this.interaction = interaction;
+      return this;
+    }
+
+    AssertionHelper assertQueryResults(String status, List<ExprValue> data) {
+      AsyncQueryExecutionResponse results =
+          queryService.getAsyncQueryResults(createQueryResponse.getQueryId());
+      assertEquals(status, results.getStatus());
+      assertEquals(data, results.getResults());
+      return this;
+    }
+  }
+
+  /** Define an interaction between PPL plugin and EMR-S job. */
+  private interface Interaction {
+
+    JSONObject interact(InteractionStep interaction);
+  }
+
+  /**
+   * Each method in this class is one step that can happen in an interaction. These methods are
+   * called in any order to simulate a concurrent scenario.
+   */
+  private class InteractionStep {
+    private final LocalEMRSClient emrClient;
+    final String queryId;
+    final String resultIndex;
+
+    private InteractionStep(LocalEMRSClient emrClient, String queryId, String resultIndex) {
+      this.emrClient = emrClient;
+      this.queryId = queryId;
+      this.resultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex;
+    }
+
+    /** Simulate the PPL plugin searching query_execution_result */
+    JSONObject pluginSearchQueryResult() {
+      return new JobExecutionResponseReader(client).getResultWithQueryId(queryId, resultIndex);
+    }
+
+    /** Simulate EMR-S bulk writing query_execution_result with refresh = wait_for */
+    void emrJobWriteResultDoc(Map<String, Object> resultDoc) {
+      try {
+        IndexRequest request =
+            new IndexRequest().index(resultIndex).setRefreshPolicy(WAIT_UNTIL).source(resultDoc);
+        client.index(request).get();
+      } catch (Exception e) {
+        Assert.fail("Failed to write result doc: " + e.getMessage());
+      }
+    }
+
+    /** Simulate EMR-S updating query_execution_request with the new statement state */
+    void emrJobUpdateStatementState(StatementState newState) {
+      StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get();
+      StateStore.updateStatementState(stateStore, DATASOURCE).apply(stmt, newState);
+    }
+
+    void emrJobUpdateJobState(JobRunState jobState) {
+      emrClient.setJobState(jobState);
+    }
+  }
+
+  private Map<String, Object> createEmptyResultDoc(String queryId) {
+    Map<String, Object> document = new HashMap<>();
+    document.put("result", ImmutableList.of());
+    document.put("schema", ImmutableList.of());
+    document.put("jobRunId", "XXX");
+    document.put("applicationId", "YYY");
+    document.put("dataSourceName", DATASOURCE);
+    document.put("status", "SUCCESS");
+    document.put("error", "");
+    document.put("queryId", queryId);
+    document.put("queryText", "SELECT 1");
+    document.put("sessionId", "ZZZ");
+    document.put("updateTime", 1699124602715L);
+    document.put("queryRunTime", 123);
+    return document;
+  }
+
+  private Map<String, Object> createResultDoc(String queryId) {
+    Map<String, Object> document = new HashMap<>();
+    document.put("result", ImmutableList.of("{'1':1}"));
+    document.put("schema", ImmutableList.of("{'column_name':'1','data_type':'integer'}"));
+    document.put("jobRunId", "XXX");
+    document.put("applicationId", "YYY");
+    document.put("dataSourceName", DATASOURCE);
+    document.put("status", "SUCCESS");
+    document.put("error", "");
+    document.put("queryId", queryId);
+    document.put("queryText", "SELECT 1");
+    document.put("sessionId", "ZZZ");
+    document.put("updateTime", 1699124602715L);
+    document.put("queryRunTime", 123);
+    return document;
+  }
+}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java
index 45a83b3296..49ac538e65 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java
@@ -8,21 +8,11 @@
 import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
 import com.amazonaws.services.emrserverless.model.GetJobRunResult;
 import com.amazonaws.services.emrserverless.model.JobRun;
-import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
-import com.google.common.io.Resources;
-import java.net.URL;
-import java.util.Optional;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
 import org.junit.Assert;
 import org.junit.Test;
-import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexState;
-import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexType;
 import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException;
 import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
@@ -685,109 +675,4 @@ public void concurrentRefreshJobLimitNotAppliedToDDL() {
             new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null));
     assertNotNull(asyncQueryResponse.getSessionId());
   }
-
-  public class MockFlintSparkJob {
-
-    private FlintIndexStateModel stateModel;
-
-    public MockFlintSparkJob(String latestId) {
-      assertNotNull(latestId);
-      stateModel =
-          new FlintIndexStateModel(
-              FlintIndexState.EMPTY,
-              "mockAppId",
-              "mockJobId",
-              latestId,
-              DATASOURCE,
-              System.currentTimeMillis(),
-              "",
-              SequenceNumbers.UNASSIGNED_SEQ_NO,
-              SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
-      stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel);
-    }
-
-    public void refreshing() {
-      stateModel =
-          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
-              .apply(stateModel, FlintIndexState.REFRESHING);
-    }
-
-    public void cancelling() {
-      stateModel =
-          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
-              .apply(stateModel, FlintIndexState.CANCELLING);
-    }
-
-    public void active() {
-      stateModel =
-          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
-              .apply(stateModel, FlintIndexState.ACTIVE);
-    }
-
-    public void deleting() {
-      stateModel =
-          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
-              .apply(stateModel, FlintIndexState.DELETING);
-    }
-
-    public void deleted() {
-      stateModel =
-          StateStore.updateFlintIndexState(stateStore, DATASOURCE)
-              .apply(stateModel, FlintIndexState.DELETED);
-    }
-
-    void assertState(FlintIndexState expected) {
-      Optional<FlintIndexStateModel> stateModelOpt =
-          StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId());
-      assertTrue((stateModelOpt.isPresent()));
-      assertEquals(expected, stateModelOpt.get().getIndexState());
-    }
-  }
-
-  @RequiredArgsConstructor
-  public class FlintDatasetMock {
-    private final String query;
-    private final FlintIndexType indexType;
-    private final String indexName;
-    private boolean isLegacy = false;
-    private String latestId;
-
-    FlintDatasetMock isLegacy(boolean isLegacy) {
-      this.isLegacy = isLegacy;
-      return this;
-    }
-
-    FlintDatasetMock latestId(String latestId) {
-      this.latestId = latestId;
-      return this;
-    }
-
-    public void createIndex() {
-      String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1";
"flint-index-mappings" : "flint-index-mappings/0.1.1"; - switch (indexType) { - case SKIPPING: - createIndexWithMappings( - indexName, loadMappings(pathPrefix + "/" + "flint_skipping_index.json")); - break; - case COVERING: - createIndexWithMappings( - indexName, loadMappings(pathPrefix + "/" + "flint_covering_index.json")); - break; - case MATERIALIZED_VIEW: - createIndexWithMappings(indexName, loadMappings(pathPrefix + "/" + "flint_mv.json")); - break; - } - } - - @SneakyThrows - public void deleteIndex() { - client().admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); - } - } - - @SneakyThrows - public static String loadMappings(String path) { - URL url = Resources.getResource(path); - return Resources.toString(url, Charsets.UTF_8); - } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java index 8419d50ae1..01c46c3c0b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java @@ -6,16 +6,19 @@ package org.opensearch.sql.spark.dispatcher; import static org.junit.jupiter.api.Assertions.*; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; +import org.json.JSONObject; import org.junit.jupiter.api.Test; class IndexDMLHandlerTest { @Test public void getResponseFromExecutor() { - assertThrows( - IllegalStateException.class, - () -> - new IndexDMLHandler(null, null, null, null, null, null, null) - .getResponseFromExecutor(null)); + JSONObject result = + new IndexDMLHandler(null, null, null, null, null, null, null).getResponseFromExecutor(null); + + assertEquals("running", result.getString(STATUS_FIELD)); + assertEquals("", result.getString(ERROR_FIELD)); } }