Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JDBC datasource #1361

Closed
wants to merge 16 commits into from
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "skipped", "failed"
exceptionFormat "full"
}
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
.collect(Collectors.toList());
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(context.getFunctionProperties(),
dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments);
dataSourceService
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine().getFunctions(), functionName, arguments);
context.push();
TypeEnvironment curEnv = context.peek();
Table table = tableFunctionImplementation.applyArguments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
public enum DataSourceType {
PROMETHEUS,
OPENSEARCH,
FILESYSTEM
JDBC
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -32,6 +35,7 @@
import org.opensearch.sql.expression.system.SystemFunctions;
import org.opensearch.sql.expression.text.TextFunction;
import org.opensearch.sql.expression.window.WindowFunctions;
import org.opensearch.sql.storage.StorageEngine;

/**
* Builtin Function Repository.
Expand All @@ -40,22 +44,20 @@
*
*/
public class BuiltinFunctionRepository {
public static final String DEFAULT_NAMESPACE = "default";

private final Map<String, Map<FunctionName, FunctionResolver>> namespaceFunctionResolverMap;
private final Map<FunctionName, FunctionResolver> functionResolverMap;

/** The singleton instance. */
private static BuiltinFunctionRepository instance;

/**
* Construct a function repository with the given function registered. This is only used in test.
*
* @param namespaceFunctionResolverMap function supported
* @param functionResolverMap function supported
*/
@VisibleForTesting
BuiltinFunctionRepository(
Map<String, Map<FunctionName, FunctionResolver>> namespaceFunctionResolverMap) {
this.namespaceFunctionResolverMap = namespaceFunctionResolverMap;
BuiltinFunctionRepository(Map<FunctionName, FunctionResolver> functionResolverMap) {
this.functionResolverMap = functionResolverMap;
}

/**
Expand Down Expand Up @@ -86,106 +88,86 @@ public static synchronized BuiltinFunctionRepository getInstance() {
}

/**
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository
* under default namespace.
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository.
*
* @param resolver {@link DefaultFunctionResolver} to be registered
*/
public void register(FunctionResolver resolver) {
register(DEFAULT_NAMESPACE, resolver);
functionResolverMap.put(resolver.getFunctionName(), resolver);
}

/**
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository with
* specified namespace.
*
* @param resolver {@link DefaultFunctionResolver} to be registered
*/
public void register(String namespace, FunctionResolver resolver) {
Map<FunctionName, FunctionResolver> functionResolverMap;
if (!namespaceFunctionResolverMap.containsKey(namespace)) {
functionResolverMap = new HashMap<>();
namespaceFunctionResolverMap.put(namespace, functionResolverMap);
}
namespaceFunctionResolverMap.get(namespace).put(resolver.getFunctionName(), resolver);
}

/**
* Compile FunctionExpression under default namespace.
* Compile FunctionExpression using core function resolver.
*
*/
public FunctionImplementation compile(FunctionProperties functionProperties,
FunctionName functionName, List<Expression> expressions) {
return compile(functionProperties, DEFAULT_NAMESPACE, functionName, expressions);
return compile(functionProperties, Collections.emptyList(), functionName, expressions);
}


/**
* Compile FunctionExpression within given namespace.
* Checks for default namespace first and then tries to compile from given namespace.
* Compile FunctionExpression within {@link StorageEngine} provided {@link FunctionResolver}.
*/
public FunctionImplementation compile(FunctionProperties functionProperties,
String namespace,
Collection<FunctionResolver> dataSourceFunctionResolver,
FunctionName functionName,
List<Expression> expressions) {
List<String> namespaceList = new ArrayList<>(List.of(DEFAULT_NAMESPACE));
if (!namespace.equals(DEFAULT_NAMESPACE)) {
namespaceList.add(namespace);
}
FunctionBuilder resolvedFunctionBuilder = resolve(
namespaceList, new FunctionSignature(functionName, expressions
.stream().map(Expression::type).collect(Collectors.toList())));
FunctionBuilder resolvedFunctionBuilder =
resolve(
dataSourceFunctionResolver,
new FunctionSignature(
functionName,
expressions.stream().map(Expression::type).collect(Collectors.toList())));
return resolvedFunctionBuilder.apply(functionProperties, expressions);
}

/**
* Resolve the {@link FunctionBuilder} in
* repository under a list of namespaces.
* Returns the First FunctionBuilder found.
* So list of namespaces is also the priority of namespaces.
* Resolve the {@link FunctionBuilder} in repository under a list of namespaces. Returns the First
* FunctionBuilder found. So list of namespaces is also the priority of namespaces.
*
* @param functionSignature {@link FunctionSignature} functionsignature.
* @return Original function builder if it's a cast function or all arguments have expected types
* or otherwise wrap its arguments by cast function as needed.
* or otherwise wrap its arguments by cast function as needed.
*/
public FunctionBuilder
resolve(List<String> namespaces,
FunctionSignature functionSignature) {
FunctionName functionName = functionSignature.getFunctionName();
FunctionBuilder result = null;
for (String namespace : namespaces) {
if (namespaceFunctionResolverMap.containsKey(namespace)
&& namespaceFunctionResolverMap.get(namespace).containsKey(functionName)) {
result = getFunctionBuilder(functionSignature, functionName,
namespaceFunctionResolverMap.get(namespace));
break;
}
}
if (result == null) {
throw new ExpressionEvaluationException(
String.format("unsupported function name: %s", functionName.getFunctionName()));
} else {
return result;
}
@VisibleForTesting
public FunctionBuilder resolve(
Collection<FunctionResolver> dataSourceFunctionResolver,
FunctionSignature functionSignature) {
Map<FunctionName, FunctionResolver> dataSourceFunctionMap = dataSourceFunctionResolver.stream()
.collect(Collectors.toMap(FunctionResolver::getFunctionName, t -> t));

// first, resolve in datasource provide function resolver.
// second, resolve in builtin function resolver.
return resolve(functionSignature, dataSourceFunctionMap)
.or(() -> resolve(functionSignature, functionResolverMap))
.orElseThrow(
() ->
new ExpressionEvaluationException(
String.format(
"unsupported function name: %s", functionSignature.getFunctionName())));
}

private FunctionBuilder getFunctionBuilder(
private Optional<FunctionBuilder> resolve(
FunctionSignature functionSignature,
FunctionName functionName,
Map<FunctionName, FunctionResolver> functionResolverMap) {
Pair<FunctionSignature, FunctionBuilder> resolvedSignature =
functionResolverMap.get(functionName).resolve(functionSignature);

List<ExprType> sourceTypes = functionSignature.getParamTypeList();
List<ExprType> targetTypes = resolvedSignature.getKey().getParamTypeList();
FunctionBuilder funcBuilder = resolvedSignature.getValue();
if (isCastFunction(functionName)
|| FunctionSignature.isVarArgFunction(targetTypes)
|| sourceTypes.equals(targetTypes)) {
return funcBuilder;
FunctionName functionName = functionSignature.getFunctionName();
if (functionResolverMap.containsKey(functionName)) {
Pair<FunctionSignature, FunctionBuilder> resolvedSignature =
functionResolverMap.get(functionName).resolve(functionSignature);

List<ExprType> sourceTypes = functionSignature.getParamTypeList();
List<ExprType> targetTypes = resolvedSignature.getKey().getParamTypeList();
FunctionBuilder funcBuilder = resolvedSignature.getValue();
if (isCastFunction(functionName)
|| FunctionSignature.isVarArgFunction(targetTypes)
|| sourceTypes.equals(targetTypes)) {
return Optional.of(funcBuilder);
}
return Optional.of(castArguments(sourceTypes, targetTypes, funcBuilder));
} else {
return Optional.empty();
}
return castArguments(sourceTypes,
targetTypes, funcBuilder);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@
package org.opensearch.sql.analysis;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.analysis.symbol.SymbolTable;
Expand Down Expand Up @@ -52,6 +56,39 @@ protected StorageEngine storageEngine() {
return (dataSourceSchemaName, tableName) -> table;
}

protected StorageEngine prometheusStorageEngine() {
return new StorageEngine() {
@Override
public Collection<FunctionResolver> getFunctions() {
return Collections.singletonList(
new FunctionResolver() {

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(
FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of("query_range");
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
return Pair.of(
functionSignature,
(functionProperties, args) ->
new TestTableFunctionImplementation(functionName, args, table));
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of("query_range");
}
});
}

@Override
public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) {
return table;
}
};
}

protected Table table() {
return Optional.ofNullable(table).orElseGet(() -> new Table() {
@Override
Expand Down Expand Up @@ -109,30 +146,11 @@ protected Environment<Expression, ExprType> typeEnv() {

protected DataSourceService dataSourceService = dataSourceService();

protected Analyzer analyzer = analyzer(expressionAnalyzer(), dataSourceService, table);
protected Analyzer analyzer = analyzer(expressionAnalyzer(), dataSourceService);

protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer,
DataSourceService dataSourceService,
Table table) {
DataSourceService dataSourceService) {
BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance();
functionRepository.register("prometheus", new FunctionResolver() {

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(
FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of("query_range");
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
return Pair.of(functionSignature,
(functionProperties, args) -> new TestTableFunctionImplementation(functionName, args,
table));
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of("query_range");
}
});
return new Analyzer(expressionAnalyzer, dataSourceService, functionRepository);
}

Expand All @@ -158,19 +176,24 @@ protected LogicalPlan analyze(UnresolvedPlan unresolvedPlan) {

private class DefaultDataSourceService implements DataSourceService {

private StorageEngine storageEngine = storageEngine();
private final DataSource dataSource
= new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine);
private final DataSource opensearchDataSource = new DataSource(DEFAULT_DATASOURCE_NAME,
DataSourceType.OPENSEARCH, storageEngine());
private final DataSource prometheusDataSource
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
= new DataSource("prometheus", DataSourceType.PROMETHEUS, prometheusStorageEngine());


@Override
public Set<DataSource> getDataSources() {
return ImmutableSet.of(dataSource);
return ImmutableSet.of(opensearchDataSource, prometheusDataSource);
}

@Override
public DataSource getDataSource(String dataSourceName) {
return dataSource;
if ("prometheus".equals(dataSourceName)) {
return prometheusDataSource;
} else {
return opensearchDataSource;
}
}

@Override
Expand Down
Loading