Skip to content

Commit

Permalink
add concurrent limit on datasource and sessions (#2390) (#2395)
Browse files Browse the repository at this point in the history
* add concurrent limit on datasource and sessions



* fix ut coverage



---------


(cherry picked from commit d3ce049)

Signed-off-by: Peng Huo <penghuo@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent dd48b9b commit deb3ccf
Show file tree
Hide file tree
Showing 23 changed files with 649 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum Key {
QUERY_SIZE_LIMIT("plugins.query.size_limit"),
ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"),
DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"),
DATASOURCES_LIMIT("plugins.query.datasources.limit"),

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
Expand Down
3 changes: 2 additions & 1 deletion datasources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ repositories {
dependencies {
implementation project(':core')
implementation project(':protocol')
implementation project(':opensearch')
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.opensearch', name: 'opensearch-x-content', version: "${opensearch_version}"
implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}"
Expand All @@ -35,7 +36,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "skipped", "failed"
exceptionFormat "full"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.opensearch.sql.datasources.transport;

import static org.opensearch.sql.common.setting.Settings.Key.DATASOURCES_LIMIT;
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;

import org.opensearch.action.ActionType;
Expand All @@ -30,6 +31,7 @@ public class TransportCreateDataSourceAction
new ActionType<>(NAME, CreateDataSourceActionResponse::new);

private DataSourceService dataSourceService;
private org.opensearch.sql.opensearch.setting.OpenSearchSettings settings;

/**
* TransportCreateDataSourceAction action for creating datasource.
Expand All @@ -42,33 +44,44 @@ public class TransportCreateDataSourceAction
public TransportCreateDataSourceAction(
TransportService transportService,
ActionFilters actionFilters,
DataSourceServiceImpl dataSourceService) {
DataSourceServiceImpl dataSourceService,
org.opensearch.sql.opensearch.setting.OpenSearchSettings settings) {
super(
TransportCreateDataSourceAction.NAME,
transportService,
actionFilters,
CreateDataSourceActionRequest::new);
this.dataSourceService = dataSourceService;
this.settings = settings;
}

@Override
protected void doExecute(
Task task,
CreateDataSourceActionRequest request,
ActionListener<CreateDataSourceActionResponse> actionListener) {
try {
DataSourceMetadata dataSourceMetadata = request.getDataSourceMetadata();
dataSourceService.createDataSource(dataSourceMetadata);
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Created DataSource with name " + dataSourceMetadata.getName());
actionListener.onResponse(new CreateDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
int dataSourceLimit = settings.getSettingValue(DATASOURCES_LIMIT);
if (dataSourceService.getDataSourceMetadata(false).size() >= dataSourceLimit) {
actionListener.onFailure(
new IllegalStateException(
String.format(
"domain concurrent datasources can not" + " exceed %d", dataSourceLimit)));
} else {
try {

DataSourceMetadata dataSourceMetadata = request.getDataSourceMetadata();
dataSourceService.createDataSource(dataSourceMetadata);
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Created DataSource with name " + dataSourceMetadata.getName());
actionListener.onResponse(new CreateDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package org.opensearch.sql.datasources.transport;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.common.setting.Settings.Key.DATASOURCES_LIMIT;

import java.util.HashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
Expand All @@ -21,6 +26,7 @@
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -29,9 +35,13 @@ public class TransportCreateDataSourceActionTest {

@Mock private TransportService transportService;
@Mock private TransportCreateDataSourceAction action;
@Mock private DataSourceServiceImpl dataSourceService;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private DataSourceServiceImpl dataSourceService;

@Mock private Task task;
@Mock private ActionListener<CreateDataSourceActionResponse> actionListener;
@Mock private OpenSearchSettings settings;

@Captor
private ArgumentCaptor<CreateDataSourceActionResponse>
Expand All @@ -43,7 +53,9 @@ public class TransportCreateDataSourceActionTest {
public void setUp() {
action =
new TransportCreateDataSourceAction(
transportService, new ActionFilters(new HashSet<>()), dataSourceService);
transportService, new ActionFilters(new HashSet<>()), dataSourceService, settings);
when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1);
when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(20);
}

@Test
Expand Down Expand Up @@ -79,4 +91,30 @@ public void testDoExecuteWithException() {
Assertions.assertTrue(exception instanceof RuntimeException);
Assertions.assertEquals("Error", exception.getMessage());
}

@Test
public void testDataSourcesLimit() {
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName("test_datasource");
dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS);
CreateDataSourceActionRequest request = new CreateDataSourceActionRequest(dataSourceMetadata);
when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1);
when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(1);

action.doExecute(
task,
request,
new ActionListener<CreateDataSourceActionResponse>() {
@Override
public void onResponse(CreateDataSourceActionResponse createDataSourceActionResponse) {
fail();
}

@Override
public void onFailure(Exception e) {
assertEquals("domain concurrent datasources can not exceed 1", e.getMessage());
}
});
verify(dataSourceService, times(0)).createDataSource(dataSourceMetadata);
}
}
37 changes: 35 additions & 2 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,12 @@ SQL query::
}

plugins.query.executionengine.spark.session.limit
===================================================
==================================================

Description
-----------

Each datasource can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.
Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.

1. The default value is 100.
2. This setting is node scope.
Expand Down Expand Up @@ -383,3 +383,36 @@ SQL query::
}
}


plugins.query.datasources.limit
===============================

Description
-----------

Each cluster can have maximum 20 datasources. You can increase limit by this setting.

1. The default value is 20.
2. This setting is node scope.
3. This setting can be updated dynamically.

You can update the setting with a new value like this.

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.datasources.limit":25}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"datasources": {
"limit": "25"
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -34,6 +35,11 @@

public class DataSourceAPIsIT extends PPLIntegTestCase {

@After
public void cleanUp() throws IOException {
wipeAllClusterSettings();
}

@AfterClass
protected static void deleteDataSourcesCreated() throws IOException {
Request deleteRequest = getDeleteDataSourceRequest("create_prometheus");
Expand All @@ -51,6 +57,10 @@ protected static void deleteDataSourcesCreated() throws IOException {
deleteRequest = getDeleteDataSourceRequest("Create_Prometheus");
deleteResponse = client().performRequest(deleteRequest);
Assert.assertEquals(204, deleteResponse.getStatusLine().getStatusCode());

deleteRequest = getDeleteDataSourceRequest("duplicate_prometheus");
deleteResponse = client().performRequest(deleteRequest);
Assert.assertEquals(204, deleteResponse.getStatusLine().getStatusCode());
}

@SneakyThrows
Expand Down Expand Up @@ -283,4 +293,45 @@ public void issue2196() {
Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password"));
Assert.assertEquals("Prometheus Creation for Integ test", dataSourceMetadata.getDescription());
}

@Test
public void datasourceLimitTest() throws InterruptedException, IOException {
DataSourceMetadata d1 = mockDataSourceMetadata("duplicate_prometheus");
Request createRequest = getCreateDataSourceRequest(d1);
Response response = client().performRequest(createRequest);
Assert.assertEquals(201, response.getStatusLine().getStatusCode());
// Datasource is not immediately created. so introducing a sleep of 2s.
Thread.sleep(2000);

updateClusterSettings(new ClusterSetting(TRANSIENT, "plugins.query.datasources.limit", "1"));

DataSourceMetadata d2 = mockDataSourceMetadata("d2");
ResponseException exception =
Assert.assertThrows(
ResponseException.class, () -> client().performRequest(getCreateDataSourceRequest(d2)));
Assert.assertEquals(400, exception.getResponse().getStatusLine().getStatusCode());
String prometheusGetResponseString = getResponseBody(exception.getResponse());
JsonObject errorMessage = new Gson().fromJson(prometheusGetResponseString, JsonObject.class);
Assert.assertEquals(
"domain concurrent datasources can not exceed 1",
errorMessage.get("error").getAsJsonObject().get("details").getAsString());
}

public DataSourceMetadata mockDataSourceMetadata(String name) {
return new DataSourceMetadata(
name,
"Prometheus Creation for Integ test",
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of(
"prometheus.uri",
"https://localhost:9090",
"prometheus.auth.type",
"basicauth",
"prometheus.auth.username",
"username",
"prometheus.auth.password",
"password"),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> DATASOURCES_LIMIT_SETTING =
Setting.intSetting(
Key.DATASOURCES_LIMIT.getKeyValue(),
20,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -272,6 +279,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.AUTO_INDEX_MANAGEMENT_ENABLED,
AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
new Updater(Key.AUTO_INDEX_MANAGEMENT_ENABLED));
register(
settingBuilder,
clusterSettings,
Key.DATASOURCES_LIMIT,
DATASOURCES_LIMIT_SETTING,
new Updater(Key.DATASOURCES_LIMIT));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -342,6 +355,7 @@ public static List<Setting<?>> pluginSettings() {
.add(SESSION_INDEX_TTL_SETTING)
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.add(DATASOURCES_LIMIT_SETTING)
.build();
}

Expand Down
6 changes: 4 additions & 2 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
Expand Down Expand Up @@ -258,7 +259,7 @@ public Collection<Object> createComponents(
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
return ImmutableList.of(
dataSourceService, asyncQueryExecutorService, clusterManagerEventListener);
dataSourceService, asyncQueryExecutorService, clusterManagerEventListener, pluginSettings);
}

@Override
Expand Down Expand Up @@ -333,7 +334,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client),
client,
new SessionManager(stateStore, emrServerlessClient, pluginSettings));
new SessionManager(stateStore, emrServerlessClient, pluginSettings),
new DefaultLeaseManager(pluginSettings, stateStore));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import com.amazonaws.services.emrserverless.model.JobRunState;
import org.json.JSONObject;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;

/** Process async query request. */
public abstract class AsyncQueryHandler {
Expand Down Expand Up @@ -45,5 +48,8 @@ protected abstract JSONObject getResponseFromResultIndex(
protected abstract JSONObject getResponseFromExecutor(
AsyncQueryJobMetadata asyncQueryJobMetadata);

abstract String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata);
public abstract String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata);

public abstract DispatchQueryResponse submit(
DispatchQueryRequest request, DispatchQueryContext context);
}
Loading

0 comments on commit deb3ccf

Please sign in to comment.