Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redefine Drop Index as logical delete #2386

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.sql.common.setting;

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.List;
Expand Down Expand Up @@ -40,8 +38,8 @@ public enum Key {
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"),
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"),
SPARK_EXECUTION_REFRESH_JOB_LIMIT("plugins.query.executionengine.spark.refresh_job.limit"),
SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"),
RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"),
AUTO_INDEX_MANAGEMENT_ENABLED(
Expand Down Expand Up @@ -69,9 +67,4 @@ public static Optional<Key> of(String keyValue) {
public abstract <T> T getSettingValue(Key key);

public abstract List<?> getSettings();

/** Helper class */
public static boolean isSparkExecutionSessionEnabled(Settings settings) {
return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED);
}
}
26 changes: 14 additions & 12 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,16 @@ SQL query::
"status": 400
}

plugins.query.executionengine.spark.session.enabled
===================================================

plugins.query.executionengine.spark.session.limit
==================================================

Description
-----------

By default, execution engine is executed in session mode. You can disable session mode by this setting.
Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.

1. The default value is true.
1. The default value is 100.
2. This setting is node scope.
3. This setting can be updated dynamically.

Expand All @@ -328,7 +329,7 @@ You can update the setting with a new value like this.
SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session.enabled":"false"}}'
... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}'
{
"acknowledged": true,
"persistent": {},
Expand All @@ -338,7 +339,7 @@ SQL query::
"executionengine": {
"spark": {
"session": {
"enabled": "false"
"limit": "200"
}
}
}
Expand All @@ -347,15 +348,16 @@ SQL query::
}
}

plugins.query.executionengine.spark.session.limit
==================================================

plugins.query.executionengine.spark.refresh_job.limit
=====================================================

Description
-----------

Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.
Each cluster can have maximum 20 datasources. You can increase limit by this setting.

1. The default value is 100.
1. The default value is 20.
2. This setting is node scope.
3. This setting can be updated dynamically.

Expand All @@ -364,7 +366,7 @@ You can update the setting with a new value like this.
SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}'
... -d '{"transient":{"plugins.query.executionengine.spark.refresh_job.limit":200}}'
{
"acknowledged": true,
"persistent": {},
Expand All @@ -373,7 +375,7 @@ SQL query::
"query": {
"executionengine": {
"spark": {
"session": {
"refresh_job": {
"limit": "200"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,20 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_SESSION_ENABLED_SETTING =
Setting.boolSetting(
Key.SPARK_EXECUTION_SESSION_ENABLED.getKeyValue(),
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_SESSION_LIMIT_SETTING =
Setting.intSetting(
Key.SPARK_EXECUTION_SESSION_LIMIT.getKeyValue(),
100,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING =
Setting.intSetting(
Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT.getKeyValue(),
50,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> SESSION_INDEX_TTL_SETTING =
Setting.positiveTimeSetting(
Key.SESSION_INDEX_TTL.getKeyValue(),
Expand Down Expand Up @@ -249,18 +249,18 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_SESSION_ENABLED,
SPARK_EXECUTION_SESSION_ENABLED_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_SESSION_LIMIT,
SPARK_EXECUTION_SESSION_LIMIT_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT,
SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING,
new Updater(Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT));
register(
settingBuilder,
clusterSettings,
Expand Down Expand Up @@ -350,8 +350,8 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
.add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING)
.add(SESSION_INDEX_TTL_SETTING)
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
new FlintIndexMetadataReaderImpl(client),
client,
new SessionManager(stateStore, emrServerlessClient, pluginSettings),
new DefaultLeaseManager(pluginSettings, stateStore));
new DefaultLeaseManager(pluginSettings, stateStore),
stateStore);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
1 change: 1 addition & 0 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.execution.session.SessionModel',
'org.opensearch.sql.spark.execution.statement.StatementModel',
'org.opensearch.sql.spark.flint.FlintIndexStateModel',
// TODO: add tests for purging flint indices
'org.opensearch.sql.spark.cluster.ClusterManagerEventListener*',
'org.opensearch.sql.spark.cluster.FlintIndexRetention',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public CreateAsyncQueryResponse createAsyncQuery(
dispatchQueryResponse.getQueryId(),
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.isDropIndexQuery(),
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId()));
return new CreateAsyncQueryResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class AsyncQueryJobMetadata extends StateModel {
private final AsyncQueryId queryId;
private final String applicationId;
private final String jobId;
private final boolean isDropIndexQuery;
private final String resultIndex;
// optional sessionId.
private final String sessionId;
Expand All @@ -43,7 +42,6 @@ public AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
false,
resultIndex,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
Expand All @@ -54,14 +52,12 @@ public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
boolean isDropIndexQuery,
String resultIndex,
String sessionId) {
this(
queryId,
applicationId,
jobId,
isDropIndexQuery,
resultIndex,
sessionId,
SequenceNumbers.UNASSIGNED_SEQ_NO,
Expand All @@ -72,15 +68,13 @@ public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
boolean isDropIndexQuery,
String resultIndex,
String sessionId,
long seqNo,
long primaryTerm) {
this.queryId = queryId;
this.applicationId = applicationId;
this.jobId = jobId;
this.isDropIndexQuery = isDropIndexQuery;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.seqNo = seqNo;
Expand All @@ -106,7 +100,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("type", TYPE_JOBMETA)
.field("jobId", jobId)
.field("applicationId", applicationId)
.field("isDropIndexQuery", isDropIndexQuery)
.field("resultIndex", resultIndex)
.field("sessionId", sessionId)
.endObject();
Expand All @@ -120,7 +113,6 @@ public static AsyncQueryJobMetadata copy(
copy.getQueryId(),
copy.getApplicationId(),
copy.getJobId(),
copy.isDropIndexQuery(),
copy.getResultIndex(),
copy.getSessionId(),
seqNo,
Expand Down Expand Up @@ -176,14 +168,7 @@ public static AsyncQueryJobMetadata fromXContent(
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
isDropIndexQuery,
resultIndex,
sessionId,
seqNo,
primaryTerm);
queryId, applicationId, jobId, resultIndex, sessionId, seqNo, primaryTerm);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
public abstract class AsyncQueryHandler {

public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
if (asyncQueryJobMetadata.isDropIndexQuery()) {
return SparkQueryDispatcher.DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId())
.result();
}

JSONObject result = getResponseFromResultIndex(asyncQueryJobMetadata);
if (result.has(DATA_FIELD)) {
JSONObject items = result.getJSONObject(DATA_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

@RequiredArgsConstructor
public class BatchQueryHandler extends AsyncQueryHandler {
private final EMRServerlessClient emrServerlessClient;
private final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -60,6 +63,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));

String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query";
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
Expand All @@ -81,6 +86,6 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(
context.getQueryId(), jobId, false, dataSourceMetadata.getResultIndex(), null);
context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null);
}
}
Loading
Loading