Add spark connector #1780

Merged · 19 commits · Jul 5, 2023
core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java
@@ -8,8 +8,7 @@
 public enum DataSourceType {
   PROMETHEUS("prometheus"),
   OPENSEARCH("opensearch"),
-  JDBC("jdbc");
-
+  JDBC("jdbc"),
+  SPARK("spark");
   private String text;

   DataSourceType(String text) {
plugin/build.gradle (1 addition, 0 deletions)
@@ -126,6 +126,7 @@ dependencies {
     api project(':opensearch')
     api project(':prometheus')
     api project(':datasources')
+    api project(':spark')

     testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.12.13'
     testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -81,6 +81,7 @@
 import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
 import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
 import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
+import org.opensearch.sql.spark.storage.SparkStorageFactory;
 import org.opensearch.sql.storage.DataSourceFactory;
 import org.opensearch.threadpool.ExecutorBuilder;
 import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -221,6 +222,7 @@ private DataSourceServiceImpl createDataSourceService() {
         .add(new OpenSearchDataSourceFactory(
             new OpenSearchNodeClient(this.client), pluginSettings))
         .add(new PrometheusStorageFactory(pluginSettings))
+        .add(new SparkStorageFactory(this.client, pluginSettings))
         .build(),
     dataSourceMetadataStorage,
     dataSourceUserAuthorizationHelper);
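For orientation, a hedged sketch of what this registration implies (names are illustrative; this assumes DataSourceServiceImpl keys its factories by data-source type, as the builder pattern above suggests):

// Hypothetical, simplified lookup inside the datasource service: a datasource
// whose connector resolves to DataSourceType.SPARK is served by SparkStorageFactory.
DataSourceFactory factory = dataSourceFactories.get(DataSourceType.SPARK);
DataSource sparkDataSource = factory.createDataSource(dataSourceMetadata);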
settings.gradle (3 additions, 1 deletion)
@@ -19,4 +19,6 @@ include 'legacy'
 include 'sql'
 include 'prometheus'
 include 'benchmarks'
-include 'datasources'
+include 'datasources'
+include 'spark'
spark/build.gradle (new file, 73 additions)
@@ -0,0 +1,73 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

plugins {
    id 'java-library'
    id "io.freefair.lombok"
    id 'jacoco'
}

repositories {
    mavenCentral()
}

dependencies {
    api project(':core')
    implementation project(':datasources')

    implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
    implementation group: 'org.json', name: 'json', version: '20230227'

    testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
    testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
    testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

test {
    useJUnitPlatform()
    testLogging {
        events "passed", "skipped", "failed"
        exceptionFormat "full"
    }
}

jacocoTestReport {
    reports {
        html.enabled true
        xml.enabled true
    }
    afterEvaluate {
        classDirectories.setFrom(files(classDirectories.files.collect {
            fileTree(dir: it)
        }))
    }
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
    violationRules {
        rule {
            element = 'CLASS'
            excludes = [
                'org.opensearch.sql.spark.data.constants.*'
            ]
            limit {
                counter = 'LINE'
                minimum = 1.0
            }
            limit {
                counter = 'BRANCH'
                minimum = 1.0
            }
        }
    }
    afterEvaluate {
        classDirectories.setFrom(files(classDirectories.files.collect {
            fileTree(dir: it)
        }))
    }
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
spark/src/main/java/org/opensearch/sql/spark/client/SparkClient.java (new file)
@@ -0,0 +1,22 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.spark.client;

import java.io.IOException;
import org.json.JSONObject;

/**
 * Interface class for Spark client.
 */
public interface SparkClient {
  /**
   * Executes the given Spark SQL query.
   *
   * @param query spark sql query
   * @return spark query response
   */
  JSONObject sql(String query) throws IOException;
}
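The PR defines only this interface. As an illustration, a minimal hedged stub (hypothetical class, not part of this change) that satisfies the contract, e.g. for unit tests; a production implementation would submit the query to an actual Spark endpoint:

package org.opensearch.sql.spark.client;

import java.io.IOException;
import org.json.JSONObject;

/**
 * Hypothetical stub: echoes the query back instead of contacting a Spark cluster.
 */
public class StubSparkClient implements SparkClient {
  @Override
  public JSONObject sql(String query) throws IOException {
    JSONObject response = new JSONObject();
    response.put("query", query); // echo the submitted Spark SQL
    response.put("data", new JSONObject()); // empty result payload
    return response;
  }
}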
spark/src/main/java/org/opensearch/sql/spark/functions/implementation/SparkSqlFunctionImplementation.java (new file)
@@ -0,0 +1,102 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.spark.functions.implementation;

import static org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver.QUERY;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.request.SparkQueryRequest;
import org.opensearch.sql.spark.storage.SparkTable;
import org.opensearch.sql.storage.Table;

/**
 * Spark SQL function implementation.
 */
public class SparkSqlFunctionImplementation extends FunctionExpression
    implements TableFunctionImplementation {

  private final FunctionName functionName;
  private final List<Expression> arguments;
  private final SparkClient sparkClient;

  /**
   * Constructor for spark sql function.
   *
   * @param functionName name of the function
   * @param arguments a list of expressions
   * @param sparkClient spark client
   */
  public SparkSqlFunctionImplementation(
      FunctionName functionName, List<Expression> arguments, SparkClient sparkClient) {
    super(functionName, arguments);
    this.functionName = functionName;
    this.arguments = arguments;
    this.sparkClient = sparkClient;
  }

  @Override
  public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
    throw new UnsupportedOperationException(String.format(
        "Spark defined function [%s] is only "
            + "supported in SOURCE clause with spark connector catalog", functionName));
  }

  @Override
  public ExprType type() {
    return ExprCoreType.STRUCT;
  }

  @Override
  public String toString() {
    List<String> args = arguments.stream()
        .map(arg -> String.format("%s=%s",
            ((NamedArgumentExpression) arg).getArgName(),
            ((NamedArgumentExpression) arg).getValue().toString()))
        .collect(Collectors.toList());
    return String.format("%s(%s)", functionName, String.join(", ", args));
  }

  @Override
  public Table applyArguments() {
    return new SparkTable(sparkClient, buildQueryFromSqlFunction(arguments));
  }

  /**
   * This method builds a spark query request.
   *
   * @param arguments spark sql function arguments
   * @return spark query request
   */
  private SparkQueryRequest buildQueryFromSqlFunction(List<Expression> arguments) {
    SparkQueryRequest sparkQueryRequest = new SparkQueryRequest();
    arguments.forEach(arg -> {
      String argName = ((NamedArgumentExpression) arg).getArgName();
      Expression argValue = ((NamedArgumentExpression) arg).getValue();
      ExprValue literalValue = argValue.valueOf();
      if (argName.equals(QUERY)) {
        sparkQueryRequest.setSql((String) literalValue.value());
      } else {
        throw new ExpressionEvaluationException(
            String.format("Invalid Function Argument:%s", argName));
      }
    });
    return sparkQueryRequest;
  }
}
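A hedged usage sketch (test-style fragment; DSL.literal is the core module's expression helper, and StubSparkClient is the hypothetical stub sketched above):

Expression queryArg = new NamedArgumentExpression("query", DSL.literal("select 1"));
SparkSqlFunctionImplementation function = new SparkSqlFunctionImplementation(
    FunctionName.of("sql"), List.of(queryArg), new StubSparkClient());

// applyArguments() copies the `query` argument into a SparkQueryRequest
// and returns a SparkTable bound to the client.
Table table = function.applyArguments();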
spark/src/main/java/org/opensearch/sql/spark/functions/resolver/SparkSqlTableFunctionResolver.java (new file)
@@ -0,0 +1,74 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.spark.functions.resolver;

import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionResolver;
import org.opensearch.sql.expression.function.FunctionSignature;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation;

/**
 * Function resolver for sql function of spark connector.
 */
@RequiredArgsConstructor
public class SparkSqlTableFunctionResolver implements FunctionResolver {
  private final SparkClient sparkClient;

  public static final String SQL = "sql";
  public static final String QUERY = "query";

  @Override
  public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unresolvedSignature) {
    FunctionName functionName = FunctionName.of(SQL);
    FunctionSignature functionSignature =
        new FunctionSignature(functionName, List.of(STRING));
    final List<String> argumentNames = List.of(QUERY);

    FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
      Boolean argumentsPassedByName = arguments.stream()
          .noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName()));
      Boolean argumentsPassedByPosition = arguments.stream()
          .allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName()));
      if (!(argumentsPassedByName || argumentsPassedByPosition)) {
        throw new SemanticCheckException("Arguments should be either passed by name or position");
      }

      if (arguments.size() != argumentNames.size()) {
        throw new SemanticCheckException(
            String.format("Missing arguments:[%s]",
                String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))));
      }

      if (argumentsPassedByPosition) {
        List<Expression> namedArguments = new ArrayList<>();
        for (int i = 0; i < arguments.size(); i++) {
          namedArguments.add(new NamedArgumentExpression(argumentNames.get(i),
              ((NamedArgumentExpression) arguments.get(i)).getValue()));
        }
        return new SparkSqlFunctionImplementation(functionName, namedArguments, sparkClient);
      }
      return new SparkSqlFunctionImplementation(functionName, arguments, sparkClient);
    };
    return Pair.of(functionSignature, functionBuilder);
  }

  @Override
  public FunctionName getFunctionName() {
    return FunctionName.of(SQL);
  }
}
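The resolver accepts the single `query` argument either by name or by position; mixing the two styles throws a SemanticCheckException. A hedged sketch of both call paths (test-style fragment; `client` and `functionProperties` are assumed to come from the surrounding context, and STRING is the static import used above):

FunctionBuilder builder = new SparkSqlTableFunctionResolver(client)
    .resolve(new FunctionSignature(FunctionName.of("sql"), List.of(STRING)))
    .getRight();

// By name: sql(query = 'select 1').
Expression named = new NamedArgumentExpression("query", DSL.literal("select 1"));
builder.apply(functionProperties, List.of(named));

// By position: sql('select 1'); an empty argument name marks a positional
// argument, which the resolver rewrites to NamedArgumentExpression("query", ...).
Expression positional = new NamedArgumentExpression("", DSL.literal("select 1"));
builder.apply(functionProperties, List.of(positional));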
spark/src/main/java/org/opensearch/sql/spark/functions/scan/SparkSqlFunctionTableScanBuilder.java (new file)
@@ -0,0 +1,35 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.spark.functions.scan;

import lombok.AllArgsConstructor;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.request.SparkQueryRequest;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
 * TableScanBuilder for sql function of spark connector.
 */
@AllArgsConstructor
public class SparkSqlFunctionTableScanBuilder extends TableScanBuilder {

  private final SparkClient sparkClient;

  private final SparkQueryRequest sparkQueryRequest;

  @Override
  public TableScanOperator build() {
    // TODO: return SqlFunctionTableScanOperator
    return null;
  }

  @Override
  public boolean pushDownProject(LogicalProject project) {
    return true;
  }
}
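build() is stubbed until a scan operator exists. For orientation, a hedged sketch of what the TODO's SqlFunctionTableScanOperator might look like (hypothetical, not part of this PR; the method set is assumed from the project's other connectors' scan operators, and the response-to-row mapping is left as a placeholder):

package org.opensearch.sql.spark.functions.scan;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.request.SparkQueryRequest;
import org.opensearch.sql.storage.TableScanOperator;

/**
 * Hypothetical sketch: submits the query through SparkClient on open()
 * and iterates the parsed rows.
 */
public class SqlFunctionTableScanOperator extends TableScanOperator {
  private final SparkClient sparkClient;
  private final SparkQueryRequest request;
  private Iterator<ExprValue> iterator = Collections.emptyIterator();

  public SqlFunctionTableScanOperator(SparkClient sparkClient, SparkQueryRequest request) {
    this.sparkClient = sparkClient;
    this.request = request;
  }

  @Override
  public void open() {
    try {
      JSONObject response = sparkClient.sql(request.getSql()); // submit the query
      this.iterator = parseResponse(response);
    } catch (IOException e) {
      throw new RuntimeException("Error fetching data from spark server", e);
    }
  }

  @Override
  public boolean hasNext() {
    return iterator.hasNext();
  }

  @Override
  public ExprValue next() {
    return iterator.next();
  }

  @Override
  public String explain() {
    return String.format("sql(%s)", request.getSql());
  }

  private Iterator<ExprValue> parseResponse(JSONObject response) {
    return Collections.emptyIterator(); // placeholder until a response schema is defined
  }
}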
spark/src/main/java/org/opensearch/sql/spark/request/SparkQueryRequest.java (new file)
@@ -0,0 +1,21 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.spark.request;

import lombok.Data;

/**
 * Spark query request.
 */
@Data
public class SparkQueryRequest {

  /**
   * SQL.
   */
  private String sql;
}
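Lombok's @Data generates the accessors the rest of this PR relies on (for example, the setSql call in SparkSqlFunctionImplementation). A minimal hedged usage fragment, e.g. in a test:

SparkQueryRequest request = new SparkQueryRequest();
request.setSql("select 1"); // generated setter
assert "select 1".equals(request.getSql()); // generated getter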