From d737bfdd64f6a309496027f9ce8093e227c60b3a Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 8 Nov 2023 21:19:33 +0000 Subject: [PATCH] Revert "Redefine Drop Index as logical delete (#2386) (#2397)" This reverts commit e939bb68ce0f620934c6a234fcffaba9bdb2b304. Signed-off-by: Eric --- .../sql/common/setting/Settings.java | 9 +- docs/user/admin/settings.rst | 26 +- .../setting/OpenSearchSettings.java | 28 +- .../org/opensearch/sql/plugin/SQLPlugin.java | 3 +- spark/build.gradle | 1 - .../AsyncQueryExecutorServiceImpl.java | 1 + .../model/AsyncQueryJobMetadata.java | 17 +- .../spark/dispatcher/AsyncQueryHandler.java | 5 + .../spark/dispatcher/BatchQueryHandler.java | 7 +- .../sql/spark/dispatcher/IndexDMLHandler.java | 116 --- .../dispatcher/InteractiveQueryHandler.java | 1 + .../dispatcher/SparkQueryDispatcher.java | 125 ++- .../dispatcher/StreamingQueryHandler.java | 11 +- .../model/DispatchQueryResponse.java | 1 + .../dispatcher/model/IndexDMLResult.java | 74 -- .../execution/session/SessionManager.java | 4 +- .../execution/statestore/StateModel.java | 4 - .../execution/statestore/StateStore.java | 70 +- .../sql/spark/flint/FlintIndexMetadata.java | 14 +- .../sql/spark/flint/FlintIndexState.java | 54 -- .../sql/spark/flint/FlintIndexStateModel.java | 150 ---- .../spark/flint/operation/FlintIndexOp.java | 111 --- .../flint/operation/FlintIndexOpCancel.java | 76 -- .../flint/operation/FlintIndexOpDelete.java | 39 - ...DefaultSparkSqlFunctionResponseHandle.java | 1 + .../leasemanager/DefaultLeaseManager.java | 42 +- ...AsyncQueryExecutorServiceImplSpecTest.java | 277 +++++- .../AsyncQueryExecutorServiceImplTest.java | 4 +- .../AsyncQueryExecutorServiceSpec.java | 291 ------- .../spark/asyncquery/IndexQuerySpecTest.java | 793 ------------------ ...yncQueryJobMetadataStorageServiceTest.java | 4 +- .../spark/dispatcher/DropIndexResultTest.java | 51 ++ .../spark/dispatcher/IndexDMLHandlerTest.java | 21 - .../dispatcher/SparkQueryDispatcherTest.java | 275 +++++- .../execution/session/SessionManagerTest.java | 3 + .../sql/spark/flint/FlintIndexStateTest.java | 18 - .../flint/operation/FlintIndexOpTest.java | 61 -- .../leasemanager/DefaultLeaseManagerTest.java | 16 +- .../0.1.1/flint_covering_index.json | 37 - .../flint-index-mappings/0.1.1/flint_mv.json | 30 - .../0.1.1/flint_skipping_index.json | 23 - .../flint_covering_index.json | 36 - .../flint-index-mappings/flint_mv.json | 42 - .../flint_skipping_index.json | 22 - .../query_execution_result_mapping.json | 44 - 45 files changed, 769 insertions(+), 2269 deletions(-) delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexStateTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java delete mode 100644 spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json delete mode 100644 spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json delete mode 100644 spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json delete mode 100644 spark/src/test/resources/flint-index-mappings/flint_covering_index.json delete mode 100644 spark/src/test/resources/flint-index-mappings/flint_mv.json delete mode 100644 spark/src/test/resources/flint-index-mappings/flint_skipping_index.json delete mode 100644 spark/src/test/resources/query_execution_result_mapping.json diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index c9e348dbd4..61d23a1a34 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -5,6 +5,8 @@ package org.opensearch.sql.common.setting; +import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED; + import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import java.util.List; @@ -38,8 +40,8 @@ public enum Key { METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"), SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"), CLUSTER_NAME("cluster.name"), + SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"), SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"), - SPARK_EXECUTION_REFRESH_JOB_LIMIT("plugins.query.executionengine.spark.refresh_job.limit"), SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"), RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"), AUTO_INDEX_MANAGEMENT_ENABLED( @@ -67,4 +69,9 @@ public static Optional of(String keyValue) { public abstract T getSettingValue(Key key); public abstract List getSettings(); + + /** Helper class */ + public static boolean isSparkExecutionSessionEnabled(Settings settings) { + return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED); + } } diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index f3e8070a23..3acb005c12 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -311,16 +311,15 @@ SQL query:: "status": 400 } - -plugins.query.executionengine.spark.session.limit -================================================== +plugins.query.executionengine.spark.session.enabled +=================================================== Description ----------- -Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting. +By default, execution engine is executed in session mode. You can disable session mode by this setting. -1. The default value is 100. +1. The default value is true. 2. This setting is node scope. 3. This setting can be updated dynamically. @@ -329,7 +328,7 @@ You can update the setting with a new value like this. SQL query:: sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \ - ... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}' + ... -d '{"transient":{"plugins.query.executionengine.spark.session.enabled":"false"}}' { "acknowledged": true, "persistent": {}, @@ -339,7 +338,7 @@ SQL query:: "executionengine": { "spark": { "session": { - "limit": "200" + "enabled": "false" } } } @@ -348,16 +347,15 @@ SQL query:: } } - -plugins.query.executionengine.spark.refresh_job.limit -===================================================== +plugins.query.executionengine.spark.session.limit +================================================== Description ----------- -Each cluster can have maximum 20 datasources. You can increase limit by this setting. +Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting. -1. The default value is 20. +1. The default value is 100. 2. This setting is node scope. 3. This setting can be updated dynamically. @@ -366,7 +364,7 @@ You can update the setting with a new value like this. SQL query:: sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \ - ... -d '{"transient":{"plugins.query.executionengine.spark.refresh_job.limit":200}}' + ... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}' { "acknowledged": true, "persistent": {}, @@ -375,7 +373,7 @@ SQL query:: "query": { "executionengine": { "spark": { - "refresh_job": { + "session": { "limit": "200" } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 02b28d58ce..6b5f3cf0f1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -137,17 +137,17 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); - public static final Setting SPARK_EXECUTION_SESSION_LIMIT_SETTING = - Setting.intSetting( - Key.SPARK_EXECUTION_SESSION_LIMIT.getKeyValue(), - 100, + public static final Setting SPARK_EXECUTION_SESSION_ENABLED_SETTING = + Setting.boolSetting( + Key.SPARK_EXECUTION_SESSION_ENABLED.getKeyValue(), + true, Setting.Property.NodeScope, Setting.Property.Dynamic); - public static final Setting SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING = + public static final Setting SPARK_EXECUTION_SESSION_LIMIT_SETTING = Setting.intSetting( - Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT.getKeyValue(), - 50, + Key.SPARK_EXECUTION_SESSION_LIMIT.getKeyValue(), + 100, Setting.Property.NodeScope, Setting.Property.Dynamic); @@ -252,15 +252,15 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { register( settingBuilder, clusterSettings, - Key.SPARK_EXECUTION_SESSION_LIMIT, - SPARK_EXECUTION_SESSION_LIMIT_SETTING, - new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT)); + Key.SPARK_EXECUTION_SESSION_ENABLED, + SPARK_EXECUTION_SESSION_ENABLED_SETTING, + new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED)); register( settingBuilder, clusterSettings, - Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT, - SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING, - new Updater(Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT)); + Key.SPARK_EXECUTION_SESSION_LIMIT, + SPARK_EXECUTION_SESSION_LIMIT_SETTING, + new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT)); register( settingBuilder, clusterSettings, @@ -350,8 +350,8 @@ public static List> pluginSettings() { .add(METRICS_ROLLING_INTERVAL_SETTING) .add(DATASOURCE_URI_HOSTS_DENY_LIST) .add(SPARK_EXECUTION_ENGINE_CONFIG) + .add(SPARK_EXECUTION_SESSION_ENABLED_SETTING) .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) - .add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING) .add(SESSION_INDEX_TTL_SETTING) .add(RESULT_INDEX_TTL_SETTING) .add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 63c07de032..9d37fe28d0 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -335,8 +335,7 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService( new FlintIndexMetadataReaderImpl(client), client, new SessionManager(stateStore, emrServerlessClient, pluginSettings), - new DefaultLeaseManager(pluginSettings, stateStore), - stateStore); + new DefaultLeaseManager(pluginSettings, stateStore)); return new AsyncQueryExecutorServiceImpl( asyncQueryJobMetadataStorageService, sparkQueryDispatcher, diff --git a/spark/build.gradle b/spark/build.gradle index d703f6b24d..ed91b9820b 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -123,7 +123,6 @@ jacocoTestCoverageVerification { 'org.opensearch.sql.spark.execution.statestore.StateStore', 'org.opensearch.sql.spark.execution.session.SessionModel', 'org.opensearch.sql.spark.execution.statement.StatementModel', - 'org.opensearch.sql.spark.flint.FlintIndexStateModel', // TODO: add tests for purging flint indices 'org.opensearch.sql.spark.cluster.ClusterManagerEventListener*', 'org.opensearch.sql.spark.cluster.FlintIndexRetention', diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java index 1c0979dffb..18ae47c2b9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java @@ -72,6 +72,7 @@ public CreateAsyncQueryResponse createAsyncQuery( dispatchQueryResponse.getQueryId(), sparkExecutionEngineConfig.getApplicationId(), dispatchQueryResponse.getJobId(), + dispatchQueryResponse.isDropIndexQuery(), dispatchQueryResponse.getResultIndex(), dispatchQueryResponse.getSessionId())); return new CreateAsyncQueryResponse( diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java index d1357f364d..3c59403661 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java @@ -29,6 +29,7 @@ public class AsyncQueryJobMetadata extends StateModel { private final AsyncQueryId queryId; private final String applicationId; private final String jobId; + private final boolean isDropIndexQuery; private final String resultIndex; // optional sessionId. private final String sessionId; @@ -42,6 +43,7 @@ public AsyncQueryJobMetadata( queryId, applicationId, jobId, + false, resultIndex, null, SequenceNumbers.UNASSIGNED_SEQ_NO, @@ -52,12 +54,14 @@ public AsyncQueryJobMetadata( AsyncQueryId queryId, String applicationId, String jobId, + boolean isDropIndexQuery, String resultIndex, String sessionId) { this( queryId, applicationId, jobId, + isDropIndexQuery, resultIndex, sessionId, SequenceNumbers.UNASSIGNED_SEQ_NO, @@ -68,6 +72,7 @@ public AsyncQueryJobMetadata( AsyncQueryId queryId, String applicationId, String jobId, + boolean isDropIndexQuery, String resultIndex, String sessionId, long seqNo, @@ -75,6 +80,7 @@ public AsyncQueryJobMetadata( this.queryId = queryId; this.applicationId = applicationId; this.jobId = jobId; + this.isDropIndexQuery = isDropIndexQuery; this.resultIndex = resultIndex; this.sessionId = sessionId; this.seqNo = seqNo; @@ -100,6 +106,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field("type", TYPE_JOBMETA) .field("jobId", jobId) .field("applicationId", applicationId) + .field("isDropIndexQuery", isDropIndexQuery) .field("resultIndex", resultIndex) .field("sessionId", sessionId) .endObject(); @@ -113,6 +120,7 @@ public static AsyncQueryJobMetadata copy( copy.getQueryId(), copy.getApplicationId(), copy.getJobId(), + copy.isDropIndexQuery(), copy.getResultIndex(), copy.getSessionId(), seqNo, @@ -168,7 +176,14 @@ public static AsyncQueryJobMetadata fromXContent( throw new IllegalArgumentException("jobId and applicationId are required fields."); } return new AsyncQueryJobMetadata( - queryId, applicationId, jobId, resultIndex, sessionId, seqNo, primaryTerm); + queryId, + applicationId, + jobId, + isDropIndexQuery, + resultIndex, + sessionId, + seqNo, + primaryTerm); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java index b3d2cdd289..2823e64af7 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java @@ -20,6 +20,11 @@ public abstract class AsyncQueryHandler { public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) { + if (asyncQueryJobMetadata.isDropIndexQuery()) { + return SparkQueryDispatcher.DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()) + .result(); + } + JSONObject result = getResponseFromResultIndex(asyncQueryJobMetadata); if (result.has(DATA_FIELD)) { JSONObject items = result.getJSONObject(DATA_FIELD); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index 9e59fb707c..c6bac9b288 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -22,15 +22,12 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.JobType; -import org.opensearch.sql.spark.leasemanager.LeaseManager; -import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @RequiredArgsConstructor public class BatchQueryHandler extends AsyncQueryHandler { private final EMRServerlessClient emrServerlessClient; private final JobExecutionResponseReader jobExecutionResponseReader; - protected final LeaseManager leaseManager; @Override protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { @@ -63,8 +60,6 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { @Override public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { - leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource())); - String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query"; Map tags = context.getTags(); DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); @@ -86,6 +81,6 @@ public DispatchQueryResponse submit( dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); return new DispatchQueryResponse( - context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null); + context.getQueryId(), jobId, false, dataSourceMetadata.getResultIndex(), null); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java deleted file mode 100644 index 3ab5439ad5..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.dispatcher; - -import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult; - -import com.amazonaws.services.emrserverless.model.JobRunState; -import lombok.RequiredArgsConstructor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.json.JSONObject; -import org.opensearch.client.Client; -import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; -import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; -import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; -import org.opensearch.sql.spark.client.EMRServerlessClient; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; -import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; -import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadata; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; -import org.opensearch.sql.spark.flint.operation.FlintIndexOp; -import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; -import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; - -/** Handle Index DML query. includes * DROP * ALT? */ -@RequiredArgsConstructor -public class IndexDMLHandler extends AsyncQueryHandler { - private static final Logger LOG = LogManager.getLogger(); - - public static final String DROP_INDEX_JOB_ID = "dropIndexJobId"; - - private final EMRServerlessClient emrServerlessClient; - - private final DataSourceService dataSourceService; - - private final DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper; - - private final JobExecutionResponseReader jobExecutionResponseReader; - - private final FlintIndexMetadataReader flintIndexMetadataReader; - - private final Client client; - - private final StateStore stateStore; - - public static boolean isIndexDMLQuery(String jobId) { - return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId); - } - - @Override - public DispatchQueryResponse submit( - DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { - DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); - IndexQueryDetails indexDetails = context.getIndexQueryDetails(); - FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails); - // if index is created without auto refresh. there is no job to cancel. - String status = JobRunState.FAILED.toString(); - String error = ""; - long startTime = 0L; - try { - FlintIndexOp jobCancelOp = - new FlintIndexOpCancel( - stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient); - jobCancelOp.apply(indexMetadata); - - FlintIndexOp indexDeleteOp = - new FlintIndexOpDelete(stateStore, dispatchQueryRequest.getDatasource()); - indexDeleteOp.apply(indexMetadata); - status = JobRunState.SUCCESS.toString(); - } catch (Exception e) { - error = e.getMessage(); - LOG.error(e); - } - - AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()); - IndexDMLResult indexDMLResult = - new IndexDMLResult( - asyncQueryId.getId(), - status, - error, - dispatchQueryRequest.getDatasource(), - System.currentTimeMillis() - startTime, - System.currentTimeMillis()); - String resultIndex = dataSourceMetadata.getResultIndex(); - createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult); - - return new DispatchQueryResponse(asyncQueryId, DROP_INDEX_JOB_ID, resultIndex, null); - } - - @Override - protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { - String queryId = asyncQueryJobMetadata.getQueryId().getId(); - return jobExecutionResponseReader.getResultWithQueryId( - queryId, asyncQueryJobMetadata.getResultIndex()); - } - - @Override - protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) { - throw new IllegalStateException("[BUG] can't fetch result of index DML query form server"); - } - - @Override - public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { - throw new IllegalArgumentException("can't cancel index DML query"); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index d6ca83e52a..d75f568275 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -109,6 +109,7 @@ public DispatchQueryResponse submit( return new DispatchQueryResponse( context.getQueryId(), session.getSessionModel().getJobId(), + false, dataSourceMetadata.getResultIndex(), session.getSessionId().getSessionId()); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 0aa183335e..a800e45dd6 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -5,12 +5,26 @@ package org.opensearch.sql.spark.dispatcher; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; + +import com.amazonaws.services.emrserverless.model.JobRunState; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.json.JSONArray; import org.json.JSONObject; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -24,7 +38,7 @@ import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; import org.opensearch.sql.spark.execution.session.SessionManager; -import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -57,8 +71,6 @@ public class SparkQueryDispatcher { private LeaseManager leaseManager; - private StateStore stateStore; - public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) { DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); @@ -67,7 +79,7 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) AsyncQueryHandler asyncQueryHandler = sessionManager.isEnabled() ? new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager) - : new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); + : new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader); DispatchQueryContext.DispatchQueryContextBuilder contextBuilder = DispatchQueryContext.builder() .dataSourceMetadata(dataSourceMetadata) @@ -83,16 +95,15 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) contextBuilder.indexQueryDetails(indexQueryDetails); if (IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())) { - asyncQueryHandler = createIndexDMLHandler(); + // todo, fix in DROP INDEX PR. + return handleDropIndexQuery(dispatchQueryRequest, indexQueryDetails); } else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType()) && indexQueryDetails.isAutoRefresh()) { asyncQueryHandler = - new StreamingQueryHandler( - emrServerlessClient, jobExecutionResponseReader, leaseManager); + new StreamingQueryHandler(emrServerlessClient, jobExecutionResponseReader); } else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) { // manual refresh should be handled by batch handler - asyncQueryHandler = - new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); + asyncQueryHandler = new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader); } } return asyncQueryHandler.submit(dispatchQueryRequest, contextBuilder.build()); @@ -102,37 +113,20 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) if (asyncQueryJobMetadata.getSessionId() != null) { return new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager) .getQueryResponse(asyncQueryJobMetadata); - } else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) { - return createIndexDMLHandler().getQueryResponse(asyncQueryJobMetadata); } else { - return new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager) + return new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader) .getQueryResponse(asyncQueryJobMetadata); } } public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { - AsyncQueryHandler queryHandler; if (asyncQueryJobMetadata.getSessionId() != null) { - queryHandler = - new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager); - } else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) { - queryHandler = createIndexDMLHandler(); + return new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager) + .cancelJob(asyncQueryJobMetadata); } else { - queryHandler = - new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); + return new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader) + .cancelJob(asyncQueryJobMetadata); } - return queryHandler.cancelJob(asyncQueryJobMetadata); - } - - private IndexDMLHandler createIndexDMLHandler() { - return new IndexDMLHandler( - emrServerlessClient, - dataSourceService, - dataSourceUserAuthorizationHelper, - jobExecutionResponseReader, - flintIndexMetadataReader, - client, - stateStore); } // TODO: Revisit this logic. @@ -149,6 +143,40 @@ private static void fillMissingDetails( } } + private DispatchQueryResponse handleDropIndexQuery( + DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) { + DataSourceMetadata dataSourceMetadata = + this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); + FlintIndexMetadata indexMetadata = + flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails); + // if index is created without auto refresh. there is no job to cancel. + String status = JobRunState.FAILED.toString(); + try { + if (indexMetadata.isAutoRefresh()) { + emrServerlessClient.cancelJobRun( + dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId()); + } + } finally { + String indexName = indexQueryDetails.openSearchIndexName(); + try { + AcknowledgedResponse response = + client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); + if (!response.isAcknowledged()) { + LOG.error("failed to delete index"); + } + status = JobRunState.SUCCESS.toString(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("failed to delete index"); + } + } + return new DispatchQueryResponse( + AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()), + new DropIndexResult(status).toJobId(), + true, + dataSourceMetadata.getResultIndex(), + null); + } + private static Map getDefaultTagsForJobSubmission( DispatchQueryRequest dispatchQueryRequest) { Map tags = new HashMap<>(); @@ -156,4 +184,39 @@ private static Map getDefaultTagsForJobSubmission( tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource()); return tags; } + + @Getter + @RequiredArgsConstructor + public static class DropIndexResult { + private static final int PREFIX_LEN = 10; + + private final String status; + + public static DropIndexResult fromJobId(String jobId) { + String status = new String(Base64.getDecoder().decode(jobId)).substring(PREFIX_LEN); + return new DropIndexResult(status); + } + + public String toJobId() { + String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + status; + return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8)); + } + + public JSONObject result() { + JSONObject result = new JSONObject(); + if (JobRunState.SUCCESS.toString().equalsIgnoreCase(status)) { + result.put(STATUS_FIELD, status); + // todo. refactor response handling. + JSONObject dummyData = new JSONObject(); + dummyData.put("result", new JSONArray()); + dummyData.put("schema", new JSONArray()); + dummyData.put("applicationId", "fakeDropIndexApplicationId"); + result.put(DATA_FIELD, dummyData); + } else { + result.put(STATUS_FIELD, status); + result.put(ERROR_FIELD, "failed to drop index"); + } + return result; + } + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index ac5c878c28..81c3438532 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -19,8 +19,6 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; import org.opensearch.sql.spark.dispatcher.model.JobType; -import org.opensearch.sql.spark.leasemanager.LeaseManager; -import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** Handle Streaming Query. */ @@ -29,18 +27,14 @@ public class StreamingQueryHandler extends BatchQueryHandler { public StreamingQueryHandler( EMRServerlessClient emrServerlessClient, - JobExecutionResponseReader jobExecutionResponseReader, - LeaseManager leaseManager) { - super(emrServerlessClient, jobExecutionResponseReader, leaseManager); + JobExecutionResponseReader jobExecutionResponseReader) { + super(emrServerlessClient, jobExecutionResponseReader); this.emrServerlessClient = emrServerlessClient; } @Override public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { - - leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource())); - String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query"; IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails(); Map tags = context.getTags(); @@ -66,6 +60,7 @@ public DispatchQueryResponse submit( return new DispatchQueryResponse( AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()), jobId, + false, dataSourceMetadata.getResultIndex(), null); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java index b20648cdfd..e44379daff 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java @@ -9,6 +9,7 @@ public class DispatchQueryResponse { private AsyncQueryId queryId; private String jobId; + private boolean isDropIndexQuery; private String resultIndex; private String sessionId; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java deleted file mode 100644 index b01ecf55ba..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.dispatcher.model; - -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; - -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import lombok.Data; -import lombok.EqualsAndHashCode; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.sql.spark.execution.statestore.StateModel; - -/** Plugin create Index DML result. */ -@Data -@EqualsAndHashCode(callSuper = false) -public class IndexDMLResult extends StateModel { - private static final String QUERY_ID = "queryId"; - private static final String QUERY_RUNTIME = "queryRunTime"; - private static final String UPDATE_TIME = "updateTime"; - private static final String DOC_ID_PREFIX = "index"; - - private final String queryId; - private final String status; - private final String error; - private final String datasourceName; - private final Long queryRunTime; - private final Long updateTime; - - public static IndexDMLResult copy(IndexDMLResult copy, long seqNo, long primaryTerm) { - return new IndexDMLResult( - copy.queryId, - copy.status, - copy.error, - copy.datasourceName, - copy.queryRunTime, - copy.updateTime); - } - - @Override - public String getId() { - return DOC_ID_PREFIX + queryId; - } - - @Override - public long getSeqNo() { - return SequenceNumbers.UNASSIGNED_SEQ_NO; - } - - @Override - public long getPrimaryTerm() { - return SequenceNumbers.UNASSIGNED_PRIMARY_TERM; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(QUERY_ID, queryId) - .field("status", status) - .field("error", error) - .field(DATASOURCE_NAME, datasourceName) - .field(QUERY_RUNTIME, queryRunTime) - .field(UPDATE_TIME, updateTime) - .field("result", ImmutableList.of()) - .field("schema", ImmutableList.of()) - .endObject(); - return builder; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index 0f0a4ce373..c0f7bbcde8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.execution.session; +import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED; import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; import java.util.Optional; @@ -51,8 +52,7 @@ public Optional getSession(SessionId sid) { return Optional.empty(); } - // todo, keep it only for testing, will remove it later. public boolean isEnabled() { - return true; + return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java index fe105cc8e4..b5bf31a6ba 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java @@ -9,10 +9,6 @@ import org.opensearch.core.xcontent.XContentParser; public abstract class StateModel implements ToXContentObject { - public static final String VERSION_1_0 = "1.0"; - public static final String TYPE = "type"; - public static final String STATE = "state"; - public static final String LAST_UPDATE_TIME = "lastUpdateTime"; public abstract String getId(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index f36cbba32c..86d15a7036 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -6,9 +6,7 @@ package org.opensearch.sql.spark.execution.statestore; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; -import static org.opensearch.sql.spark.execution.statestore.StateModel.STATE; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; @@ -47,14 +45,11 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; -import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.session.SessionType; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; /** * State Store maintain the state of Session and Statement. State State create/update/get doc on @@ -75,8 +70,7 @@ public class StateStore { private final Client client; private final ClusterService clusterService; - @VisibleForTesting - public T create( + protected T create( T st, StateModel.CopyBuilder builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { @@ -110,8 +104,7 @@ public T create( } } - @VisibleForTesting - public Optional get( + protected Optional get( String sid, StateModel.FromXContent builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { @@ -142,8 +135,7 @@ public Optional get( } } - @VisibleForTesting - public T updateState( + protected T updateState( T st, S state, StateModel.StateCopyBuilder builder, String indexName) { try { T model = builder.of(st, state, st.getSeqNo(), st.getPrimaryTerm()); @@ -159,8 +151,18 @@ public T updateState( try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) { UpdateResponse updateResponse = client.update(updateRequest).actionGet(); - LOG.debug("Successfully update doc. id: {}", st.getId()); - return builder.of(model, state, updateResponse.getSeqNo(), updateResponse.getPrimaryTerm()); + if (updateResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) { + LOG.debug("Successfully update doc. id: {}", st.getId()); + return builder.of( + model, state, updateResponse.getSeqNo(), updateResponse.getPrimaryTerm()); + } else { + throw new RuntimeException( + String.format( + Locale.ROOT, + "Failed update doc. id: %s, error: %s", + st.getId(), + updateResponse.getResult().getLowercase())); + } } } catch (IOException e) { throw new RuntimeException(e); @@ -301,46 +303,4 @@ public static Supplier activeSessionsCount(StateStore stateStore, String d QueryBuilders.termQuery( SessionModel.SESSION_STATE, SessionState.RUNNING.getSessionState()))); } - - public static BiFunction - updateFlintIndexState(StateStore stateStore, String datasourceName) { - return (old, state) -> - stateStore.updateState( - old, - state, - FlintIndexStateModel::copyWithState, - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); - } - - public static Function> getFlintIndexState( - StateStore stateStore, String datasourceName) { - return (docId) -> - stateStore.get( - docId, - FlintIndexStateModel::fromXContent, - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); - } - - public static Function createFlintIndexState( - StateStore stateStore, String datasourceName) { - return (st) -> - stateStore.create( - st, FlintIndexStateModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); - } - - public static Function createIndexDMLResult( - StateStore stateStore, String indexName) { - return (result) -> stateStore.create(result, IndexDMLResult::copy, indexName); - } - - public static Supplier activeRefreshJobCount(StateStore stateStore, String datasourceName) { - return () -> - stateStore.count( - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), - QueryBuilders.boolQuery() - .must( - QueryBuilders.termQuery( - SessionModel.TYPE, FlintIndexStateModel.FLINT_INDEX_DOC_TYPE)) - .must(QueryBuilders.termQuery(STATE, FlintIndexState.REFRESHING.getState()))); - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java index 1721263bf8..81b7fa1693 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java @@ -7,7 +7,6 @@ import java.util.Locale; import java.util.Map; -import java.util.Optional; import lombok.Data; @Data @@ -20,13 +19,8 @@ public class FlintIndexMetadata { public static final String AUTO_REFRESH = "auto_refresh"; public static final String AUTO_REFRESH_DEFAULT = "false"; - public static final String APP_ID = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID"; - public static final String FLINT_INDEX_STATE_DOC_ID = "latestId"; - private final String jobId; private final boolean autoRefresh; - private final String appId; - private final String latestId; public static FlintIndexMetadata fromMetatdata(Map metaMap) { Map propertiesMap = (Map) metaMap.get(PROPERTIES_KEY); @@ -38,12 +32,6 @@ public static FlintIndexMetadata fromMetatdata(Map metaMap) { !((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT)) .toLowerCase(Locale.ROOT) .equalsIgnoreCase(AUTO_REFRESH_DEFAULT); - String appId = (String) envMap.getOrDefault(APP_ID, null); - String latestId = (String) metaMap.getOrDefault(FLINT_INDEX_STATE_DOC_ID, null); - return new FlintIndexMetadata(jobId, autoRefresh, appId, latestId); - } - - public Optional getLatestId() { - return Optional.ofNullable(latestId); + return new FlintIndexMetadata(jobId, autoRefresh); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java deleted file mode 100644 index 0ab4d92c17..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint; - -import java.util.Arrays; -import java.util.Locale; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.Getter; - -/** Flint index state. */ -@Getter -public enum FlintIndexState { - // stable state - EMPTY("empty"), - // transitioning state - CREATING("creating"), - // transitioning state - REFRESHING("refreshing"), - // transitioning state - CANCELLING("cancelling"), - // stable state - ACTIVE("active"), - // transitioning state - DELETING("deleting"), - // stable state - DELETED("deleted"), - // stable state - FAILED("failed"), - // unknown state, if some state update in Spark side, not reflect in here. - UNKNOWN("unknown"); - - private final String state; - - FlintIndexState(String state) { - this.state = state; - } - - private static Map STATES = - Arrays.stream(FlintIndexState.values()) - .collect(Collectors.toMap(t -> t.name().toLowerCase(), t -> t)); - - public static FlintIndexState fromString(String key) { - for (FlintIndexState ss : FlintIndexState.values()) { - if (ss.getState().toLowerCase(Locale.ROOT).equals(key)) { - return ss; - } - } - return UNKNOWN; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java deleted file mode 100644 index bb73f439a2..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint; - -import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; -import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; -import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; - -import java.io.IOException; -import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; -import org.opensearch.sql.spark.execution.statestore.StateModel; - -/** Flint Index Model maintain the index state. */ -@Getter -@Builder -@EqualsAndHashCode(callSuper = false) -public class FlintIndexStateModel extends StateModel { - public static final String FLINT_INDEX_DOC_TYPE = "flintindexstate"; - public static final String LATEST_ID = "latestId"; - public static final String DOC_ID_PREFIX = "flint"; - - private final FlintIndexState indexState; - private final String applicationId; - private final String jobId; - private final String latestId; - private final String datasourceName; - private final long lastUpdateTime; - private final String error; - - @EqualsAndHashCode.Exclude private final long seqNo; - @EqualsAndHashCode.Exclude private final long primaryTerm; - - public FlintIndexStateModel( - FlintIndexState indexState, - String applicationId, - String jobId, - String latestId, - String datasourceName, - long lastUpdateTime, - String error, - long seqNo, - long primaryTerm) { - this.indexState = indexState; - this.applicationId = applicationId; - this.jobId = jobId; - this.latestId = latestId; - this.datasourceName = datasourceName; - this.lastUpdateTime = lastUpdateTime; - this.error = error; - this.seqNo = seqNo; - this.primaryTerm = primaryTerm; - } - - public static FlintIndexStateModel copy(FlintIndexStateModel copy, long seqNo, long primaryTerm) { - return new FlintIndexStateModel( - copy.indexState, - copy.applicationId, - copy.jobId, - copy.latestId, - copy.datasourceName, - copy.lastUpdateTime, - copy.error, - seqNo, - primaryTerm); - } - - public static FlintIndexStateModel copyWithState( - FlintIndexStateModel copy, FlintIndexState state, long seqNo, long primaryTerm) { - return new FlintIndexStateModel( - state, - copy.applicationId, - copy.jobId, - copy.latestId, - copy.datasourceName, - copy.lastUpdateTime, - copy.error, - seqNo, - primaryTerm); - } - - @SneakyThrows - public static FlintIndexStateModel fromXContent( - XContentParser parser, long seqNo, long primaryTerm) { - FlintIndexStateModelBuilder builder = FlintIndexStateModel.builder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case STATE: - builder.indexState(FlintIndexState.fromString(parser.text())); - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LATEST_ID: - builder.latestId(parser.text()); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case LAST_UPDATE_TIME: - builder.lastUpdateTime(parser.longValue()); - break; - case ERROR: - builder.error(parser.text()); - break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - - @Override - public String getId() { - return latestId; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, VERSION_1_0) - .field(TYPE, FLINT_INDEX_DOC_TYPE) - .field(STATE, indexState.getState()) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LATEST_ID, latestId) - .field(DATASOURCE_NAME, datasourceName) - .field(LAST_UPDATE_TIME, lastUpdateTime) - .field(ERROR, error) - .endObject(); - return builder; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java deleted file mode 100644 index fb44b27568..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint.operation; - -import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState; -import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState; - -import java.util.Locale; -import java.util.Optional; -import lombok.RequiredArgsConstructor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadata; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; - -/** Flint Index Operation. */ -@RequiredArgsConstructor -public abstract class FlintIndexOp { - private static final Logger LOG = LogManager.getLogger(); - - private final StateStore stateStore; - private final String datasourceName; - - /** Apply operation on {@link FlintIndexMetadata} */ - public void apply(FlintIndexMetadata metadata) { - // todo, remove this logic after IndexState feature is enabled in Flint. - Optional latestId = metadata.getLatestId(); - if (latestId.isEmpty()) { - // take action without occ. - FlintIndexStateModel fakeModel = - new FlintIndexStateModel( - FlintIndexState.REFRESHING, - metadata.getAppId(), - metadata.getJobId(), - "", - datasourceName, - System.currentTimeMillis(), - "", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - runOp(fakeModel); - } else { - Optional flintIndexOptional = - getFlintIndexState(stateStore, datasourceName).apply(latestId.get()); - if (flintIndexOptional.isEmpty()) { - String errorMsg = String.format(Locale.ROOT, "no state found. docId: %s", latestId.get()); - LOG.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - FlintIndexStateModel flintIndex = flintIndexOptional.get(); - - // 1.validate state. - FlintIndexState currentState = flintIndex.getIndexState(); - if (!validate(currentState)) { - String errorMsg = - String.format(Locale.ROOT, "validate failed. unexpected state: [%s]", currentState); - LOG.debug(errorMsg); - return; - } - - // 2.begin, move to transitioning state - FlintIndexState transitioningState = transitioningState(); - try { - flintIndex = - updateFlintIndexState(stateStore, datasourceName) - .apply(flintIndex, transitioningState()); - } catch (Exception e) { - String errorMsg = - String.format( - Locale.ROOT, "begin failed. target transitioning state: [%s]", transitioningState); - LOG.error(errorMsg, e); - throw new IllegalStateException(errorMsg, e); - } - - // 3.runOp - runOp(flintIndex); - - // 4.commit, move to stable state - FlintIndexState stableState = stableState(); - try { - updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState); - } catch (Exception e) { - String errorMsg = - String.format(Locale.ROOT, "commit failed. target stable state: [%s]", stableState); - LOG.error(errorMsg, e); - throw new IllegalStateException(errorMsg, e); - } - } - } - - /** - * Validate expected state. - * - *

return true if validate. - */ - abstract boolean validate(FlintIndexState state); - - /** get transitioningState */ - abstract FlintIndexState transitioningState(); - - abstract void runOp(FlintIndexStateModel flintIndex); - - /** get stableState */ - abstract FlintIndexState stableState(); -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java deleted file mode 100644 index ba067e5c03..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint.operation; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import lombok.SneakyThrows; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.sql.spark.client.EMRServerlessClient; -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; - -/** Cancel refreshing job. */ -public class FlintIndexOpCancel extends FlintIndexOp { - private static final Logger LOG = LogManager.getLogger(); - - private final EMRServerlessClient emrServerlessClient; - - public FlintIndexOpCancel( - StateStore stateStore, String datasourceName, EMRServerlessClient emrServerlessClient) { - super(stateStore, datasourceName); - this.emrServerlessClient = emrServerlessClient; - } - - public boolean validate(FlintIndexState state) { - return state == FlintIndexState.REFRESHING || state == FlintIndexState.CANCELLING; - } - - @Override - FlintIndexState transitioningState() { - return FlintIndexState.CANCELLING; - } - - /** cancel EMR-S job, wait cancelled state upto 15s. */ - @SneakyThrows - @Override - void runOp(FlintIndexStateModel flintIndexStateModel) { - String applicationId = flintIndexStateModel.getApplicationId(); - String jobId = flintIndexStateModel.getJobId(); - try { - emrServerlessClient.cancelJobRun( - flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId()); - } catch (IllegalArgumentException e) { - // handle job does not exist case. - LOG.error(e); - return; - } - - // pull job state until timeout or cancelled. - String jobRunState = ""; - int count = 3; - while (count-- != 0) { - jobRunState = - emrServerlessClient.getJobRunResult(applicationId, jobId).getJobRun().getState(); - if (jobRunState.equalsIgnoreCase("Cancelled")) { - break; - } - TimeUnit.SECONDS.sleep(1); - } - if (!jobRunState.equalsIgnoreCase("Cancelled")) { - String errMsg = "cancel job timeout"; - LOG.error(errMsg); - throw new TimeoutException(errMsg); - } - } - - @Override - FlintIndexState stableState() { - return FlintIndexState.ACTIVE; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java deleted file mode 100644 index d8b275c621..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint.operation; - -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; - -/** Flint Index Logical delete operation. Change state to DELETED. */ -public class FlintIndexOpDelete extends FlintIndexOp { - - public FlintIndexOpDelete(StateStore stateStore, String datasourceName) { - super(stateStore, datasourceName); - } - - public boolean validate(FlintIndexState state) { - return state == FlintIndexState.ACTIVE - || state == FlintIndexState.EMPTY - || state == FlintIndexState.DELETING; - } - - @Override - FlintIndexState transitioningState() { - return FlintIndexState.DELETING; - } - - @Override - void runOp(FlintIndexStateModel flintIndex) { - // logically delete, do nothing. - } - - @Override - FlintIndexState stableState() { - return FlintIndexState.DELETED; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java b/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java index 422d1caaf1..8fc417cd80 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java @@ -48,6 +48,7 @@ private void constructIteratorAndSchema(JSONObject responseObject) { List result = new ArrayList<>(); List columnList; JSONObject items = responseObject.getJSONObject("data"); + logger.info("Spark Application ID: " + items.getString("applicationId")); columnList = getColumnList(items.getJSONArray("schema")); for (int i = 0; i < items.getJSONArray("result").length(); i++) { JSONObject row = diff --git a/spark/src/main/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManager.java b/spark/src/main/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManager.java index 375fa7b11e..1635a1801b 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManager.java @@ -5,17 +5,13 @@ package org.opensearch.sql.spark.leasemanager; -import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT; import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_LIMIT; import static org.opensearch.sql.spark.execution.statestore.StateStore.ALL_DATASOURCE; -import static org.opensearch.sql.spark.execution.statestore.StateStore.activeRefreshJobCount; import static org.opensearch.sql.spark.execution.statestore.StateStore.activeSessionsCount; import java.util.Arrays; import java.util.List; -import java.util.Locale; import java.util.function.Predicate; -import lombok.RequiredArgsConstructor; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.statestore.StateStore; @@ -36,10 +32,7 @@ public class DefaultLeaseManager implements LeaseManager { public DefaultLeaseManager(Settings settings, StateStore stateStore) { this.settings = settings; this.stateStore = stateStore; - this.concurrentLimitRules = - Arrays.asList( - new ConcurrentSessionRule(settings, stateStore), - new ConcurrentRefreshJobRule(settings, stateStore)); + this.concurrentLimitRules = Arrays.asList(new ConcurrentSessionRule()); } @Override @@ -55,15 +48,10 @@ interface Rule extends Predicate { String description(); } - @RequiredArgsConstructor - public static class ConcurrentSessionRule implements Rule { - private final Settings settings; - private final StateStore stateStore; - + public class ConcurrentSessionRule implements Rule { @Override public String description() { - return String.format( - Locale.ROOT, "domain concurrent active session can not exceed %d", sessionMaxLimit()); + return String.format("domain concurrent active session can not exceed %d", sessionMaxLimit()); } @Override @@ -78,28 +66,4 @@ public int sessionMaxLimit() { return settings.getSettingValue(SPARK_EXECUTION_SESSION_LIMIT); } } - - @RequiredArgsConstructor - public static class ConcurrentRefreshJobRule implements Rule { - private final Settings settings; - private final StateStore stateStore; - - @Override - public String description() { - return String.format( - Locale.ROOT, "domain concurrent refresh job can not exceed %d", refreshJobLimit()); - } - - @Override - public boolean test(LeaseRequest leaseRequest) { - if (leaseRequest.getJobType() == JobType.INTERACTIVE) { - return true; - } - return activeRefreshJobCount(stateStore, ALL_DATASOURCE).get() < refreshJobLimit(); - } - - public int refreshJobLimit() { - return settings.getSettingValue(SPARK_EXECUTION_REFRESH_JOB_LIMIT); - } - } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 56ee56ea5e..862da697d1 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.asyncquery; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.*; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; @@ -13,35 +14,172 @@ import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_DOC_TYPE; import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; +import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; +import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession; import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; +import static org.opensearch.sql.spark.execution.statestore.StateStore.updateSessionState; import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; +import com.amazonaws.services.emrserverless.model.CancelJobRunResult; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; +import com.google.common.collect.ImmutableSet; +import java.util.*; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import org.junit.jupiter.api.Disabled; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.Strings; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; +import org.opensearch.sql.datasources.encryptor.EncryptorImpl; +import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; +import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; +import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.client.StartJobRequest; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.session.SessionManager; +import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; +import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; import org.opensearch.sql.spark.rest.model.LangType; +import org.opensearch.sql.storage.DataSourceFactory; +import org.opensearch.test.OpenSearchIntegTestCase; + +public class AsyncQueryExecutorServiceImplSpecTest extends OpenSearchIntegTestCase { + public static final String DATASOURCE = "mys3"; + public static final String DSOTHER = "mytest"; + + private ClusterService clusterService; + private org.opensearch.sql.common.setting.Settings pluginSettings; + private NodeClient client; + private DataSourceServiceImpl dataSourceService; + private StateStore stateStore; + private ClusterSettings clusterSettings; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(TestSettingPlugin.class); + } -public class AsyncQueryExecutorServiceImplSpecTest extends AsyncQueryExecutorServiceSpec { + public static class TestSettingPlugin extends Plugin { + @Override + public List> getSettings() { + return OpenSearchSettings.pluginSettings(); + } + } - @Disabled("batch query is unsupported") + @Before + public void setup() { + clusterService = clusterService(); + client = (NodeClient) cluster().client(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList()) + .build()) + .get(); + clusterSettings = clusterService.getClusterSettings(); + pluginSettings = new OpenSearchSettings(clusterSettings); + dataSourceService = createDataSourceService(); + DataSourceMetadata dm = + new DataSourceMetadata( + DATASOURCE, + StringUtils.EMPTY, + DataSourceType.S3GLUE, + ImmutableList.of(), + ImmutableMap.of( + "glue.auth.type", + "iam_role", + "glue.auth.role_arn", + "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", + "glue.indexstore.opensearch.uri", + "http://localhost:9200", + "glue.indexstore.opensearch.auth", + "noauth"), + null); + dataSourceService.createDataSource(dm); + DataSourceMetadata otherDm = + new DataSourceMetadata( + DSOTHER, + StringUtils.EMPTY, + DataSourceType.S3GLUE, + ImmutableList.of(), + ImmutableMap.of( + "glue.auth.type", + "iam_role", + "glue.auth.role_arn", + "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", + "glue.indexstore.opensearch.uri", + "http://localhost:9200", + "glue.indexstore.opensearch.auth", + "noauth"), + null); + dataSourceService.createDataSource(otherDm); + stateStore = new StateStore(client, clusterService); + createIndex(dm.fromNameToCustomResultIndex()); + createIndex(otherDm.fromNameToCustomResultIndex()); + } + + @After + public void clean() { + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(SPARK_EXECUTION_SESSION_ENABLED_SETTING.getKey()).build()) + .get(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) + .get(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(DATASOURCE_URI_HOSTS_DENY_LIST.getKey()).build()) + .get(); + } + + @Test public void withoutSessionCreateAsyncQueryThenGetResultThenCancel() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -69,7 +207,7 @@ public void withoutSessionCreateAsyncQueryThenGetResultThenCancel() { emrsClient.cancelJobRunCalled(1); } - @Disabled("batch query is unsupported") + @Test public void sessionLimitNotImpactBatchQuery() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -91,7 +229,7 @@ public void sessionLimitNotImpactBatchQuery() { emrsClient.startJobRunCalled(2); } - @Disabled("batch query is unsupported") + @Test public void createAsyncQueryCreateJobWithCorrectParameters() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -202,7 +340,7 @@ public void reuseSessionWhenCreateAsyncQuery() { assertEquals(second.getQueryId(), secondModel.get().getQueryId()); } - @Disabled("batch query is unsupported") + @Test public void batchQueryHasTimeout() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -464,4 +602,125 @@ public void concurrentSessionLimitIsDomainLevel() { new CreateAsyncQueryRequest("select 1", DSOTHER, LangType.SQL, null))); assertEquals("domain concurrent active session can not exceed 1", exception.getMessage()); } + + private DataSourceServiceImpl createDataSourceService() { + String masterKey = "a57d991d9b573f75b9bba1df"; + DataSourceMetadataStorage dataSourceMetadataStorage = + new OpenSearchDataSourceMetadataStorage( + client, clusterService, new EncryptorImpl(masterKey)); + return new DataSourceServiceImpl( + new ImmutableSet.Builder() + .add(new GlueDataSourceFactory(pluginSettings)) + .build(), + dataSourceMetadataStorage, + meta -> {}); + } + + private AsyncQueryExecutorService createAsyncQueryExecutorService( + EMRServerlessClient emrServerlessClient) { + StateStore stateStore = new StateStore(client, clusterService); + AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = + new OpensearchAsyncQueryJobMetadataStorageService(stateStore); + JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + SparkQueryDispatcher sparkQueryDispatcher = + new SparkQueryDispatcher( + emrServerlessClient, + this.dataSourceService, + new DataSourceUserAuthorizationHelperImpl(client), + jobExecutionResponseReader, + new FlintIndexMetadataReaderImpl(client), + client, + new SessionManager(stateStore, emrServerlessClient, pluginSettings), + new DefaultLeaseManager(pluginSettings, stateStore)); + return new AsyncQueryExecutorServiceImpl( + asyncQueryJobMetadataStorageService, + sparkQueryDispatcher, + this::sparkExecutionEngineConfig); + } + + public static class LocalEMRSClient implements EMRServerlessClient { + + private int startJobRunCalled = 0; + private int cancelJobRunCalled = 0; + private int getJobResult = 0; + + @Getter private StartJobRequest jobRequest; + + @Override + public String startJobRun(StartJobRequest startJobRequest) { + jobRequest = startJobRequest; + startJobRunCalled++; + return "jobId"; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + getJobResult++; + JobRun jobRun = new JobRun(); + jobRun.setState("RUNNING"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + cancelJobRunCalled++; + return new CancelJobRunResult().withJobRunId(jobId); + } + + public void startJobRunCalled(int expectedTimes) { + assertEquals(expectedTimes, startJobRunCalled); + } + + public void cancelJobRunCalled(int expectedTimes) { + assertEquals(expectedTimes, cancelJobRunCalled); + } + + public void getJobRunResultCalled(int expectedTimes) { + assertEquals(expectedTimes, getJobResult); + } + } + + public SparkExecutionEngineConfig sparkExecutionEngineConfig() { + return new SparkExecutionEngineConfig("appId", "us-west-2", "roleArn", "", "myCluster"); + } + + public void enableSession(boolean enabled) { + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put(SPARK_EXECUTION_SESSION_ENABLED_SETTING.getKey(), enabled) + .build()) + .get(); + } + + public void setSessionLimit(long limit) { + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey(), limit).build()) + .get(); + } + + int search(QueryBuilder query) { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(DATASOURCE)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(query); + searchRequest.source(searchSourceBuilder); + SearchResponse searchResponse = client.search(searchRequest).actionGet(); + + return searchResponse.getHits().getHits().length; + } + + void setSessionState(String sessionId, SessionState sessionState) { + Optional model = getSession(stateStore, DATASOURCE).apply(sessionId); + SessionModel updated = + updateSessionState(stateStore, DATASOURCE).apply(model.get(), sessionState); + assertEquals(sessionState, updated.getSessionState()); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java index efb965e9f3..2ed316795f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java @@ -81,7 +81,7 @@ void testCreateAsyncQuery() { LangType.SQL, "arn:aws:iam::270824043731:role/emr-job-execution-role", TEST_CLUSTER_NAME))) - .thenReturn(new DispatchQueryResponse(QUERY_ID, EMR_JOB_ID, null, null)); + .thenReturn(new DispatchQueryResponse(QUERY_ID, EMR_JOB_ID, false, null, null)); CreateAsyncQueryResponse createAsyncQueryResponse = jobExecutorService.createAsyncQuery(createAsyncQueryRequest); verify(asyncQueryJobMetadataStorageService, times(1)) @@ -111,7 +111,7 @@ void testCreateAsyncQueryWithExtraSparkSubmitParameter() { "--conf spark.dynamicAllocation.enabled=false", TEST_CLUSTER_NAME)); when(sparkQueryDispatcher.dispatch(any())) - .thenReturn(new DispatchQueryResponse(QUERY_ID, EMR_JOB_ID, null, null)); + .thenReturn(new DispatchQueryResponse(QUERY_ID, EMR_JOB_ID, false, null, null)); jobExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java deleted file mode 100644 index 35ec778c8e..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.asyncquery; - -import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING; -import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; -import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; -import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession; -import static org.opensearch.sql.spark.execution.statestore.StateStore.updateSessionState; - -import com.amazonaws.services.emrserverless.model.CancelJobRunResult; -import com.amazonaws.services.emrserverless.model.GetJobRunResult; -import com.amazonaws.services.emrserverless.model.JobRun; -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.io.Resources; -import java.net.URL; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import lombok.Getter; -import lombok.SneakyThrows; -import org.apache.commons.lang3.StringUtils; -import org.junit.After; -import org.junit.Before; -import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.plugins.Plugin; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; -import org.opensearch.sql.datasources.encryptor.EncryptorImpl; -import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; -import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; -import org.opensearch.sql.datasources.service.DataSourceServiceImpl; -import org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage; -import org.opensearch.sql.opensearch.setting.OpenSearchSettings; -import org.opensearch.sql.spark.client.EMRServerlessClient; -import org.opensearch.sql.spark.client.StartJobRequest; -import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; -import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; -import org.opensearch.sql.spark.execution.session.SessionManager; -import org.opensearch.sql.spark.execution.session.SessionModel; -import org.opensearch.sql.spark.execution.session.SessionState; -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; -import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; -import org.opensearch.sql.storage.DataSourceFactory; -import org.opensearch.test.OpenSearchIntegTestCase; - -public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { - public static final String DATASOURCE = "mys3"; - public static final String DSOTHER = "mytest"; - - protected ClusterService clusterService; - protected org.opensearch.sql.common.setting.Settings pluginSettings; - protected NodeClient client; - protected DataSourceServiceImpl dataSourceService; - protected StateStore stateStore; - protected ClusterSettings clusterSettings; - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(TestSettingPlugin.class); - } - - public static class TestSettingPlugin extends Plugin { - @Override - public List> getSettings() { - return OpenSearchSettings.pluginSettings(); - } - } - - @Before - public void setup() { - clusterService = clusterService(); - clusterSettings = clusterService.getClusterSettings(); - pluginSettings = new OpenSearchSettings(clusterSettings); - client = (NodeClient) cluster().client(); - dataSourceService = createDataSourceService(); - DataSourceMetadata dm = - new DataSourceMetadata( - DATASOURCE, - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - ImmutableMap.of( - "glue.auth.type", - "iam_role", - "glue.auth.role_arn", - "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", - "glue.indexstore.opensearch.uri", - "http://localhost:9200", - "glue.indexstore.opensearch.auth", - "noauth"), - null); - dataSourceService.createDataSource(dm); - DataSourceMetadata otherDm = - new DataSourceMetadata( - DSOTHER, - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - ImmutableMap.of( - "glue.auth.type", - "iam_role", - "glue.auth.role_arn", - "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", - "glue.indexstore.opensearch.uri", - "http://localhost:9200", - "glue.indexstore.opensearch.auth", - "noauth"), - null); - dataSourceService.createDataSource(otherDm); - stateStore = new StateStore(client, clusterService); - createIndexWithMappings(dm.getResultIndex(), loadResultIndexMappings()); - createIndexWithMappings(otherDm.getResultIndex(), loadResultIndexMappings()); - } - - @After - public void clean() { - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) - .get(); - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().putNull(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING.getKey()).build()) - .get(); - } - - private DataSourceServiceImpl createDataSourceService() { - String masterKey = "a57d991d9b573f75b9bba1df"; - DataSourceMetadataStorage dataSourceMetadataStorage = - new OpenSearchDataSourceMetadataStorage( - client, clusterService, new EncryptorImpl(masterKey)); - return new DataSourceServiceImpl( - new ImmutableSet.Builder() - .add(new GlueDataSourceFactory(pluginSettings)) - .build(), - dataSourceMetadataStorage, - meta -> {}); - } - - protected AsyncQueryExecutorService createAsyncQueryExecutorService( - EMRServerlessClient emrServerlessClient) { - StateStore stateStore = new StateStore(client, clusterService); - AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = - new OpensearchAsyncQueryJobMetadataStorageService(stateStore); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); - SparkQueryDispatcher sparkQueryDispatcher = - new SparkQueryDispatcher( - emrServerlessClient, - this.dataSourceService, - new DataSourceUserAuthorizationHelperImpl(client), - jobExecutionResponseReader, - new FlintIndexMetadataReaderImpl(client), - client, - new SessionManager(stateStore, emrServerlessClient, pluginSettings), - new DefaultLeaseManager(pluginSettings, stateStore), - stateStore); - return new AsyncQueryExecutorServiceImpl( - asyncQueryJobMetadataStorageService, - sparkQueryDispatcher, - this::sparkExecutionEngineConfig); - } - - public static class LocalEMRSClient implements EMRServerlessClient { - - private int startJobRunCalled = 0; - private int cancelJobRunCalled = 0; - private int getJobResult = 0; - - @Getter private StartJobRequest jobRequest; - - @Override - public String startJobRun(StartJobRequest startJobRequest) { - jobRequest = startJobRequest; - startJobRunCalled++; - return "jobId"; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - getJobResult++; - JobRun jobRun = new JobRun(); - jobRun.setState("RUNNING"); - return new GetJobRunResult().withJobRun(jobRun); - } - - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - cancelJobRunCalled++; - return new CancelJobRunResult().withJobRunId(jobId); - } - - public void startJobRunCalled(int expectedTimes) { - assertEquals(expectedTimes, startJobRunCalled); - } - - public void cancelJobRunCalled(int expectedTimes) { - assertEquals(expectedTimes, cancelJobRunCalled); - } - - public void getJobRunResultCalled(int expectedTimes) { - assertEquals(expectedTimes, getJobResult); - } - } - - public SparkExecutionEngineConfig sparkExecutionEngineConfig() { - return new SparkExecutionEngineConfig("appId", "us-west-2", "roleArn", "", "myCluster"); - } - - public void enableSession(boolean enabled) { - // doNothing - } - - public void setSessionLimit(long limit) { - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().put(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey(), limit).build()) - .get(); - } - - public void setConcurrentRefreshJob(long limit) { - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder() - .put(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING.getKey(), limit) - .build()) - .get(); - } - - int search(QueryBuilder query) { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(DATASOURCE)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(query); - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest).actionGet(); - - return searchResponse.getHits().getHits().length; - } - - void setSessionState(String sessionId, SessionState sessionState) { - Optional model = getSession(stateStore, DATASOURCE).apply(sessionId); - SessionModel updated = - updateSessionState(stateStore, DATASOURCE).apply(model.get(), sessionState); - assertEquals(sessionState, updated.getSessionState()); - } - - @SneakyThrows - public String loadResultIndexMappings() { - URL url = Resources.getResource("query_execution_result_mapping.json"); - return Resources.toString(url, Charsets.UTF_8); - } - - public void createIndexWithMappings(String indexName, String metadata) { - CreateIndexRequest request = new CreateIndexRequest(indexName); - request.mapping(metadata, XContentType.JSON); - client().admin().indices().create(request).actionGet(); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java deleted file mode 100644 index 45a83b3296..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ /dev/null @@ -1,793 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.asyncquery; - -import com.amazonaws.services.emrserverless.model.CancelJobRunResult; -import com.amazonaws.services.emrserverless.model.GetJobRunResult; -import com.amazonaws.services.emrserverless.model.JobRun; -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.io.Resources; -import java.net.URL; -import java.util.Optional; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import org.junit.Assert; -import org.junit.Test; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; -import org.opensearch.sql.spark.flint.FlintIndexType; -import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; -import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; -import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; -import org.opensearch.sql.spark.rest.model.LangType; - -public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { - - public final FlintDatasetMock LEGACY_SKIPPING = - new FlintDatasetMock( - "DROP SKIPPING INDEX ON mys3.default.http_logs", - FlintIndexType.SKIPPING, - "flint_mys3_default_http_logs_skipping_index") - .isLegacy(true); - public final FlintDatasetMock LEGACY_COVERING = - new FlintDatasetMock( - "DROP INDEX covering ON mys3.default.http_logs", - FlintIndexType.COVERING, - "flint_mys3_default_http_logs_covering_index") - .isLegacy(true); - public final FlintDatasetMock LEGACY_MV = - new FlintDatasetMock( - "DROP MATERIALIZED VIEW mv", FlintIndexType.MATERIALIZED_VIEW, "flint_mv") - .isLegacy(true); - - public final FlintDatasetMock SKIPPING = - new FlintDatasetMock( - "DROP SKIPPING INDEX ON mys3.default.http_logs", - FlintIndexType.SKIPPING, - "flint_mys3_default_http_logs_skipping_index") - .latestId("skippingindexid"); - public final FlintDatasetMock COVERING = - new FlintDatasetMock( - "DROP INDEX covering ON mys3.default.http_logs", - FlintIndexType.COVERING, - "flint_mys3_default_http_logs_covering_index") - .latestId("coveringid"); - public final FlintDatasetMock MV = - new FlintDatasetMock( - "DROP MATERIALIZED VIEW mv", FlintIndexType.MATERIALIZED_VIEW, "flint_mv") - .latestId("mvid"); - - /** - * Happy case. expectation is - * - *

(1) Drop Index response is SUCCESS - */ - @Test - public void legacyBasicDropAndFetchAndCancel() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - return new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")); - } - }; - - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - - // 1.drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - assertNotNull(response.getQueryId()); - assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); - - // 2.fetch result - AsyncQueryExecutionResponse asyncQueryResults = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryResults.getStatus()); - assertNull(asyncQueryResults.getError()); - emrsClient.cancelJobRunCalled(1); - - // 3.cancel - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); - assertEquals("can't cancel index DML query", exception.getMessage()); - }); - } - - /** - * Legacy Test, without state index support. Not EMR-S job running. expectation is - * - *

(1) Drop Index response is SUCCESS - */ - @Test - public void legacyDropIndexNoJobRunning() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - throw new IllegalArgumentException("Job run is not in a cancellable state"); - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - - // 1.drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2.fetch result. - AsyncQueryExecutionResponse asyncQueryResults = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryResults.getStatus()); - assertNull(asyncQueryResults.getError()); - }); - } - - /** - * Legacy Test, without state index support. Cancel EMR-S job call timeout. expectation is - * - *

(1) Drop Index response is FAILED - */ - @Test - public void legacyDropIndexCancelJobTimeout() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV) - .forEach( - mockDS -> { - // Mock EMR-S always return running. - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - return new GetJobRunResult().withJobRun(new JobRun().withState("Running")); - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - - // 1. drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryResults = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryResults.getStatus()); - assertEquals("cancel job timeout", asyncQueryResults.getError()); - }); - } - - /** - * Happy case. expectation is - * - *

(1) Drop Index response is SUCCESS (2) change index state to: DELETED - */ - @Test - public void dropAndFetchAndCancel() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - return new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")); - } - }; - - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); - flintIndexJob.refreshing(); - - // 1.drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - assertNotNull(response.getQueryId()); - assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); - - // assert state is DELETED - flintIndexJob.assertState(FlintIndexState.DELETED); - - // 2.fetch result - AsyncQueryExecutionResponse asyncQueryResults = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryResults.getStatus()); - assertNull(asyncQueryResults.getError()); - emrsClient.cancelJobRunCalled(1); - - // 3.cancel - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); - assertEquals("can't cancel index DML query", exception.getMessage()); - }); - } - - /** - * Cancel EMR-S job, but not job running. expectation is - * - *

(1) Drop Index response is SUCCESS (2) change index state to: DELETED - */ - @Test - public void dropIndexNoJobRunning() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - // Mock EMR-S job is not running - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - throw new IllegalArgumentException("Job run is not in a cancellable state"); - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - // Mock index state in refresh state. - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); - flintIndexJob.refreshing(); - - // 1.drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2.fetch result. - AsyncQueryExecutionResponse asyncQueryResults = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", asyncQueryResults.getStatus()); - assertNull(asyncQueryResults.getError()); - - flintIndexJob.assertState(FlintIndexState.DELETED); - }); - } - - /** - * Cancel EMR-S job call timeout, expectation is - * - *

(1) Drop Index response is failed, (2) change index state to: CANCELLING - */ - @Test - public void dropIndexCancelJobTimeout() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - // Mock EMR-S always return running. - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - return new GetJobRunResult().withJobRun(new JobRun().withState("Running")); - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); - flintIndexJob.refreshing(); - - // 1. drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryResults = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryResults.getStatus()); - assertEquals("cancel job timeout", asyncQueryResults.getError()); - - flintIndexJob.assertState(FlintIndexState.CANCELLING); - }); - } - - /** - * Drop Index operation is retryable, expectation is - * - *

(1) call EMR-S (2) change index state to: DELETED - */ - @Test - public void dropIndexWithIndexInCancellingState() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - return new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")); - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); - flintIndexJob.cancelling(); - - // 1. drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "SUCCESS", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.DELETED); - }); - } - - /** - * No Job running, expectation is - * - *

(1) not call EMR-S (2) change index state to: DELETED - */ - @Test - public void dropIndexWithIndexInActiveState() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - Assert.fail("should not call cancelJobRun"); - return null; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - Assert.fail("should not call getJobRunResult"); - return null; - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); - flintIndexJob.active(); - - // 1. drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "SUCCESS", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.DELETED); - }); - } - - @Test - public void dropIndexWithIndexInDeletingState() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - Assert.fail("should not call cancelJobRun"); - return null; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - Assert.fail("should not call getJobRunResult"); - return null; - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); - flintIndexJob.deleted(); - - // 1. drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "SUCCESS", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.DELETED); - }); - } - - @Test - public void dropIndexWithIndexInDeletedState() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - Assert.fail("should not call cancelJobRun"); - return null; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - Assert.fail("should not call getJobRunResult"); - return null; - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); - flintIndexJob.deleting(); - - // 1. drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "SUCCESS", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.DELETED); - }); - } - - /** - * No Job running, expectation is - * - *

(1) not call EMR-S (2) change index state to: DELETED - */ - @Test - public void dropIndexWithIndexInEmptyState() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - Assert.fail("should not call cancelJobRun"); - return null; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - Assert.fail("should not call getJobRunResult"); - return null; - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); - - // 1. drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - assertEquals( - "SUCCESS", - asyncQueryExecutorService - .getAsyncQueryResults(response.getQueryId()) - .getStatus()); - - flintIndexJob.assertState(FlintIndexState.DELETED); - }); - } - - /** - * No Job running, expectation is - * - *

(1) not call EMR-S (2) change index state to: DELETED - */ - @Test - public void edgeCaseNoIndexStateDoc() { - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - mockDS -> { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - Assert.fail("should not call cancelJobRun"); - return null; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - Assert.fail("should not call getJobRunResult"); - return null; - } - }; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // Mock flint index - mockDS.createIndex(); - - // 1. drop index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); - - // 2. fetch result - AsyncQueryExecutionResponse asyncQueryResults = - asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); - assertEquals("FAILED", asyncQueryResults.getStatus()); - assertTrue(asyncQueryResults.getError().contains("no state found")); - }); - } - - @Test - public void concurrentRefreshJobLimitNotApplied() { - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(new LocalEMRSClient()); - - // Mock flint index - COVERING.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); - flintIndexJob.refreshing(); - - // query with auto refresh - String query = - "CREATE INDEX covering ON mys3.default.http_logs(l_orderkey, " - + "l_quantity) WITH (auto_refresh = true)"; - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); - assertNull(response.getSessionId()); - } - - @Test - public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(new LocalEMRSClient()); - - setConcurrentRefreshJob(1); - - // Mock flint index - COVERING.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); - flintIndexJob.refreshing(); - - // query with auto_refresh = true. - String query = - "CREATE INDEX covering ON mys3.default.http_logs(l_orderkey, " - + "l_quantity) WITH (auto_refresh = true)"; - ConcurrencyLimitExceededException exception = - assertThrows( - ConcurrencyLimitExceededException.class, - () -> - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null))); - assertEquals("domain concurrent refresh job can not exceed 1", exception.getMessage()); - } - - @Test - public void concurrentRefreshJobLimitAppliedToRefresh() { - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(new LocalEMRSClient()); - - setConcurrentRefreshJob(1); - - // Mock flint index - COVERING.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); - flintIndexJob.refreshing(); - - // query with auto_refresh = true. - String query = "REFRESH INDEX covering ON mys3.default.http_logs"; - ConcurrencyLimitExceededException exception = - assertThrows( - ConcurrencyLimitExceededException.class, - () -> - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null))); - assertEquals("domain concurrent refresh job can not exceed 1", exception.getMessage()); - } - - @Test - public void concurrentRefreshJobLimitNotAppliedToDDL() { - String query = "CREATE INDEX covering ON mys3.default.http_logs(l_orderkey, l_quantity)"; - - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(new LocalEMRSClient()); - - setConcurrentRefreshJob(1); - - // Mock flint index - COVERING.createIndex(); - // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); - flintIndexJob.refreshing(); - - CreateAsyncQueryResponse asyncQueryResponse = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); - assertNotNull(asyncQueryResponse.getSessionId()); - } - - public class MockFlintSparkJob { - - private FlintIndexStateModel stateModel; - - public MockFlintSparkJob(String latestId) { - assertNotNull(latestId); - stateModel = - new FlintIndexStateModel( - FlintIndexState.EMPTY, - "mockAppId", - "mockJobId", - latestId, - DATASOURCE, - System.currentTimeMillis(), - "", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel); - } - - public void refreshing() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.REFRESHING); - } - - public void cancelling() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.CANCELLING); - } - - public void active() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.ACTIVE); - } - - public void deleting() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.DELETING); - } - - public void deleted() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.DELETED); - } - - void assertState(FlintIndexState expected) { - Optional stateModelOpt = - StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId()); - assertTrue((stateModelOpt.isPresent())); - assertEquals(expected, stateModelOpt.get().getIndexState()); - } - } - - @RequiredArgsConstructor - public class FlintDatasetMock { - private final String query; - private final FlintIndexType indexType; - private final String indexName; - private boolean isLegacy = false; - private String latestId; - - FlintDatasetMock isLegacy(boolean isLegacy) { - this.isLegacy = isLegacy; - return this; - } - - FlintDatasetMock latestId(String latestId) { - this.latestId = latestId; - return this; - } - - public void createIndex() { - String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1"; - switch (indexType) { - case SKIPPING: - createIndexWithMappings( - indexName, loadMappings(pathPrefix + "/" + "flint_skipping_index.json")); - break; - case COVERING: - createIndexWithMappings( - indexName, loadMappings(pathPrefix + "/" + "flint_covering_index.json")); - break; - case MATERIALIZED_VIEW: - createIndexWithMappings(indexName, loadMappings(pathPrefix + "/" + "flint_mv.json")); - break; - } - } - - @SneakyThrows - public void deleteIndex() { - client().admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); - } - } - - @SneakyThrows - public static String loadMappings(String path) { - URL url = Resources.getResource(path); - return Resources.toString(url, Charsets.UTF_8); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java index cf838db829..de0caf5589 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java @@ -46,7 +46,7 @@ public void testStoreJobMetadata() { assertTrue(actual.isPresent()); assertEquals(expected, actual.get()); - assertEquals(expected, actual.get()); + assertFalse(actual.get().isDropIndexQuery()); assertNull(actual.get().getSessionId()); } @@ -57,6 +57,7 @@ public void testStoreJobMetadataWithResultExtraData() { AsyncQueryId.newAsyncQueryId(DS_NAME), EMR_JOB_ID, EMRS_APPLICATION_ID, + true, MOCK_RESULT_INDEX, MOCK_SESSION_ID); @@ -66,6 +67,7 @@ public void testStoreJobMetadataWithResultExtraData() { assertTrue(actual.isPresent()); assertEquals(expected, actual.get()); + assertTrue(actual.get().isDropIndexQuery()); assertEquals("resultIndex", actual.get().getResultIndex()); assertEquals(MOCK_SESSION_ID, actual.get().getSessionId()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java new file mode 100644 index 0000000000..d1c26f52e0 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; + +import com.amazonaws.services.emrserverless.model.JobRunState; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; + +public class DropIndexResultTest { + // todo, remove this UT after response refactor. + @Test + public void successRespEncodeDecode() { + // encode jobId + String jobId = + new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId(); + + // decode jobId + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(jobId); + + JSONObject result = dropIndexResult.result(); + assertEquals(JobRunState.SUCCESS.toString(), result.get(STATUS_FIELD)); + assertEquals( + "{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropIndexApplicationId\"}", + result.get(DATA_FIELD).toString()); + } + + // todo, remove this UT after response refactor. + @Test + public void failedRespEncodeDecode() { + // encode jobId + String jobId = + new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString()).toJobId(); + + // decode jobId + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(jobId); + + JSONObject result = dropIndexResult.result(); + assertEquals(JobRunState.FAILED.toString(), result.get(STATUS_FIELD)); + assertEquals("failed to drop index", result.get(ERROR_FIELD)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java deleted file mode 100644 index 8419d50ae1..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.dispatcher; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.Test; - -class IndexDMLHandlerTest { - @Test - public void getResponseFromExecutor() { - assertThrows( - IllegalStateException.class, - () -> - new IndexDMLHandler(null, null, null, null, null, null, null) - .getResponseFromExecutor(null)); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 2a76eabe6a..7663ece350 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -7,11 +7,13 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -31,10 +33,7 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; -import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.CLUSTER_NAME_TAG_KEY; -import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.DATASOURCE_TAG_KEY; -import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.INDEX_TAG_KEY; -import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY; +import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.*; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; @@ -45,6 +44,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import org.json.JSONObject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -54,6 +54,7 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -63,17 +64,16 @@ import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; -import org.opensearch.sql.spark.dispatcher.model.JobType; +import org.opensearch.sql.spark.dispatcher.model.*; import org.opensearch.sql.spark.execution.session.Session; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statement.Statement; import org.opensearch.sql.spark.execution.statement.StatementId; import org.opensearch.sql.spark.execution.statement.StatementState; -import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; @@ -90,6 +90,8 @@ public class SparkQueryDispatcherTest { @Mock(answer = RETURNS_DEEP_STUBS) private Client openSearchClient; + @Mock private FlintIndexMetadata flintIndexMetadata; + @Mock private SessionManager sessionManager; @Mock private LeaseManager leaseManager; @@ -100,8 +102,6 @@ public class SparkQueryDispatcherTest { @Mock(answer = RETURNS_DEEP_STUBS) private Statement statement; - @Mock private StateStore stateStore; - private SparkQueryDispatcher sparkQueryDispatcher; private final AsyncQueryId QUERY_ID = AsyncQueryId.newAsyncQueryId(DS_NAME); @@ -119,8 +119,7 @@ void setUp() { flintIndexMetadataReader, openSearchClient, sessionManager, - leaseManager, - stateStore); + leaseManager); } @Test @@ -174,6 +173,7 @@ void testDispatchSelectQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -229,6 +229,7 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -282,6 +283,7 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -401,6 +403,7 @@ void testDispatchIndexQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -455,6 +458,7 @@ void testDispatchWithPPLQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -509,6 +513,7 @@ void testDispatchQueryWithoutATableAndDataSourceName() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -567,6 +572,7 @@ void testDispatchIndexQueryWithoutADatasourceName() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -625,6 +631,7 @@ void testDispatchMaterializedViewQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -679,6 +686,7 @@ void testDispatchShowMVQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -733,6 +741,7 @@ void testRefreshIndexQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -787,6 +796,7 @@ void testDispatchDescribeIndexQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -998,6 +1008,245 @@ void testGetQueryResponseWithSuccess() { verifyNoInteractions(emrServerlessClient); } + // todo. refactor query process logic in plugin. + @Test + void testGetQueryResponseOfDropIndex() { + String jobId = + new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId(); + + JSONObject result = + sparkQueryDispatcher.getQueryResponse( + new AsyncQueryJobMetadata( + AsyncQueryId.newAsyncQueryId(DS_NAME), + EMRS_APPLICATION_ID, + jobId, + true, + null, + null)); + verify(jobExecutionResponseReader, times(0)) + .getResultFromOpensearchIndex(anyString(), anyString()); + Assertions.assertEquals("SUCCESS", result.get(STATUS_FIELD)); + } + + @Test + void testDropIndexQuery() throws ExecutionException, InterruptedException { + String query = "DROP INDEX size_year ON my_glue.default.http_logs"; + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .indexName("size_year") + .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.COVERING) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); + // auto_refresh == true + when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); + + when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) + .thenReturn( + new CancelJobRunResult() + .withJobRunId(EMR_JOB_ID) + .withApplicationId(EMRS_APPLICATION_ID)); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + + AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); + when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); + when(acknowledgedResponse.isAcknowledged()).thenReturn(true); + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); + Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); + } + + @Test + void testDropSkippingIndexQuery() throws ExecutionException, InterruptedException { + String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); + when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); + + when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) + .thenReturn( + new CancelJobRunResult() + .withJobRunId(EMR_JOB_ID) + .withApplicationId(EMRS_APPLICATION_ID)); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); + when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); + + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); + Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); + } + + @Test + void testDropSkippingIndexQueryAutoRefreshFalse() + throws ExecutionException, InterruptedException { + String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); + + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); + when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); + + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); + Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); + } + + @Test + void testDropSkippingIndexQueryDeleteIndexException() + throws ExecutionException, InterruptedException { + String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); + + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + + when(openSearchClient.admin().indices().delete(any()).get()) + .thenThrow(ExecutionException.class); + + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.FAILED.toString(), dropIndexResult.getStatus()); + Assertions.assertEquals( + "{\"error\":\"failed to drop index\",\"status\":\"FAILED\"}", + dropIndexResult.result().toString()); + Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); + } + + @Test + void testDropMVQuery() throws ExecutionException, InterruptedException { + String query = "DROP MATERIALIZED VIEW mv_1"; + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .mvName("mv_1") + .indexQueryActionType(IndexQueryActionType.DROP) + .fullyQualifiedTableName(null) + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build(); + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) + .thenReturn(flintIndexMetadata); + when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); + // auto_refresh == true + when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); + + when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) + .thenReturn( + new CancelJobRunResult() + .withJobRunId(EMR_JOB_ID) + .withApplicationId(EMRS_APPLICATION_ID)); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + + AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); + when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); + when(acknowledgedResponse.isAcknowledged()).thenReturn(true); + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); + verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); + SparkQueryDispatcher.DropIndexResult dropIndexResult = + SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); + Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); + } + @Test void testDispatchQueryWithExtraSparkSubmitParameters() { DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); @@ -1185,6 +1434,6 @@ private AsyncQueryJobMetadata asyncQueryJobMetadata() { private AsyncQueryJobMetadata asyncQueryJobMetadataWithSessionId( String statementId, String sessionId) { return new AsyncQueryJobMetadata( - new AsyncQueryId(statementId), EMRS_APPLICATION_ID, EMR_JOB_ID, null, sessionId); + new AsyncQueryId(statementId), EMRS_APPLICATION_ID, EMR_JOB_ID, false, null, sessionId); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java index 5a0a53009f..3546a874d9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java @@ -26,10 +26,13 @@ public class SessionManagerTest { public void sessionEnable() { Assertions.assertTrue( new SessionManager(stateStore, emrClient, sessionSetting(true)).isEnabled()); + Assertions.assertFalse( + new SessionManager(stateStore, emrClient, sessionSetting(false)).isEnabled()); } public static Settings sessionSetting(boolean enabled) { Map settings = new HashMap<>(); + settings.put(Settings.Key.SPARK_EXECUTION_SESSION_ENABLED, enabled); settings.put(Settings.Key.SPARK_EXECUTION_SESSION_LIMIT, 100); return settings(settings); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexStateTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexStateTest.java deleted file mode 100644 index acd76fa11a..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexStateTest.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint; - -import static org.junit.jupiter.api.Assertions.*; -import static org.opensearch.sql.spark.flint.FlintIndexState.UNKNOWN; - -import org.junit.jupiter.api.Test; - -class FlintIndexStateTest { - @Test - public void unknownState() { - assertEquals(UNKNOWN, FlintIndexState.fromString("noSupported")); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java deleted file mode 100644 index 5b3c1d74db..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint.operation; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; -import static org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceSpec.DATASOURCE; - -import java.util.Optional; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadata; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; - -@ExtendWith(MockitoExtension.class) -class FlintIndexOpTest { - @Mock private StateStore stateStore; - - @Mock private FlintIndexMetadata flintIndexMetadata; - - @Mock private FlintIndexStateModel model; - - @Test - public void beginFailed() { - when(stateStore.updateState(any(), any(), any(), any())).thenThrow(RuntimeException.class); - when(stateStore.get(any(), any(), any())).thenReturn(Optional.of(model)); - when(model.getIndexState()).thenReturn(FlintIndexState.ACTIVE); - when(flintIndexMetadata.getLatestId()).thenReturn(Optional.of("latestId")); - - FlintIndexOpDelete indexOp = new FlintIndexOpDelete(stateStore, DATASOURCE); - IllegalStateException exception = - assertThrows(IllegalStateException.class, () -> indexOp.apply(flintIndexMetadata)); - Assertions.assertEquals( - "begin failed. target transitioning state: [DELETING]", exception.getMessage()); - } - - @Test - public void commitFailed() { - when(stateStore.updateState(any(), any(), any(), any())) - .thenReturn(model) - .thenThrow(RuntimeException.class); - when(stateStore.get(any(), any(), any())).thenReturn(Optional.of(model)); - when(model.getIndexState()).thenReturn(FlintIndexState.EMPTY); - when(flintIndexMetadata.getLatestId()).thenReturn(Optional.of("latestId")); - - FlintIndexOpDelete indexOp = new FlintIndexOpDelete(stateStore, DATASOURCE); - IllegalStateException exception = - assertThrows(IllegalStateException.class, () -> indexOp.apply(flintIndexMetadata)); - Assertions.assertEquals( - "commit failed. target stable state: [DELETED]", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java index 558f7f7b3a..47111c3a38 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java @@ -5,8 +5,6 @@ package org.opensearch.sql.spark.leasemanager; -import static org.junit.jupiter.api.Assertions.assertTrue; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -24,18 +22,6 @@ class DefaultLeaseManagerTest { @Test public void concurrentSessionRuleOnlyApplyToInteractiveQuery() { - assertTrue( - new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore) - .test(new LeaseRequest(JobType.BATCH, "mys3"))); - assertTrue( - new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore) - .test(new LeaseRequest(JobType.STREAMING, "mys3"))); - } - - @Test - public void concurrentRefreshRuleOnlyNotAppliedToInteractiveQuery() { - assertTrue( - new DefaultLeaseManager.ConcurrentRefreshJobRule(settings, stateStore) - .test(new LeaseRequest(JobType.INTERACTIVE, "mys3"))); + new DefaultLeaseManager(settings, stateStore).borrow(new LeaseRequest(JobType.BATCH, "mys3")); } } diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json deleted file mode 100644 index 54ed5e05e1..0000000000 --- a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "_meta": { - "kind": "covering", - "indexedColumns": [ - { - "columnType": "timestamp", - "columnName": "time" - }, - { - "columnType": "string", - "columnName": "client_ip" - }, - { - "columnType": "int", - "columnName": "client_port" - }, - { - "columnType": "string", - "columnName": "request_url" - } - ], - "name": "test", - "options": { - "auto_refresh": "true", - "index_settings": "{\"number_of_shards\":1,\"number_of_replicas\":1}" - }, - "source": "mys3.default.http_logs", - "version": "0.1.0", - "properties": { - "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fe3gu2tgad000q" - } - }, - "latestId": "coveringid" - } -} diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json deleted file mode 100644 index 1a9c74806a..0000000000 --- a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "_meta": { - "kind": "mv", - "indexedColumns": [ - { - "columnType": "timestamp", - "columnName": "start.time" - }, - { - "columnType": "long", - "columnName": "count" - } - ], - "name": "spark_catalog.default.http_logs_metrics_chen", - "options": { - "auto_refresh": "true", - "checkpoint_location": "s3://flint-data-dp-eu-west-1-beta/data/checkpoint/chen-job-1", - "watermark_delay": "30 Minutes" - }, - "source": "SELECT window.start AS `start.time`, COUNT(*) AS count FROM mys3.default.http_logs WHERE status != 200 GROUP BY TUMBLE(`@timestamp`, '6 Hours')", - "version": "0.1.0", - "properties": { - "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fe86mkk5q3u00q" - } - }, - "latestId": "mvid" - } -} diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json deleted file mode 100644 index 5e7c9175fd..0000000000 --- a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "_meta": { - "kind": "skipping", - "indexedColumns": [ - { - "columnType": "int", - "kind": "VALUE_SET", - "columnName": "status" - } - ], - "name": "flint_mys3_default_http_logs_skipping_index", - "options": {}, - "source": "mys3.default.http_logs", - "version": "0.1.0", - "properties": { - "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" - } - }, - "latestId": "skippingindexid" - } -} diff --git a/spark/src/test/resources/flint-index-mappings/flint_covering_index.json b/spark/src/test/resources/flint-index-mappings/flint_covering_index.json deleted file mode 100644 index f68a1627ab..0000000000 --- a/spark/src/test/resources/flint-index-mappings/flint_covering_index.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "_meta": { - "kind": "covering", - "indexedColumns": [ - { - "columnType": "timestamp", - "columnName": "time" - }, - { - "columnType": "string", - "columnName": "client_ip" - }, - { - "columnType": "int", - "columnName": "client_port" - }, - { - "columnType": "string", - "columnName": "request_url" - } - ], - "name": "test", - "options": { - "auto_refresh": "true", - "index_settings": "{\"number_of_shards\":1,\"number_of_replicas\":1}" - }, - "source": "mys3.default.http_logs", - "version": "0.1.0", - "properties": { - "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fe3gu2tgad000q" - } - } - } -} diff --git a/spark/src/test/resources/flint-index-mappings/flint_mv.json b/spark/src/test/resources/flint-index-mappings/flint_mv.json deleted file mode 100644 index 3d130832b8..0000000000 --- a/spark/src/test/resources/flint-index-mappings/flint_mv.json +++ /dev/null @@ -1,42 +0,0 @@ -{ - "_meta": { - "kind": "mv", - "indexedColumns": [ - { - "columnType": "timestamp", - "columnName": "start.time" - }, - { - "columnType": "long", - "columnName": "count" - } - ], - "name": "spark_catalog.default.http_logs_metrics_chen", - "options": { - "auto_refresh": "true", - "checkpoint_location": "s3://flint-data-dp-eu-west-1-beta/data/checkpoint/chen-job-1", - "watermark_delay": "30 Minutes" - }, - "source": "SELECT window.start AS `start.time`, COUNT(*) AS count FROM mys3.default.http_logs WHERE status != 200 GROUP BY TUMBLE(`@timestamp`, '6 Hours')", - "version": "0.1.0", - "properties": { - "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fe86mkk5q3u00q" - } - } - }, - "properties": { - "count": { - "type": "long" - }, - "start": { - "properties": { - "time": { - "type": "date", - "format": "strict_date_optional_time_nanos" - } - } - } - } -} diff --git a/spark/src/test/resources/flint-index-mappings/flint_skipping_index.json b/spark/src/test/resources/flint-index-mappings/flint_skipping_index.json deleted file mode 100644 index e4bf849f20..0000000000 --- a/spark/src/test/resources/flint-index-mappings/flint_skipping_index.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "_meta": { - "kind": "skipping", - "indexedColumns": [ - { - "columnType": "int", - "kind": "VALUE_SET", - "columnName": "status" - } - ], - "name": "flint_mys3_default_http_logs_skipping_index", - "options": {}, - "source": "mys3.default.http_logs", - "version": "0.1.0", - "properties": { - "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" - } - } - } -} diff --git a/spark/src/test/resources/query_execution_result_mapping.json b/spark/src/test/resources/query_execution_result_mapping.json deleted file mode 100644 index a76ef77383..0000000000 --- a/spark/src/test/resources/query_execution_result_mapping.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "dynamic": "false", - "properties": { - "applicationId": { - "type": "keyword" - }, - "dataSourceName": { - "type": "keyword" - }, - "error": { - "type": "text" - }, - "jobRunId": { - "type": "keyword" - }, - "queryId": { - "type": "keyword" - }, - "queryRunTime": { - "type": "long" - }, - "queryText": { - "type": "text" - }, - "result": { - "type": "object", - "enabled": false - }, - "schema": { - "type": "object", - "enabled": false - }, - "sessionId": { - "type": "keyword" - }, - "status": { - "type": "keyword" - }, - "updateTime": { - "type": "date", - "format": "strict_date_time||epoch_millis" - } - } -}