From adc81b699c5573f4d8dc5579bea167a92c1cd700 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 6 Dec 2023 22:01:58 +0000 Subject: [PATCH] Validate session with flint datasource passed in async job request (#2448) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Validate session with flint datasource passed in async job request Currently, if there's a session running with Datasource1 and the user makes a request to the async API with the same session but a different catalog Datasource2 then SQL plugin doesn't return a new session for Datasource2. This PR creates a new session when there’s a mismatch between datasource and session_id. Testing done: 1. manual testing 2. added IT. Signed-off-by: Kaituo Li * address comments Signed-off-by: Kaituo Li * add doc Signed-off-by: Kaituo Li --------- Signed-off-by: Kaituo Li (cherry picked from commit 01765258ea8df1e6dd96be45b20ac902f7cea000) Signed-off-by: github-actions[bot] --- .../sql/common/setting/Settings.java | 4 +- docs/user/admin/settings.rst | 145 ++++++++++++++++++ .../setting/OpenSearchSettings.java | 14 ++ .../src/main/antlr/FlintSparkSqlExtensions.g4 | 9 ++ spark/src/main/antlr/SparkSqlBase.g4 | 2 + spark/src/main/antlr/SqlBaseLexer.g4 | 1 + spark/src/main/antlr/SqlBaseParser.g4 | 16 +- .../dispatcher/InteractiveQueryHandler.java | 3 +- .../execution/session/InteractiveSession.java | 15 +- .../sql/spark/execution/session/Session.java | 2 +- .../execution/session/SessionManager.java | 54 ++++++- .../sql/spark/utils/RealTimeProvider.java | 13 ++ .../sql/spark/utils/TimeProvider.java | 10 ++ ...AsyncQueryExecutorServiceImplSpecTest.java | 94 ++++++++++++ .../dispatcher/SparkQueryDispatcherTest.java | 2 +- .../session/InteractiveSessionTest.java | 8 +- .../execution/session/SessionManagerTest.java | 9 +- .../execution/statement/StatementTest.java | 16 +- .../sql/spark/utils/MockTimeProvider.java | 19 +++ 19 files changed, 406 insertions(+), 30 deletions(-) 
create mode 100644 spark/src/main/java/org/opensearch/sql/spark/utils/RealTimeProvider.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/utils/TimeProvider.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/utils/MockTimeProvider.java diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index c9e348dbd4..28c0bb7f4e 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -43,7 +43,9 @@ public enum Key { SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"), RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"), AUTO_INDEX_MANAGEMENT_ENABLED( - "plugins.query.executionengine.spark.auto_index_management.enabled"); + "plugins.query.executionengine.spark.auto_index_management.enabled"), + SESSION_INACTIVITY_TIMEOUT_MILLIS( + "plugins.query.executionengine.spark.session_inactivity_timeout_millis"); @Getter private final String keyValue; diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 7e175ae719..04b10935de 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -418,3 +418,148 @@ SQL query:: } } + +plugins.query.executionengine.spark.session_inactivity_timeout_millis +=============================== + +Description +----------- + +This setting determines the duration after which a session is considered stale if there has been no update. The default +timeout is 3 minutes (180,000 milliseconds). + +1. Default Value: 180000 (milliseconds) +2. Scope: Node-level +3. Dynamic Update: Yes, this setting can be updated dynamically. + +To change the session inactivity timeout to 10 minutes for example, use the following command: + +SQL query:: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... 
-d '{"transient":{"plugins.query.executionengine.spark.session_inactivity_timeout_millis":600000}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "spark": { + "session_inactivity_timeout_millis": "600000" + } + } + } + } + } + } + + +plugins.query.executionengine.spark.auto_index_management.enabled +=============================== + +Description +----------- +This setting controls the automatic management of request and result indices for each data source. When enabled, it +deletes outdated index documents. + +* Default State: Enabled (true) +* Purpose: Manages auto index management for request and result indices. + +To disable auto index management, use the following command: + +SQL query:: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... -d '{"transient":{"plugins.query.executionengine.spark.auto_index_management.enabled":false}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "spark": { + "auto_index_management": { + "enabled": "false" + } + } + } + } + } + } + } + + +plugins.query.executionengine.spark.session.index.ttl +=============================== + +Description +----------- +This setting defines the time-to-live (TTL) for request indices when plugins.query.executionengine.spark.auto_index_management.enabled +is true. By default, request indices older than 14 days are deleted. + +* Default Value: 14 days + +To change the TTL to 30 days for example, execute the following command: + +SQL query:: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... 
-d '{"transient":{"plugins.query.executionengine.spark.session.index.ttl":"30d"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "spark": { + "session": { + "index": { + "ttl": "30d" + } + } + } + } + } + } + } + } + + +plugins.query.executionengine.spark.result.index.ttl +=============================== + +Description +----------- +This setting specifies the TTL for result indices when plugins.query.executionengine.spark.auto_index_management.enabled +is set to true. The default setting is to delete result indices older than 60 days. + +* Default Value: 60 days + +To modify the TTL to 30 days for example, use this command: + +SQL query:: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... -d '{"transient":{"plugins.query.executionengine.spark.result.index.ttl":"30d"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "spark": { + "result": { + "index": { + "ttl": "30d" + } + } + } + } + } + } + } + } \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 02b28d58ce..cbb0d232a7 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -179,6 +179,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING = + Setting.longSetting( + Key.SESSION_INACTIVITY_TIMEOUT_MILLIS.getKeyValue(), + 180000L, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + /** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. 
*/ @SuppressWarnings("unchecked") public OpenSearchSettings(ClusterSettings clusterSettings) { @@ -287,6 +294,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { new Updater(Key.DATASOURCES_LIMIT)); registerNonDynamicSettings( settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING); + register( + settingBuilder, + clusterSettings, + Key.SESSION_INACTIVITY_TIMEOUT_MILLIS, + SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING, + new Updater((Key.SESSION_INACTIVITY_TIMEOUT_MILLIS))); defaultSettings = settingBuilder.build(); } @@ -356,6 +369,7 @@ public static List> pluginSettings() { .add(RESULT_INDEX_TTL_SETTING) .add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING) .add(DATASOURCES_LIMIT_SETTING) + .add(SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING) .build(); } diff --git a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 index e44944fcff..cb2e14144f 100644 --- a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 +++ b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 @@ -18,6 +18,7 @@ statement : skippingIndexStatement | coveringIndexStatement | materializedViewStatement + | indexJobManagementStatement ; skippingIndexStatement @@ -109,6 +110,14 @@ dropMaterializedViewStatement : DROP MATERIALIZED VIEW mvName=multipartIdentifier ; +indexJobManagementStatement + : recoverIndexJobStatement + ; + +recoverIndexJobStatement + : RECOVER INDEX JOB identifier + ; + /* * Match all remaining tokens in non-greedy way * so WITH clause won't be captured by this rule. 
diff --git a/spark/src/main/antlr/SparkSqlBase.g4 b/spark/src/main/antlr/SparkSqlBase.g4 index 597a1e5856..fe6fd3c662 100644 --- a/spark/src/main/antlr/SparkSqlBase.g4 +++ b/spark/src/main/antlr/SparkSqlBase.g4 @@ -165,10 +165,12 @@ IF: 'IF'; IN: 'IN'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; +JOB: 'JOB'; MATERIALIZED: 'MATERIALIZED'; NOT: 'NOT'; ON: 'ON'; PARTITION: 'PARTITION'; +RECOVER: 'RECOVER'; REFRESH: 'REFRESH'; SHOW: 'SHOW'; TRUE: 'TRUE'; diff --git a/spark/src/main/antlr/SqlBaseLexer.g4 b/spark/src/main/antlr/SqlBaseLexer.g4 index e8b5cb012f..9b3dcbc6d1 100644 --- a/spark/src/main/antlr/SqlBaseLexer.g4 +++ b/spark/src/main/antlr/SqlBaseLexer.g4 @@ -408,6 +408,7 @@ VALUES: 'VALUES'; VARCHAR: 'VARCHAR'; VAR: 'VAR'; VARIABLE: 'VARIABLE'; +VARIANT: 'VARIANT'; VERSION: 'VERSION'; VIEW: 'VIEW'; VIEWS: 'VIEWS'; diff --git a/spark/src/main/antlr/SqlBaseParser.g4 b/spark/src/main/antlr/SqlBaseParser.g4 index 84a31dafed..439a12c301 100644 --- a/spark/src/main/antlr/SqlBaseParser.g4 +++ b/spark/src/main/antlr/SqlBaseParser.g4 @@ -298,6 +298,10 @@ replaceTableHeader : (CREATE OR)? REPLACE TABLE identifierReference ; +clusterBySpec + : CLUSTER BY LEFT_PAREN multipartIdentifierList RIGHT_PAREN + ; + bucketSpec : CLUSTERED BY identifierList (SORTED BY orderedIdentifierList)? @@ -383,6 +387,7 @@ createTableClauses :((OPTIONS options=expressionPropertyList) | (PARTITIONED BY partitioning=partitionFieldList) | skewSpec | + clusterBySpec | bucketSpec | rowFormat | createFileFormat | @@ -582,6 +587,10 @@ notMatchedBySourceAction | UPDATE SET assignmentList ; +exceptClause + : EXCEPT LEFT_PAREN exceptCols=multipartIdentifierList RIGHT_PAREN + ; + assignmentList : assignment (COMMA assignment)* ; @@ -964,8 +973,8 @@ primaryExpression | LAST LEFT_PAREN expression (IGNORE NULLS)? 
RIGHT_PAREN #last | POSITION LEFT_PAREN substr=valueExpression IN str=valueExpression RIGHT_PAREN #position | constant #constantDefault - | ASTERISK #star - | qualifiedName DOT ASTERISK #star + | ASTERISK exceptClause? #star + | qualifiedName DOT ASTERISK exceptClause? #star | LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor | LEFT_PAREN query RIGHT_PAREN #subqueryExpression | functionName LEFT_PAREN (setQuantifier? argument+=functionArgument @@ -1081,6 +1090,7 @@ type | DECIMAL | DEC | NUMERIC | VOID | INTERVAL + | VARIANT | ARRAY | STRUCT | MAP | unsupportedType=identifier ; @@ -1540,6 +1550,7 @@ ansiNonReserved | VARCHAR | VAR | VARIABLE + | VARIANT | VERSION | VIEW | VIEWS @@ -1888,6 +1899,7 @@ nonReserved | VARCHAR | VAR | VARIABLE + | VARIANT | VERSION | VIEW | VIEWS diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index 5aa82432bb..1da38f03a7 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -86,7 +86,8 @@ public DispatchQueryResponse submit( session = createdSession.get(); } } - if (session == null || !session.isReady()) { + if (session == null + || !session.isOperationalForDataSource(dispatchQueryRequest.getDatasource())) { // create session if not exist or session dead/fail tags.put(JOB_TYPE_TAG_KEY, JobType.INTERACTIVE.getText()); session = diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java index 3221b33b2c..dd413674a1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ 
-25,6 +25,7 @@ import org.opensearch.sql.spark.execution.statement.StatementId; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.rest.model.LangType; +import org.opensearch.sql.spark.utils.TimeProvider; /** * Interactive session. @@ -42,6 +43,9 @@ public class InteractiveSession implements Session { private final StateStore stateStore; private final EMRServerlessClient serverlessClient; private SessionModel sessionModel; + // the threshold of elapsed time in milliseconds before we say a session is stale + private long sessionInactivityTimeoutMilli; + private TimeProvider timeProvider; @Override public void open(CreateSessionRequest createSessionRequest) { @@ -134,7 +138,14 @@ public Optional get(StatementId stID) { } @Override - public boolean isReady() { - return sessionModel.getSessionState() != DEAD && sessionModel.getSessionState() != FAIL; + public boolean isOperationalForDataSource(String dataSourceName) { + boolean isSessionStateValid = + sessionModel.getSessionState() != DEAD && sessionModel.getSessionState() != FAIL; + boolean isDataSourceMatch = sessionId.getDataSourceName().equals(dataSourceName); + boolean isSessionUpdatedRecently = + timeProvider.currentEpochMillis() - sessionModel.getLastUpdateTime() + <= sessionInactivityTimeoutMilli; + + return isSessionStateValid && isDataSourceMatch && isSessionUpdatedRecently; } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java index d3d3411ded..e684d33989 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java @@ -39,5 +39,5 @@ public interface Session { SessionId getSessionId(); /** return true if session is ready to use. 
*/ - boolean isReady(); + boolean isOperationalForDataSource(String dataSourceName); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index 0f0a4ce373..c3d5807305 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -5,24 +5,31 @@ package org.opensearch.sql.spark.execution.session; +import static org.opensearch.sql.common.setting.Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS; import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; import java.util.Optional; -import lombok.RequiredArgsConstructor; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.utils.RealTimeProvider; /** * Singleton Class * *

todo. add Session cache and Session sweeper. */ -@RequiredArgsConstructor public class SessionManager { private final StateStore stateStore; private final EMRServerlessClient emrServerlessClient; - private final Settings settings; + private Settings settings; + + public SessionManager( + StateStore stateStore, EMRServerlessClient emrServerlessClient, Settings settings) { + this.stateStore = stateStore; + this.emrServerlessClient = emrServerlessClient; + this.settings = settings; + } public Session createSession(CreateSessionRequest request) { InteractiveSession session = @@ -35,9 +42,27 @@ public Session createSession(CreateSessionRequest request) { return session; } - public Optional getSession(SessionId sid) { + /** + * Retrieves the session associated with the given session ID. + * + *

This method is particularly used in scenarios where the data source encoded in the session + * ID is deemed untrustworthy. It allows for the safe retrieval of session details based on a + * known and validated session ID, rather than relying on potentially outdated data source + * information. + * + *

For more context on the use case and implementation, refer to the documentation here: + * https://tinyurl.com/bdh6s834 + * + * @param sid The unique identifier of the session. It is used to fetch the corresponding session + * details. + * @param dataSourceName The name of the data source whose session state is searched; it is used + * in place of the data source name encoded in the session ID. + * @return An Optional containing the session associated with the provided session ID. Returns an + * empty Optional if no matching session is found. + */ + public Optional getSession(SessionId sid, String dataSourceName) { + Optional model = + StateStore.getSession(stateStore, dataSourceName).apply(sid.getSessionId()); + if (model.isPresent()) { + InteractiveSession session = + InteractiveSession.builder() @@ -45,12 +70,31 @@ public Optional getSession(SessionId sid) { .stateStore(stateStore) .serverlessClient(emrServerlessClient) .sessionModel(model.get()) + .sessionInactivityTimeoutMilli( + settings.getSettingValue(SESSION_INACTIVITY_TIMEOUT_MILLIS)) + .timeProvider(new RealTimeProvider()) .build(); return Optional.ofNullable(session); } return Optional.empty(); } + /** + * Retrieves the session associated with the provided session ID. + * + *

This method is utilized specifically in scenarios where the data source information encoded + * in the session ID is considered trustworthy. It ensures the retrieval of session details based + * on the session ID, relying on the integrity of the data source information contained within it. + * + * @param sid The session ID used to identify and retrieve the corresponding session. It is + * expected to contain valid and trusted data source information. + * @return An Optional containing the session associated with the provided session ID. If no + * session is found that matches the session ID, an empty Optional is returned. + */ + public Optional getSession(SessionId sid) { + return getSession(sid, sid.getDataSourceName()); + } + // todo, keep it only for testing, will remove it later. public boolean isEnabled() { return true; diff --git a/spark/src/main/java/org/opensearch/sql/spark/utils/RealTimeProvider.java b/spark/src/main/java/org/opensearch/sql/spark/utils/RealTimeProvider.java new file mode 100644 index 0000000000..b42e30532b --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/utils/RealTimeProvider.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.utils; + +public class RealTimeProvider implements TimeProvider { + @Override + public long currentEpochMillis() { + return System.currentTimeMillis(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/utils/TimeProvider.java b/spark/src/main/java/org/opensearch/sql/spark/utils/TimeProvider.java new file mode 100644 index 0000000000..13628af579 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/utils/TimeProvider.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.utils; + +public interface TimeProvider { + long currentEpochMillis(); +} diff --git 
a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 56ee56ea5e..011d97dcdf 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -25,8 +25,10 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.jupiter.api.Disabled; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.core.common.Strings; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; @@ -383,6 +385,98 @@ public void recreateSessionIfNotReady() { assertNotEquals(second.getSessionId(), third.getSessionId()); } + @Test + public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() { + LocalEMRSClient emrsClient = new LocalEMRSClient(); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // enable session + enableSession(true); + + // 1. create async query. + CreateAsyncQueryResponse first = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "SHOW SCHEMAS IN " + DATASOURCE, DATASOURCE, LangType.SQL, null)); + assertNotNull(first.getSessionId()); + + // set sessionState to RUNNING + setSessionState(first.getSessionId(), SessionState.RUNNING); + + // 2. 
reuse session id + CreateAsyncQueryResponse second = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "SHOW SCHEMAS IN " + DATASOURCE, DATASOURCE, LangType.SQL, first.getSessionId())); + + assertEquals(first.getSessionId(), second.getSessionId()); + + // set sessionState to RUNNING + setSessionState(second.getSessionId(), SessionState.RUNNING); + + // 3. given different source, create a new session id + CreateAsyncQueryResponse third = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "SHOW SCHEMAS IN " + DSOTHER, DSOTHER, LangType.SQL, second.getSessionId())); + assertNotEquals(second.getSessionId(), third.getSessionId()); + } + + @Test + public void recreateSessionIfStale() { + LocalEMRSClient emrsClient = new LocalEMRSClient(); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // enable session + enableSession(true); + + // 1. create async query. + CreateAsyncQueryResponse first = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + assertNotNull(first.getSessionId()); + + // set sessionState to RUNNING + setSessionState(first.getSessionId(), SessionState.RUNNING); + + // 2. reuse session id + CreateAsyncQueryResponse second = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "select 1", DATASOURCE, LangType.SQL, first.getSessionId())); + + assertEquals(first.getSessionId(), second.getSessionId()); + + try { + // set timeout setting to 0 + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); + org.opensearch.common.settings.Settings settings = + org.opensearch.common.settings.Settings.builder() + .put(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS.getKeyValue(), 0) + .build(); + request.transientSettings(settings); + client().admin().cluster().updateSettings(request).actionGet(60000); + + // 3. 
not reuse session id + CreateAsyncQueryResponse third = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "select 1", DATASOURCE, LangType.SQL, second.getSessionId())); + assertNotEquals(second.getSessionId(), third.getSessionId()); + } finally { + // remove the transient override so the timeout reverts to its default + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); + org.opensearch.common.settings.Settings settings = + org.opensearch.common.settings.Settings.builder() + .putNull(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS.getKeyValue()) + .build(); + request.transientSettings(settings); + client().admin().cluster().updateSettings(request).actionGet(60000); + } + } + @Test public void submitQueryInInvalidSessionWillCreateNewSession() { LocalEMRSClient emrsClient = new LocalEMRSClient(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 2a76eabe6a..dbc087cbae 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -318,7 +318,7 @@ void testDispatchSelectQueryReuseSession() { doReturn(new SessionId(MOCK_SESSION_ID)).when(session).getSessionId(); doReturn(new StatementId(MOCK_STATEMENT_ID)).when(session).submit(any()); when(session.getSessionModel().getJobId()).thenReturn(EMR_JOB_ID); - when(session.isReady()).thenReturn(true); + when(session.isOperationalForDataSource(any())).thenReturn(true); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java
b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index 14ccaf7708..d670fc4ca8 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -118,7 +118,7 @@ public void closeNotExistSession() { @Test public void sessionManagerCreateSession() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new SessionManager(stateStore, emrsClient, sessionSetting()) .createSession(createSessionRequest()); TestSession testSession = testSession(session, stateStore); @@ -127,8 +127,7 @@ public void sessionManagerCreateSession() { @Test public void sessionManagerGetSession() { - SessionManager sessionManager = - new SessionManager(stateStore, emrsClient, sessionSetting(false)); + SessionManager sessionManager = new SessionManager(stateStore, emrsClient, sessionSetting()); Session session = sessionManager.createSession(createSessionRequest()); Optional managerSession = sessionManager.getSession(session.getSessionId()); @@ -138,8 +137,7 @@ public void sessionManagerGetSession() { @Test public void sessionManagerGetSessionNotExist() { - SessionManager sessionManager = - new SessionManager(stateStore, emrsClient, sessionSetting(false)); + SessionManager sessionManager = new SessionManager(stateStore, emrsClient, sessionSetting()); Optional managerSession = sessionManager.getSession(SessionId.newSessionId("no-exist")); diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java index 5a0a53009f..44dd5c3a57 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java @@ -24,13 +24,14 @@ public class 
SessionManagerTest { @Test public void sessionEnable() { - Assertions.assertTrue( - new SessionManager(stateStore, emrClient, sessionSetting(true)).isEnabled()); + Assertions.assertTrue(new SessionManager(stateStore, emrClient, sessionSetting()).isEnabled()); } - public static Settings sessionSetting(boolean enabled) { - Map settings = new HashMap<>(); + public static org.opensearch.sql.common.setting.Settings sessionSetting() { + Map settings = new HashMap<>(); settings.put(Settings.Key.SPARK_EXECUTION_SESSION_LIMIT, 100); + settings.put( + org.opensearch.sql.common.setting.Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS, 10000L); return settings(settings); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java index 29020f2496..97f38d37a7 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java @@ -259,7 +259,7 @@ public void cancelRunningStatementSuccess() { @Test public void submitStatementInRunningSession() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new SessionManager(stateStore, emrsClient, sessionSetting()) .createSession(createSessionRequest()); // App change state to running @@ -272,7 +272,7 @@ public void submitStatementInRunningSession() { @Test public void submitStatementInNotStartedState() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new SessionManager(stateStore, emrsClient, sessionSetting()) .createSession(createSessionRequest()); StatementId statementId = session.submit(queryRequest()); @@ -282,7 +282,7 @@ public void submitStatementInNotStartedState() { @Test public void failToSubmitStatementInDeadState() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new 
SessionManager(stateStore, emrsClient, sessionSetting()) .createSession(createSessionRequest()); updateSessionState(stateStore, DS_NAME).apply(session.getSessionModel(), SessionState.DEAD); @@ -298,7 +298,7 @@ public void failToSubmitStatementInDeadState() { @Test public void failToSubmitStatementInFailState() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new SessionManager(stateStore, emrsClient, sessionSetting()) .createSession(createSessionRequest()); updateSessionState(stateStore, DS_NAME).apply(session.getSessionModel(), SessionState.FAIL); @@ -314,7 +314,7 @@ public void failToSubmitStatementInFailState() { @Test public void newStatementFieldAssert() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new SessionManager(stateStore, emrsClient, sessionSetting()) .createSession(createSessionRequest()); StatementId statementId = session.submit(queryRequest()); Optional statement = session.get(statementId); @@ -332,7 +332,7 @@ public void newStatementFieldAssert() { @Test public void failToSubmitStatementInDeletedSession() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new SessionManager(stateStore, emrsClient, sessionSetting()) .createSession(createSessionRequest()); // other's delete session @@ -348,7 +348,7 @@ public void failToSubmitStatementInDeletedSession() { @Test public void getStatementSuccess() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new SessionManager(stateStore, emrsClient, sessionSetting()) .createSession(createSessionRequest()); // App change state to running updateSessionState(stateStore, DS_NAME).apply(session.getSessionModel(), SessionState.RUNNING); @@ -363,7 +363,7 @@ public void getStatementSuccess() { @Test public void getStatementNotExist() { Session session = - new SessionManager(stateStore, emrsClient, sessionSetting(false)) + new SessionManager(stateStore, 
emrsClient, sessionSetting()) .createSession(createSessionRequest()); // App change state to running updateSessionState(stateStore, DS_NAME).apply(session.getSessionModel(), SessionState.RUNNING); diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/MockTimeProvider.java b/spark/src/test/java/org/opensearch/sql/spark/utils/MockTimeProvider.java new file mode 100644 index 0000000000..2f4c960ec0 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/MockTimeProvider.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.utils; + +public class MockTimeProvider implements TimeProvider { + private final long fixedTime; + + public MockTimeProvider(long fixedTime) { + this.fixedTime = fixedTime; + } + + @Override + public long currentEpochMillis() { + return fixedTime; + } +}