Skip to content

Commit

Permalink
Move DataSourceServiceImpl to core module (#1084)
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo authored Nov 18, 2022
1 parent fef20f8 commit e280866
Show file tree
Hide file tree
Showing 24 changed files with 686 additions and 448 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,37 @@

import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.datasource.model.DataSourceMetadata;

/**
* DataSource Service manages datasources.
* DataSource Service manage {@link DataSource}.
*/
public interface DataSourceService {

/**
* Returns all datasource objects.
* Returns all DataSource objects.
*
* @return DataSource datasources.
* @return set of {@link DataSource}.
*/
Set<DataSource> getDataSources();

/**
* Returns DataSource with corresponding to the datasource name.
* Returns {@link DataSource} with corresponding to the DataSource name.
*
* @param dataSourceName Name of the datasource.
* @return DataSource datasource.
* @param dataSourceName Name of the {@link DataSource}.
* @return {@link DataSource}.
*/
DataSource getDataSource(String dataSourceName);

/**
* Default opensearch engine is not defined in datasources config.
* So the registration of default datasource happens separately.
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
* @param storageEngine StorageEngine.
* @param metadatas list of {@link DataSourceMetadata}.
*/
void registerDefaultOpenSearchDataSource(StorageEngine storageEngine);
void addDataSource(DataSourceMetadata... metadatas);

/**
* remove all the registered {@link DataSource}.
*/
void clear();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.storage.DataSourceFactory;

/**
* Default implementation of {@link DataSourceService}. It is per-jvm single instance.
*
* <p>{@link DataSourceService} is constructed by the list of {@link DataSourceFactory} at service
* bootstrap time. The set of {@link DataSourceFactory} is immutable. Client could add {@link
* DataSource} defined by {@link DataSourceMetadata} at any time. {@link DataSourceService} use
* {@link DataSourceFactory} to create {@link DataSource}.
*/
public class DataSourceServiceImpl implements DataSourceService {

private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";

private final ConcurrentHashMap<String, DataSource> dataSourceMap;

private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;

/**
* Construct from the set of {@link DataSourceFactory} at bootstrap time.
*/
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
dataSourceFactoryMap =
dataSourceFactories.stream()
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
dataSourceMap = new ConcurrentHashMap<>();
}

@Override
public Set<DataSource> getDataSources() {
return Set.copyOf(dataSourceMap.values());
}

@Override
public DataSource getDataSource(String dataSourceName) {
if (!dataSourceMap.containsKey(dataSourceName)) {
throw new IllegalArgumentException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
}
return dataSourceMap.get(dataSourceName);
}

@Override
public void addDataSource(DataSourceMetadata... metadatas) {
for (DataSourceMetadata metadata : metadatas) {
validateDataSourceMetaData(metadata);
dataSourceMap.put(
metadata.getName(),
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
}
}

@Override
public void clear() {
dataSourceMap.clear();
}

/**
* This can be moved to a different validator class when we introduce more connectors.
*
* @param metadata {@link DataSourceMetadata}.
*/
private void validateDataSourceMetaData(DataSourceMetadata metadata) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(metadata.getName()),
"Missing Name Field from a DataSource. Name is a required parameter.");
Preconditions.checkArgument(
!dataSourceMap.containsKey(metadata.getName()),
StringUtils.format(
"Datasource name should be unique, Duplicate datasource found %s.",
metadata.getName()));
Preconditions.checkArgument(
metadata.getName().matches(DATASOURCE_NAME_REGEX),
StringUtils.format(
"DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
metadata.getName()));
Preconditions.checkArgument(
!Objects.isNull(metadata.getProperties()),
"Missing properties field in catalog configuration. Properties are required parameters.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.storage.StorageEngine;

/**
* Each user configured datasource mapping to one instance of DataSource per JVM.
*/
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode
public class DataSource {

private final String name;

private final ConnectorType connectorType;
private final DataSourceType connectorType;

@EqualsAndHashCode.Exclude
private final StorageEngine storageEngine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,44 @@

package org.opensearch.sql.datasource.model;


import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.sql.datasource.DataSourceService;

@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@EqualsAndHashCode
public class DataSourceMetadata {

@JsonProperty(required = true)
private String name;

@JsonProperty(required = true)
@JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
private ConnectorType connector;
private DataSourceType connector;

@JsonProperty(required = true)
private Map<String, String> properties;

/**
* Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch
* {@link DataSource} to {@link DataSourceService}.
*/
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName(DEFAULT_DATASOURCE_NAME);
dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH);
dataSourceMetadata.setProperties(ImmutableMap.of());
return dataSourceMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@

package org.opensearch.sql.datasource.model;

public enum ConnectorType {
public enum DataSourceType {
PROMETHEUS,OPENSEARCH
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.storage;

import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

/**
* {@link DataSourceFactory} is used to create {@link DataSource} from {@link DataSourceMetadata}.
* Each data source define {@link DataSourceFactory} and register to {@link DataSourceService}.
* {@link DataSourceFactory} is one instance per JVM . Each {@link DataSourceType} mapping to one
* {@link DataSourceFactory}.
*/
public interface DataSourceFactory {
/**
* Get {@link DataSourceType}.
*/
DataSourceType getDataSourceType();

/**
* Create {@link DataSource}.
*/
DataSource createDataSource(DataSourceMetadata metadata);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.opensearch.sql.config.TestConfig;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.ConnectorType;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.env.Environment;
Expand Down Expand Up @@ -144,9 +144,7 @@ protected Environment<Expression, ExprType> typeEnv() {
@Bean
protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer,
DataSourceService dataSourceService,
StorageEngine storageEngine,
Table table) {
dataSourceService.registerDefaultOpenSearchDataSource(storageEngine);
BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance();
functionRepository.register("prometheus", new FunctionResolver() {

Expand Down Expand Up @@ -195,7 +193,7 @@ private class DefaultDataSourceService implements DataSourceService {

private StorageEngine storageEngine = storageEngine();
private final DataSource dataSource
= new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine);
= new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine);


@Override
Expand All @@ -209,8 +207,13 @@ public DataSource getDataSource(String dataSourceName) {
}

@Override
public void registerDefaultOpenSearchDataSource(StorageEngine storageEngine) {
this.storageEngine = storageEngine;
public void addDataSource(DataSourceMetadata... metadatas) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}
}

Expand Down
Loading

0 comments on commit e280866

Please sign in to comment.