From 9aabfa0eb191d59b179951d5892d78dc046d5b33 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 4 Oct 2023 01:09:22 +0000 Subject: [PATCH] Add conf for spark structured streaming job (#2193) * Add conf for spark structured streaming job Signed-off-by: Peng Huo * update Signed-off-by: Peng Huo * fix format Signed-off-by: Peng Huo * fix format Signed-off-by: Peng Huo * remove unused code Signed-off-by: Peng Huo * fix format Signed-off-by: Peng Huo --------- Signed-off-by: Peng Huo (cherry picked from commit 89b011be5adb399a41b3123d64cc430e7813eca1) Signed-off-by: github-actions[bot] --- .../model/S3GlueSparkSubmitParameters.java | 94 --------- .../model/SparkSubmitParameters.java | 184 ++++++++++++++++++ .../client/EmrServerlessClientImplEMR.java | 1 + .../sql/spark/client/StartJobRequest.java | 10 + .../dispatcher/SparkQueryDispatcher.java | 105 ++-------- .../client/EmrServerlessClientImplTest.java | 3 +- .../sql/spark/client/StartJobRequestTest.java | 29 +++ .../dispatcher/SparkQueryDispatcherTest.java | 106 ++++++---- 8 files changed, 308 insertions(+), 224 deletions(-) delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java deleted file mode 100644 index 13e4947eae..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.sql.spark.asyncquery.model; - -import static org.opensearch.sql.spark.data.constants.SparkConstants.AWS_SNAPSHOT_REPOSITORY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE; -import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR; -import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; -import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE; - -import java.util.LinkedHashMap; -import java.util.Map; -import lombok.Getter; -import lombok.Setter; - -@Getter -@Setter -public class S3GlueSparkSubmitParameters { - - private String className; - private Map config; - public static final String SPACE = " "; - public static final String EQUALS = "="; - - public S3GlueSparkSubmitParameters() { - this.className = DEFAULT_CLASS_NAME; - this.config = new LinkedHashMap<>(); - this.config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE); - this.config.put( - HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, - DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - this.config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR); - this.config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE); - this.config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); - this.config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); - this.config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); - this.config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST); - this.config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT); - this.config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); - this.config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); - this.config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); - this.config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION); - this.config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); - } - - public void addParameter(String key, String value) { - this.config.put(key, value); - } - - @Override - public String toString() { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(" --class "); - stringBuilder.append(this.className); - stringBuilder.append(SPACE); - for (String key : config.keySet()) { - stringBuilder.append(" --conf "); - stringBuilder.append(key); - stringBuilder.append(EQUALS); - stringBuilder.append(config.get(key)); - stringBuilder.append(SPACE); - } - return stringBuilder.toString(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java new file mode 100644 index 0000000000..627d6cfcc5 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -0,0 +1,184 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery.model; + +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN; +import static org.opensearch.sql.spark.data.constants.SparkConstants.AWS_SNAPSHOT_REPOSITORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER; +import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_PASSWORD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR; +import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; +import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Supplier; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasources.auth.AuthenticationType; + +/** Define Spark Submit Parameters. */ +@RequiredArgsConstructor +public class SparkSubmitParameters { + public static final String SPACE = " "; + public static final String EQUALS = "="; + + private final String className; + private final Map config; + + public static class Builder { + + private final String className; + private final Map config; + + private Builder() { + className = DEFAULT_CLASS_NAME; + config = new LinkedHashMap<>(); + + config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE); + config.put( + HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, + DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); + config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR); + config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE); + config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); + config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); + config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); + config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST); + config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT); + config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); + config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); + config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); + config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION); + config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); + } + + public static Builder builder() { + return new Builder(); + } + + public Builder dataSource(DataSourceMetadata metadata) { + if (DataSourceType.S3GLUE.equals(metadata.getConnector())) { + String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + + config.put(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn); + config.put(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn); + config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn); + config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); + + setFlintIndexStoreHost( + parseUri( + metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName())); + setFlintIndexStoreAuthProperties( + metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH), + () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME), + () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD), + () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION)); + return this; + } + throw new UnsupportedOperationException( + String.format( + "UnSupported datasource type for async queries:: %s", metadata.getConnector())); + } + + private void setFlintIndexStoreHost(URI uri) { + config.put(FLINT_INDEX_STORE_HOST_KEY, uri.getHost()); + config.put(FLINT_INDEX_STORE_PORT_KEY, String.valueOf(uri.getPort())); + config.put(FLINT_INDEX_STORE_SCHEME_KEY, uri.getScheme()); + } + + private void setFlintIndexStoreAuthProperties( + String authType, + Supplier userName, + Supplier password, + Supplier region) { + if (AuthenticationType.get(authType).equals(AuthenticationType.BASICAUTH)) { + config.put(FLINT_INDEX_STORE_AUTH_KEY, authType); + config.put(FLINT_INDEX_STORE_AUTH_USERNAME, userName.get()); + config.put(FLINT_INDEX_STORE_AUTH_PASSWORD, password.get()); + } else if (AuthenticationType.get(authType).equals(AuthenticationType.AWSSIGV4AUTH)) { + config.put(FLINT_INDEX_STORE_AUTH_KEY, "sigv4"); + config.put(FLINT_INDEX_STORE_AWSREGION_KEY, region.get()); + } else { + config.put(FLINT_INDEX_STORE_AUTH_KEY, authType); + } + } + + private URI parseUri(String opensearchUri, String datasourceName) { + try { + return new URI(opensearchUri); + } catch (URISyntaxException e) { + throw new IllegalArgumentException( + String.format( + "Bad URI in indexstore configuration of the : %s datasoure.", datasourceName)); + } + } + + public Builder structuredStreaming() { + config.put("spark.flint.job.type", "streaming"); + + return this; + } + + public SparkSubmitParameters build() { + return new SparkSubmitParameters(className, config); + } + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(" --class "); + stringBuilder.append(this.className); + stringBuilder.append(SPACE); + for (String key : config.keySet()) { + stringBuilder.append(" --conf "); + stringBuilder.append(key); + stringBuilder.append(EQUALS); + stringBuilder.append(config.get(key)); + stringBuilder.append(SPACE); + } + return stringBuilder.toString(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java index 83e570ece2..1a8e3203b8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java @@ -40,6 +40,7 @@ public String startJobRun(StartJobRequest startJobRequest) { .withApplicationId(startJobRequest.getApplicationId()) .withExecutionRoleArn(startJobRequest.getExecutionRoleArn()) .withTags(startJobRequest.getTags()) + .withExecutionTimeoutMinutes(startJobRequest.executionTimeout()) .withJobDriver( new JobDriver() .withSparkSubmit( diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java index 94689c7030..df8f9f61b1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java @@ -14,10 +14,20 @@ */ @Data public class StartJobRequest { + + public static final Long DEFAULT_JOB_TIMEOUT = 120L; + private final String query; private final String jobName; private final String applicationId; private final String executionRoleArn; private final String sparkSubmitParams; private final Map tags; + + /** true if it is Spark Structured Streaming job. */ + private final boolean isStructuredStreaming; + + public Long executionTimeout() { + return isStructuredStreaming ? 0L : DEFAULT_JOB_TIMEOUT; + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index ab9112460e..1fdc391c85 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -5,37 +5,16 @@ package org.opensearch.sql.spark.dispatcher; -import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH; -import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD; -import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; -import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_PASSWORD; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY; - import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRunState; -import java.net.URI; -import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import lombok.AllArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; -import org.opensearch.sql.spark.asyncquery.model.S3GlueSparkSubmitParameters; +import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; @@ -98,67 +77,6 @@ private StartJobRequest getStartJobRequest(DispatchQueryRequest dispatchQueryReq } } - private String getDataSourceRoleARN(DataSourceMetadata dataSourceMetadata) { - if (DataSourceType.S3GLUE.equals(dataSourceMetadata.getConnector())) { - return dataSourceMetadata.getProperties().get("glue.auth.role_arn"); - } - throw new UnsupportedOperationException( - String.format( - "UnSupported datasource type for async queries:: %s", - dataSourceMetadata.getConnector())); - } - - private String constructSparkParameters(String datasourceName) { - DataSourceMetadata dataSourceMetadata = - dataSourceService.getRawDataSourceMetadata(datasourceName); - S3GlueSparkSubmitParameters s3GlueSparkSubmitParameters = new S3GlueSparkSubmitParameters(); - s3GlueSparkSubmitParameters.addParameter( - DRIVER_ENV_ASSUME_ROLE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata)); - s3GlueSparkSubmitParameters.addParameter( - EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata)); - s3GlueSparkSubmitParameters.addParameter( - HIVE_METASTORE_GLUE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata)); - String opensearchuri = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.uri"); - URI uri; - try { - uri = new URI(opensearchuri); - } catch (URISyntaxException e) { - throw new IllegalArgumentException( - String.format( - "Bad URI in indexstore configuration of the : %s datasoure.", datasourceName)); - } - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_HOST_KEY, uri.getHost()); - s3GlueSparkSubmitParameters.addParameter( - FLINT_INDEX_STORE_PORT_KEY, String.valueOf(uri.getPort())); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_SCHEME_KEY, uri.getScheme()); - s3GlueSparkSubmitParameters.addParameter( - "spark.sql.catalog." + datasourceName, FLINT_DELEGATE_CATALOG); - String auth = dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH); - setFlintIndexStoreAuthProperties(dataSourceMetadata, s3GlueSparkSubmitParameters, auth); - return s3GlueSparkSubmitParameters.toString(); - } - - private static void setFlintIndexStoreAuthProperties( - DataSourceMetadata dataSourceMetadata, - S3GlueSparkSubmitParameters s3GlueSparkSubmitParameters, - String authType) { - if (AuthenticationType.get(authType).equals(AuthenticationType.BASICAUTH)) { - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, authType); - String username = - dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME); - String password = - dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_USERNAME, username); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_PASSWORD, password); - } else if (AuthenticationType.get(authType).equals(AuthenticationType.AWSSIGV4AUTH)) { - String region = dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, "sigv4"); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AWSREGION_KEY, region); - } else { - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, authType); - } - } - private StartJobRequest getStartJobRequestForNonIndexQueries( DispatchQueryRequest dispatchQueryRequest) { StartJobRequest startJobRequest; @@ -172,8 +90,14 @@ private StartJobRequest getStartJobRequestForNonIndexQueries( jobName, dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), - constructSparkParameters(dispatchQueryRequest.getDatasource()), - tags); + SparkSubmitParameters.Builder.builder() + .dataSource( + dataSourceService.getRawDataSourceMetadata( + dispatchQueryRequest.getDatasource())) + .build() + .toString(), + tags, + false); return startJobRequest; } @@ -195,8 +119,15 @@ private StartJobRequest getStartJobRequestForIndexRequest( jobName, dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), - constructSparkParameters(dispatchQueryRequest.getDatasource()), - tags); + SparkSubmitParameters.Builder.builder() + .dataSource( + dataSourceService.getRawDataSourceMetadata( + dispatchQueryRequest.getDatasource())) + .structuredStreaming() + .build() + .toString(), + tags, + true); return startJobRequest; } diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index 0765b90534..17d4fe55c0 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -44,7 +44,8 @@ void testStartJobRun() { EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, SPARK_SUBMIT_PARAMETERS, - new HashMap<>())); + new HashMap<>(), + false)); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java new file mode 100644 index 0000000000..783ce8466e --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.client; + +import static org.junit.jupiter.api.Assertions.*; +import static org.opensearch.sql.spark.client.StartJobRequest.DEFAULT_JOB_TIMEOUT; + +import java.util.Map; +import org.junit.jupiter.api.Test; + +class StartJobRequestTest { + + @Test + void executionTimeout() { + assertEquals(DEFAULT_JOB_TIMEOUT, onDemandJob().executionTimeout()); + assertEquals(0L, streamingJob().executionTimeout()); + } + + private StartJobRequest onDemandJob() { + return new StartJobRequest("", "", "", "", "", Map.of(), false); + } + + private StartJobRequest streamingJob() { + return new StartJobRequest("", "", "", "", "", Map.of(), true); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 165c87c7aa..2a8c21d342 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -75,7 +75,8 @@ void testDispatchSelectQuery() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -103,7 +104,8 @@ void testDispatchSelectQuery() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -133,7 +135,8 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); } }), - tags))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithBasicAuth(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -162,7 +165,8 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); } }), - tags)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -190,7 +194,8 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { { } }), - tags))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithNoAuth(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -217,7 +222,8 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { { } }), - tags)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -244,14 +250,16 @@ void testDispatchIndexQuery() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - }), - tags))) + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -272,14 +280,16 @@ void testDispatchIndexQuery() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - }), - tags)); + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -308,7 +318,8 @@ void testDispatchWithPPLQuery() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -336,7 +347,8 @@ void testDispatchWithPPLQuery() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -365,7 +377,8 @@ void testDispatchQueryWithoutATableAndDataSourceName() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -393,7 +406,8 @@ void testDispatchQueryWithoutATableAndDataSourceName() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -420,14 +434,16 @@ void testDispatchIndexQueryWithoutADatasourceName() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - }), - tags))) + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -448,14 +464,16 @@ void testDispatchIndexQueryWithoutADatasourceName() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - }), - tags)); + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -611,6 +629,10 @@ private String constructExpectedSparkSubmitParameterString( + authParamConfigBuilder; } + private String withStructuredStreaming(String parameters) { + return parameters + " --conf spark.flint.job.type=streaming "; + } + private DataSourceMetadata constructMyGlueDataSourceMetadata() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); dataSourceMetadata.setName("my_glue");