Skip to content

Commit

Permalink
merge conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo committed Oct 3, 2023
2 parents 06f7a74 + 79cac7d commit ab3096a
Show file tree
Hide file tree
Showing 30 changed files with 765 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String explain() {
@Override
public void open() {
List<ExprValue> exprValues = new ArrayList<>();
Set<DataSourceMetadata> dataSourceMetadataSet = dataSourceService.getDataSourceMetadata(true);
Set<DataSourceMetadata> dataSourceMetadataSet = dataSourceService.getDataSourceMetadata(false);
for (DataSourceMetadata dataSourceMetadata : dataSourceMetadataSet) {
exprValues.add(
new ExprTupleValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void testIterator() {
Collections.emptyList(),
ImmutableMap.of()))
.collect(Collectors.toSet());
when(dataSourceService.getDataSourceMetadata(true)).thenReturn(dataSourceMetadata);
when(dataSourceService.getDataSourceMetadata(false)).thenReturn(dataSourceMetadata);

assertFalse(dataSourceTableScan.hasNext());
dataSourceTableScan.open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Map;

public enum AuthenticationType {
NOAUTH("noauth"),
BASICAUTH("basicauth"),
AWSSIGV4AUTH("awssigv4");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.datasources.utils.DatasourceValidationUtils;
import org.opensearch.sql.storage.DataSourceFactory;

Expand All @@ -20,9 +21,14 @@ public class GlueDataSourceFactory implements DataSourceFactory {
// Glue configuration properties
public static final String GLUE_AUTH_TYPE = "glue.auth.type";
public static final String GLUE_ROLE_ARN = "glue.auth.role_arn";
public static final String FLINT_URI = "glue.indexstore.opensearch.uri";
public static final String FLINT_AUTH = "glue.indexstore.opensearch.auth";
public static final String FLINT_REGION = "glue.indexstore.opensearch.region";
public static final String GLUE_INDEX_STORE_OPENSEARCH_URI = "glue.indexstore.opensearch.uri";
public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH = "glue.indexstore.opensearch.auth";
public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME =
"glue.indexstore.opensearch.auth.username";
public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD =
"glue.indexstore.opensearch.auth.password";
public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION =
"glue.indexstore.opensearch.region";

@Override
public DataSourceType getDataSourceType() {
Expand All @@ -46,11 +52,28 @@ public DataSource createDataSource(DataSourceMetadata metadata) {

private void validateGlueDataSourceConfiguration(Map<String, String> dataSourceMetadataConfig)
throws URISyntaxException, UnknownHostException {

DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig,
Set.of(GLUE_AUTH_TYPE, GLUE_ROLE_ARN, FLINT_URI, FLINT_REGION, FLINT_AUTH));
Set.of(
GLUE_AUTH_TYPE,
GLUE_ROLE_ARN,
GLUE_INDEX_STORE_OPENSEARCH_URI,
GLUE_INDEX_STORE_OPENSEARCH_AUTH));
AuthenticationType authenticationType =
AuthenticationType.get(dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_AUTH));
if (AuthenticationType.BASICAUTH.equals(authenticationType)) {
DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig,
Set.of(
GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME,
GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD));
} else if (AuthenticationType.AWSSIGV4AUTH.equals(authenticationType)) {
DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig, Set.of(GLUE_INDEX_STORE_OPENSEARCH_REGION));
}
DatasourceValidationUtils.validateHost(
dataSourceMetadataConfig.get(FLINT_URI),
dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_URI),
pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.datasources.encryptor.Encryptor;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasources.service.DataSourceMetadataStorage;
Expand Down Expand Up @@ -93,7 +92,9 @@ public Optional<DataSourceMetadata> getDataSourceMetadata(String datasourceName)
createDataSourcesIndex();
return Optional.empty();
}
return searchInDataSourcesIndex(QueryBuilders.termQuery("name", datasourceName)).stream()
// todo, in case docId == datasourceName, could read doc directly.
return searchInDataSourcesIndex(QueryBuilders.termQuery("name.keyword", datasourceName))
.stream()
.findFirst()
.map(x -> this.encryptDecryptAuthenticationData(x, false));
}
Expand Down Expand Up @@ -252,26 +253,13 @@ private List<DataSourceMetadata> searchInDataSourcesIndex(QueryBuilder query) {
}
}

