Skip to content

Commit

Permalink
Create datasource API
Browse files Browse the repository at this point in the history
Signed-off-by: vamsi-amazon <reddyvam@amazon.com>
  • Loading branch information
vamsimanohar committed Mar 11, 2023
1 parent 11d0d6d commit eb77575
Show file tree
Hide file tree
Showing 23 changed files with 702 additions and 129 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ buildscript {
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
prometheus_binary_version = "2.37.2"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
if (buildVersionQualifier) {
opensearch_build += "-${buildVersionQualifier}"
}
Expand Down
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies {
api "org.antlr:antlr4-runtime:4.7.1"
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
api group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.17.1'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1'
Expand Down
4 changes: 3 additions & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ repositories {

dependencies {
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240'
api group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api group: 'com.google.code.gson', name: 'gson', version: '2.8.9'
implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.0'
api project(':common')

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public interface DataSourceService {
/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
* @param metadatas list of {@link DataSourceMetadata}.
* @param metadata {@link DataSourceMetadata}.
*/
void createDataSource(DataSourceMetadata... metadatas);
void createDataSource(DataSourceMetadata metadata);

/**
* Updates {@link DataSource} corresponding to dataSourceMetadata.
Expand All @@ -52,15 +52,4 @@ public interface DataSourceService {
* @param dataSourceName name of the {@link DataSource}.
*/
void deleteDataSource(String dataSourceName);

/**
* This method is to bootstrap
* datasources during the startup of the plugin.
*/
void bootstrapDataSources();

/**
* remove all the registered {@link DataSource}.
*/
void clear();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,23 @@

package org.opensearch.sql.datasource;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand All @@ -30,48 +38,63 @@
*/
public class DataSourceServiceImpl implements DataSourceService {

private static final Logger LOG = LogManager.getLogger();
private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";

private final ConcurrentHashMap<String, DataSource> dataSourceMap;
private final ConcurrentHashMap<DataSourceMetadata, DataSource> dataSourceMap;

private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;

private final DataSourceMetadataStorage dataSourceMetadataStorage;

/**
* Construct from the set of {@link DataSourceFactory} at bootstrap time.
*/
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories,
DataSourceMetadataStorage dataSourceMetadataStorage) {
dataSourceFactoryMap =
dataSourceFactories.stream()
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
dataSourceMap = new ConcurrentHashMap<>();
this.dataSourceMetadataStorage = dataSourceMetadataStorage;
}

@Override
public Set<DataSourceMetadata> getDataSourceMetadataSet() {
return dataSourceMap.values().stream()
.map(dataSource
-> new DataSourceMetadata(dataSource.getName(),
dataSource.getConnectorType(), ImmutableMap.of()))
.collect(Collectors.toSet());
List<DataSourceMetadata> dataSourceMetadataList =
this.dataSourceMetadataStorage.getDataSourceMetadata();
dataSourceMetadataList.add(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
return Set.copyOf(dataSourceMetadataList);
}


@Override
public DataSource getDataSource(String dataSourceName) {
if (!dataSourceMap.containsKey(dataSourceName)) {
throw new IllegalArgumentException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
Optional<DataSourceMetadata> dataSourceMetadataOptional
= this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
return dataSourceMap.get(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
} else {
throw new IllegalArgumentException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
}
} else if (!dataSourceMap.containsKey(dataSourceMetadataOptional.get())) {
DataSourceMetadata metadata = dataSourceMetadataOptional.get();
clearDataSource(metadata);
dataSourceMap.put(metadata,
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
}
return dataSourceMap.get(dataSourceName);
return dataSourceMap.get(dataSourceMetadataOptional.get());
}

@Override
public void createDataSource(DataSourceMetadata... metadatas) {
for (DataSourceMetadata metadata : metadatas) {
validateDataSourceMetaData(metadata);
dataSourceMap.put(
metadata.getName(),
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
public void createDataSource(DataSourceMetadata metadata) {
if (!metadata.getName().equals(DEFAULT_DATASOURCE_NAME)) {
this.dataSourceMetadataStorage.createDataSourceMetadata(metadata);
}
dataSourceMap.put(metadata,
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
}

@Override
Expand All @@ -84,37 +107,8 @@ public void deleteDataSource(String dataSourceName) {
throw new UnsupportedOperationException("will be supported in future");
}

@Override
public void bootstrapDataSources() {
throw new UnsupportedOperationException("will be supported in future");
}

@Override
public void clear() {
dataSourceMap.clear();
}

/**
* This can be moved to a different validator class when we introduce more connectors.
*
* @param metadata {@link DataSourceMetadata}.
*/
private void validateDataSourceMetaData(DataSourceMetadata metadata) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(metadata.getName()),
"Missing Name Field from a DataSource. Name is a required parameter.");
Preconditions.checkArgument(
!dataSourceMap.containsKey(metadata.getName()),
StringUtils.format(
"Datasource name should be unique, Duplicate datasource found %s.",
metadata.getName()));
Preconditions.checkArgument(
metadata.getName().matches(DATASOURCE_NAME_REGEX),
StringUtils.format(
"DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
metadata.getName()));
Preconditions.checkArgument(
!Objects.isNull(metadata.getProperties()),
"Missing properties field in catalog configuration. Properties are required parameters.");
private void clearDataSource(DataSourceMetadata dataSourceMetadata) {
dataSourceMap.entrySet()
.removeIf(entry -> entry.getKey().getName().equals(dataSourceMetadata.getName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
Expand All @@ -36,6 +37,9 @@ public class DataSourceMetadata {
@JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
private DataSourceType connector;

@JsonProperty(required = true)
private List<String> allowedRoles;

@JsonProperty(required = true)
private Map<String, String> properties;

Expand All @@ -45,6 +49,6 @@ public class DataSourceMetadata {
*/
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
return new DataSourceMetadata(DEFAULT_DATASOURCE_NAME,
DataSourceType.OPENSEARCH, ImmutableMap.of());
DataSourceType.OPENSEARCH, null, ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.encryptor;

public interface CredentialInfoEncryptor {

String encrypt(String plainText);

String decrypt(String encryptedText);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.opensearch.sql.encryptor;/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/


import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CommitmentPolicy;
import com.amazonaws.encryptionsdk.CryptoResult;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.spec.SecretKeySpec;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class CredentialInfoEncryptorImpl implements CredentialInfoEncryptor {

private final String masterKey;

@Override
public String encrypt(String plainText) {

final AwsCrypto crypto = AwsCrypto.builder()
.withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt)
.build();

JceMasterKey jceMasterKey
= JceMasterKey.getInstance(new SecretKeySpec(masterKey.getBytes(), "AES"), "Custom", "",
"AES/GCM/NoPadding");

final CryptoResult<byte[], JceMasterKey> encryptResult = crypto.encryptData(jceMasterKey,
plainText.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(encryptResult.getResult());
}

@Override
public String decrypt(String encryptedText) {
final AwsCrypto crypto = AwsCrypto.builder()
.withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt)
.build();

JceMasterKey jceMasterKey
= JceMasterKey.getInstance(new SecretKeySpec(masterKey.getBytes(), "AES"), "Custom", "",
"AES/GCM/NoPadding");

final CryptoResult<byte[], JceMasterKey> decryptedResult
= crypto.decryptData(jceMasterKey, Base64.getDecoder().decode(encryptedText));
return new String(decryptedResult.getResult());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ class DataSourceServiceImplTest {

static final String NAME = "opensearch";

@Mock private DataSourceFactory dataSourceFactory;
@Mock
private DataSourceFactory dataSourceFactory;

@Mock
private StorageEngine storageEngine;

@Mock private StorageEngine storageEngine;

private DataSourceService dataSourceService;

Expand All @@ -56,7 +59,7 @@ public void setup() {
{
add(dataSourceFactory);
}
});
}, null);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void init() {
DataSourceService dataSourceService = new DataSourceServiceImpl(
new ImmutableSet.Builder<DataSourceFactory>()
.add(new OpenSearchDataSourceFactory(client, defaultSettings()))
.build());
.build(), null);
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());

ModulesBuilder modules = new ModulesBuilder();
Expand Down
2 changes: 1 addition & 1 deletion legacy/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ dependencies {
}
implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
implementation group: 'org.json', name: 'json', version:'20180813'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
// add geo module as dependency. https://github.com/opensearch-project/OpenSearch/pull/4180/.
implementation group: 'org.opensearch.plugin', name: 'geo', version: "${opensearch_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public enum MetricName {
PPL_REQ_TOTAL("ppl_request_total"),
PPL_REQ_COUNT_TOTAL("ppl_request_count"),
PPL_FAILED_REQ_COUNT_SYS("ppl_failed_request_count_syserr"),
PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr");
PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr"),
DATASOURCE_REQ_COUNT("datasource_request_count");

private String name;

Expand Down
3 changes: 3 additions & 0 deletions plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ dependencies {
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api group: 'commons-io', name: 'commons-io', version: '2.8.0'
implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}"

api project(":ppl")
api project(':legacy')
Expand Down
Loading

0 comments on commit eb77575

Please sign in to comment.