Skip to content

Commit

Permalink
Redefine Drop Index as logical delete (#2386)
Browse files Browse the repository at this point in the history
* Redefine Drop Index as logical delete

Signed-off-by: Peng Huo <penghuo@gmail.com>

* merge from 2.x

Signed-off-by: Peng Huo <penghuo@gmail.com>

* add refresh_job limit and disable batch query

Signed-off-by: Peng Huo <penghuo@gmail.com>

* update doc

Signed-off-by: Peng Huo <penghuo@gmail.com>

---------

Signed-off-by: Peng Huo <penghuo@gmail.com>
(cherry picked from commit cb8d953)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 30, 2023
1 parent 1fd492e commit 2b6428d
Show file tree
Hide file tree
Showing 45 changed files with 2,269 additions and 769 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.sql.common.setting;

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.List;
Expand Down Expand Up @@ -40,8 +38,8 @@ public enum Key {
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"),
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"),
SPARK_EXECUTION_REFRESH_JOB_LIMIT("plugins.query.executionengine.spark.refresh_job.limit"),
SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"),
RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"),
AUTO_INDEX_MANAGEMENT_ENABLED(
Expand Down Expand Up @@ -69,9 +67,4 @@ public static Optional<Key> of(String keyValue) {
public abstract <T> T getSettingValue(Key key);

public abstract List<?> getSettings();

/** Helper class */
public static boolean isSparkExecutionSessionEnabled(Settings settings) {
return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED);
}
}
26 changes: 14 additions & 12 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,16 @@ SQL query::
"status": 400
}

plugins.query.executionengine.spark.session.enabled
===================================================

plugins.query.executionengine.spark.session.limit
==================================================

Description
-----------

By default, execution engine is executed in session mode. You can disable session mode by this setting.
Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.

1. The default value is true.
1. The default value is 100.
2. This setting is node scope.
3. This setting can be updated dynamically.

Expand All @@ -328,7 +329,7 @@ You can update the setting with a new value like this.
SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session.enabled":"false"}}'
... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}'
{
"acknowledged": true,
"persistent": {},
Expand All @@ -338,7 +339,7 @@ SQL query::
"executionengine": {
"spark": {
"session": {
"enabled": "false"
"limit": "200"
}
}
}
Expand All @@ -347,15 +348,16 @@ SQL query::
}
}

plugins.query.executionengine.spark.session.limit
==================================================

plugins.query.executionengine.spark.refresh_job.limit
=====================================================

Description
-----------

Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.
Each cluster can have maximum 20 datasources. You can increase limit by this setting.

1. The default value is 100.
1. The default value is 20.
2. This setting is node scope.
3. This setting can be updated dynamically.

Expand All @@ -364,7 +366,7 @@ You can update the setting with a new value like this.
SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}'
... -d '{"transient":{"plugins.query.executionengine.spark.refresh_job.limit":200}}'
{
"acknowledged": true,
"persistent": {},
Expand All @@ -373,7 +375,7 @@ SQL query::
"query": {
"executionengine": {
"spark": {
"session": {
"refresh_job": {
"limit": "200"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,20 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_SESSION_ENABLED_SETTING =
Setting.boolSetting(
Key.SPARK_EXECUTION_SESSION_ENABLED.getKeyValue(),
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_SESSION_LIMIT_SETTING =
Setting.intSetting(
Key.SPARK_EXECUTION_SESSION_LIMIT.getKeyValue(),
100,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING =
Setting.intSetting(
Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT.getKeyValue(),
50,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> SESSION_INDEX_TTL_SETTING =
Setting.positiveTimeSetting(
Key.SESSION_INDEX_TTL.getKeyValue(),
Expand Down Expand Up @@ -249,18 +249,18 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_SESSION_ENABLED,
SPARK_EXECUTION_SESSION_ENABLED_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_SESSION_LIMIT,
SPARK_EXECUTION_SESSION_LIMIT_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT,
SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING,
new Updater(Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT));
register(
settingBuilder,
clusterSettings,
Expand Down Expand Up @@ -350,8 +350,8 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
.add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING)
.add(SESSION_INDEX_TTL_SETTING)
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
new FlintIndexMetadataReaderImpl(client),
client,
new SessionManager(stateStore, emrServerlessClient, pluginSettings),
new DefaultLeaseManager(pluginSettings, stateStore));
new DefaultLeaseManager(pluginSettings, stateStore),
stateStore);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
1 change: 1 addition & 0 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.execution.session.SessionModel',
'org.opensearch.sql.spark.execution.statement.StatementModel',
'org.opensearch.sql.spark.flint.FlintIndexStateModel',
// TODO: add tests for purging flint indices
'org.opensearch.sql.spark.cluster.ClusterManagerEventListener*',
'org.opensearch.sql.spark.cluster.FlintIndexRetention',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public CreateAsyncQueryResponse createAsyncQuery(
dispatchQueryResponse.getQueryId(),
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.isDropIndexQuery(),
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId()));
return new CreateAsyncQueryResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class AsyncQueryJobMetadata extends StateModel {
private final AsyncQueryId queryId;
private final String applicationId;
private final String jobId;
private final boolean isDropIndexQuery;
private final String resultIndex;
// optional sessionId.
private final String sessionId;
Expand All @@ -43,7 +42,6 @@ public AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
false,
resultIndex,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
Expand All @@ -54,14 +52,12 @@ public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
boolean isDropIndexQuery,
String resultIndex,
String sessionId) {
this(
queryId,
applicationId,
jobId,
isDropIndexQuery,
resultIndex,
sessionId,
SequenceNumbers.UNASSIGNED_SEQ_NO,
Expand All @@ -72,15 +68,13 @@ public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
boolean isDropIndexQuery,
String resultIndex,
String sessionId,
long seqNo,
long primaryTerm) {
this.queryId = queryId;
this.applicationId = applicationId;
this.jobId = jobId;
this.isDropIndexQuery = isDropIndexQuery;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.seqNo = seqNo;
Expand All @@ -106,7 +100,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("type", TYPE_JOBMETA)
.field("jobId", jobId)
.field("applicationId", applicationId)
.field("isDropIndexQuery", isDropIndexQuery)
.field("resultIndex", resultIndex)
.field("sessionId", sessionId)
.endObject();
Expand All @@ -120,7 +113,6 @@ public static AsyncQueryJobMetadata copy(
copy.getQueryId(),
copy.getApplicationId(),
copy.getJobId(),
copy.isDropIndexQuery(),
copy.getResultIndex(),
copy.getSessionId(),
seqNo,
Expand Down Expand Up @@ -176,14 +168,7 @@ public static AsyncQueryJobMetadata fromXContent(
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
isDropIndexQuery,
resultIndex,
sessionId,
seqNo,
primaryTerm);
queryId, applicationId, jobId, resultIndex, sessionId, seqNo, primaryTerm);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
public abstract class AsyncQueryHandler {

public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
if (asyncQueryJobMetadata.isDropIndexQuery()) {
return SparkQueryDispatcher.DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId())
.result();
}

JSONObject result = getResponseFromResultIndex(asyncQueryJobMetadata);
if (result.has(DATA_FIELD)) {
JSONObject items = result.getJSONObject(DATA_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

@RequiredArgsConstructor
public class BatchQueryHandler extends AsyncQueryHandler {
private final EMRServerlessClient emrServerlessClient;
private final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -60,6 +63,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));

String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query";
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
Expand All @@ -81,6 +86,6 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(
context.getQueryId(), jobId, false, dataSourceMetadata.getResultIndex(), null);
context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null);
}
}
Loading

0 comments on commit 2b6428d

Please sign in to comment.