@SuppressWarnings("missingswitchdefault")
// Encrypt and Decrypt irrespective of auth type.If properties name ends in username, password,
// secret_key and access_key.
private DataSourceMetadata encryptDecryptAuthenticationData(
DataSourceMetadata dataSourceMetadata, Boolean isEncryption) {
Map<String, String> propertiesMap = dataSourceMetadata.getProperties();
Optional<AuthenticationType> authTypeOptional =
propertiesMap.keySet().stream()
.filter(s -> s.endsWith("auth.type"))
.findFirst()
.map(propertiesMap::get)
.map(AuthenticationType::get);
if (authTypeOptional.isPresent()) {
switch (authTypeOptional.get()) {
case BASICAUTH:
handleBasicAuthPropertiesEncryptionDecryption(propertiesMap, isEncryption);
break;
case AWSSIGV4AUTH:
handleSigV4PropertiesEncryptionDecryption(propertiesMap, isEncryption);
break;
}
}
handleBasicAuthPropertiesEncryptionDecryption(propertiesMap, isEncryption);
handleSigV4PropertiesEncryptionDecryption(propertiesMap, isEncryption);
return dataSourceMetadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.opensearch.sql.datasources.transport;

import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
Expand All @@ -17,6 +19,7 @@
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -56,9 +59,14 @@ protected void doExecute(
try {
DataSourceMetadata dataSourceMetadata = request.getDataSourceMetadata();
dataSourceService.createDataSource(dataSourceMetadata);
actionListener.onResponse(
new CreateDataSourceActionResponse(
"Created DataSource with name " + dataSourceMetadata.getName()));
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Created DataSource with name " + dataSourceMetadata.getName());
actionListener.onResponse(new CreateDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.opensearch.sql.datasources.transport;

import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
Expand All @@ -16,6 +18,7 @@
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -55,9 +58,14 @@ protected void doExecute(
ActionListener<UpdateDataSourceActionResponse> actionListener) {
try {
dataSourceService.updateDataSource(request.getDataSourceMetadata());
actionListener.onResponse(
new UpdateDataSourceActionResponse(
"Updated DataSource with name " + request.getDataSourceMetadata().getName()));
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Updated DataSource with name " + request.getDataSourceMetadata().getName());
actionListener.onResponse(new UpdateDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void testCreateGLueDatSource() {
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
Expand All @@ -59,6 +59,94 @@ void testCreateGLueDatSource() {
"Glue storage engine is not supported.", unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithBasicAuthForIndexStore() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(Collections.emptyList());
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "basicauth");
properties.put("glue.indexstore.opensearch.auth.username", "username");
properties.put("glue.indexstore.opensearch.auth.password", "password");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
DataSource dataSource = glueDatasourceFactory.createDataSource(metadata);
Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType());
UnsupportedOperationException unsupportedOperationException =
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
dataSource
.getStorageEngine()
.getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs"));
Assertions.assertEquals(
"Glue storage engine is not supported.", unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithAwsSigV4AuthForIndexStore() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(Collections.emptyList());
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "awssigv4");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
DataSource dataSource = glueDatasourceFactory.createDataSource(metadata);
Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType());
UnsupportedOperationException unsupportedOperationException =
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
dataSource
.getStorageEngine()
.getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs"));
Assertions.assertEquals(
"Glue storage engine is not supported.", unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithBasicAuthForIndexStoreAndMissingFields() {
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "basicauth");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Missing [glue.indexstore.opensearch.auth.password,"
+ " glue.indexstore.opensearch.auth.username] fields in the connector properties.",
illegalArgumentException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithInvalidFlintHost() {
Expand All @@ -71,7 +159,7 @@ void testCreateGLueDatSourceWithInvalidFlintHost() {
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
Expand Down Expand Up @@ -100,7 +188,7 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() {
properties.put(
"glue.indexstore.opensearch.uri",
"http://dummyprometheus.com:9090? paramt::localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public void testDoExecute() {
.onResponse(createDataSourceActionResponseArgumentCaptor.capture());
CreateDataSourceActionResponse createDataSourceActionResponse =
createDataSourceActionResponseArgumentCaptor.getValue();
Assertions.assertEquals(
"Created DataSource with name test_datasource", createDataSourceActionResponse.getResult());
String responseAsJson = "\"Created DataSource with name test_datasource\"";
Assertions.assertEquals(responseAsJson, createDataSourceActionResponse.getResult());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public void testDoExecute() {
.onResponse(updateDataSourceActionResponseArgumentCaptor.capture());
UpdateDataSourceActionResponse updateDataSourceActionResponse =
updateDataSourceActionResponseArgumentCaptor.getValue();
Assertions.assertEquals(
"Updated DataSource with name test_datasource", updateDataSourceActionResponse.getResult());
String responseAsJson = "\"Updated DataSource with name test_datasource\"";

Assertions.assertEquals(responseAsJson, updateDataSourceActionResponse.getResult());
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Async Query Interface Endpoints
Introduction
============

For supporting `S3Glue <../ppl/admin/connector/s3glue_connector.rst>`_ and Cloudwatch datasources connectors, we have introduced a new execution engine on top of Spark.
For supporting `S3Glue <../ppl/admin/connectors/s3glue_connector.rst>`_ and Cloudwatch datasources connectors, we have introduced a new execution engine on top of Spark.
All the queries to be executed on spark execution engine can only be submitted via Async Query APIs. Below sections will list all the new APIs introduced.


Expand Down Expand Up @@ -45,6 +45,7 @@ Sample Request::
curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'
Expand Down
Loading

0 comments on commit ab3096a

Please sign in to comment.