From 272c84718074f9db46bb3246ba53468d7729758c Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Fri, 8 Sep 2023 09:53:32 -0700 Subject: [PATCH] Create Job API Signed-off-by: Vamsi Manohar --- common/build.gradle | 4 +- integ-test/build.gradle | 1 + spark/build.gradle | 3 +- .../spark/client/EmrServerlessClientImpl.java | 110 ++++++++++++++++++ .../spark/data/constants/SparkConstants.java | 1 + .../TransportCreateJobRequestAction.java | 2 +- .../client/EmrServerlessClientImplTest.java | 70 +++++++++++ .../sql/spark/constants/TestConstants.java | 3 + 8 files changed, 190 insertions(+), 4 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java diff --git a/common/build.gradle b/common/build.gradle index 5cf219fbae..109cad59cb 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -39,8 +39,8 @@ dependencies { api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3' implementation 'com.github.babbel:okhttp-aws-signer:1.0.2' - api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1' - api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1' + api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545' + api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545' implementation "com.github.seancfoley:ipaddress:5.4.0" testImplementation group: 'junit', name: 'junit', version: '4.13.2' diff --git a/integ-test/build.gradle b/integ-test/build.gradle index eb9b65b52e..82e54a4b56 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -163,6 +163,7 @@ configurations.all { resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31" resolutionStrategy.force "joda-time:joda-time:2.10.12" resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" + 
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545" } configurations { diff --git a/spark/build.gradle b/spark/build.gradle index b93e3327ce..dc32ee19b5 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -19,7 +19,8 @@ dependencies { implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" implementation group: 'org.json', name: 'json', version: '20230227' - implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1' + implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545' + implementation group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545' testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java
new file mode 100644
index 0000000000..0fa5e34301
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.client;
+
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;
+
+import com.amazonaws.services.emrserverless.AWSEMRServerless;
+import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
+import com.amazonaws.services.emrserverless.model.GetJobRunRequest;
+import com.amazonaws.services.emrserverless.model.GetJobRunResult;
+import com.amazonaws.services.emrserverless.model.JobDriver;
+import com.amazonaws.services.emrserverless.model.SparkSubmit;
+import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
+import com.amazonaws.services.emrserverless.model.StartJobRunResult;
+import java.io.IOException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.json.JSONObject;
+import org.opensearch.sql.spark.helper.FlintHelper;
+import org.opensearch.sql.spark.response.SparkResponse;
+
+/**
+ * {@link SparkClient} implementation that runs Spark SQL queries as EMR Serverless job runs.
+ *
+ * <p>Every request is issued against the application id supplied at construction time.
+ */
+public class EmrServerlessClientImpl implements SparkClient {
+
+  private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);
+
+  private final AWSEMRServerless emrServerless;
+  private final String applicationId;
+  private final String executionRoleArn;
+  private final FlintHelper flint;
+  private final String sparkApplicationJar;
+  private final SparkResponse sparkResponse;
+
+  /**
+   * Creates a client bound to one EMR Serverless application.
+   *
+   * @param emrServerless AWS SDK client used for every EMR Serverless call
+   * @param applicationId id of the application that job runs are submitted to
+   * @param executionRoleArn execution role ARN sent with every start-job-run request
+   * @param flint source of the Flint jar location and connection settings for spark-submit
+   * @param sparkApplicationJar entry point jar of the Spark job; {@code null} selects
+   *     {@code SparkConstants.SPARK_SQL_APPLICATION_JAR}
+   * @param sparkResponse response helper that {@link #sql(String)} uses to fetch the result
+   */
+  public EmrServerlessClientImpl(
+      AWSEMRServerless emrServerless,
+      String applicationId,
+      String executionRoleArn,
+      FlintHelper flint,
+      String sparkApplicationJar,
+      SparkResponse sparkResponse) {
+    this.emrServerless = emrServerless;
+    this.applicationId = applicationId;
+    this.executionRoleArn = executionRoleArn;
+    this.flint = flint;
+    this.sparkApplicationJar =
+        sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar;
+    this.sparkResponse = sparkResponse;
+  }
+
+  /**
+   * Starts a job run named {@code "temp"} for the query, hands the job run id to the response
+   * helper via {@code setValue}, and returns what the helper reads from the OpenSearch index.
+   *
+   * @param query query passed to the Spark application as its entry point argument
+   * @return value of {@code sparkResponse.getResultFromOpensearchIndex()}
+   * @throws IOException per the method signature (TODO confirm which call raises it)
+   */
+  @Override
+  public JSONObject sql(String query) throws IOException {
+    // TODO: update/ remove for async approach
+    sparkResponse.setValue(startJobRun("temp", query));
+    return sparkResponse.getResultFromOpensearchIndex();
+  }
+
+  /**
+   * Submits a Spark job run for the query to the configured application.
+   *
+   * @param jobName name given to the job run
+   * @param query query passed to the Spark application as its entry point argument
+   * @return job run id reported by EMR Serverless for the started run
+   */
+  public String startJobRun(String jobName, String query) {
+    SparkSubmit sparkSubmit =
+        new SparkSubmit()
+            .withEntryPoint(sparkApplicationJar)
+            .withEntryPointArguments(query)
+            .withSparkSubmitParameters(buildSparkSubmitParameters());
+    StartJobRunRequest request =
+        new StartJobRunRequest()
+            .withName(jobName)
+            .withApplicationId(applicationId)
+            .withExecutionRoleArn(executionRoleArn)
+            .withJobDriver(new JobDriver().withSparkSubmit(sparkSubmit));
+    String jobRunId = emrServerless.startJobRun(request).getJobRunId();
+    logger.debug("Started EMR Serverless job run {} for job {}", jobRunId, jobName);
+    return jobRunId;
+  }
+
+  /**
+   * Looks up the current state of a job run in the configured application.
+   *
+   * @param jobRunId id of the job run to inspect
+   * @return state string carried by the job run that EMR Serverless returns
+   */
+  public String getJobRunState(String jobRunId) {
+    GetJobRunRequest request =
+        new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId);
+    GetJobRunResult response = emrServerless.getJobRun(request);
+    return response.getJobRun().getState();
+  }
+
+  /**
+   * Requests cancellation of a job run in the configured application.
+   *
+   * @param jobRunId id of the job run to cancel
+   */
+  public void cancelJobRun(String jobRunId) {
+    emrServerless.cancelJobRun(
+        new CancelJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId));
+  }
+
+  /**
+   * Assembles the {@code spark-submit} parameters sent with every job run: the SQL job main
+   * class, driver and executor sizing, the Flint integration jar and datasource settings read
+   * from {@link FlintHelper}, the Java 17 {@code JAVA_HOME} for driver and executors, and the
+   * Glue Data Catalog Hive metastore client factory.
+   */
+  private String buildSparkSubmitParameters() {
+    return "--class org.opensearch.sql.SQLJob"
+        + " --conf spark.driver.cores=1"
+        + " --conf spark.driver.memory=1g"
+        + " --conf spark.executor.cores=2"
+        + " --conf spark.executor.memory=4g"
+        + " --conf spark.jars="
+        + flint.getFlintIntegrationJar()
+        + " --conf spark.datasource.flint.host="
+        + flint.getFlintHost()
+        + " --conf spark.datasource.flint.port="
+        + flint.getFlintPort()
+        + " --conf spark.datasource.flint.scheme="
+        + flint.getFlintScheme()
+        + " --conf spark.datasource.flint.auth="
+        + flint.getFlintAuth()
+        + " --conf spark.datasource.flint.region="
+        + flint.getFlintRegion()
+        + " --conf"
+        + " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+        + " --conf"
+        + " spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+        + " --conf"
+        + " spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory";
+  }
+}
diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 65d5a01ba2..3441090353 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -7,6 +7,7 @@ public class SparkConstants { public static final String EMR = "emr"; + public static final String EMRS = "emr-serverless"; public static final String STEP_ID_FIELD = "stepId.keyword"; public static final String SPARK_SQL_APPLICATION_JAR = "s3://spark-datasource/sql-job.jar"; public static final String SPARK_INDEX_NAME = ".query_execution_result"; diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestAction.java b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestAction.java index 0c43566134..53ae9fad90 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestAction.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestAction.java @@ -20,7 +20,7 @@ public class TransportCreateJobRequestAction extends HandledTransportAction { - public static final String NAME = "cluster:admin/opensearch/ql/jobs/write"; + public static final String NAME = "cluster:admin/opensearch/ql/jobs/create"; public static final ActionType ACTION_TYPE = new ActionType<>(NAME, CreateJobActionResponse::new); diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java new file mode 100644 index 
0000000000..a840ee4696
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID;
+import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE;
+import static org.opensearch.sql.spark.constants.TestConstants.EMRS_JOB_NAME;
+import static org.opensearch.sql.spark.constants.TestConstants.QUERY;
+
+import com.amazonaws.services.emrserverless.AWSEMRServerless;
+import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
+import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
+import com.amazonaws.services.emrserverless.model.GetJobRunResult;
+import com.amazonaws.services.emrserverless.model.JobRun;
+import com.amazonaws.services.emrserverless.model.StartJobRunResult;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.sql.spark.helper.FlintHelper;
+import org.opensearch.sql.spark.response.SparkResponse;
+
+/** Unit tests for {@link EmrServerlessClientImpl} against a mocked {@link AWSEMRServerless}. */
+@ExtendWith(MockitoExtension.class)
+public class EmrServerlessClientImplTest {
+  private static final String JOB_RUN_ID = "123";
+
+  @Mock private AWSEMRServerless emrServerless;
+  @Mock private FlintHelper flint;
+  @Mock private SparkResponse sparkResponse;
+
+  /** The job run id returned by EMR Serverless must be handed back to the caller. */
+  @Test
+  void testStartJobRun() {
+    StartJobRunResult response = new StartJobRunResult().withJobRunId(JOB_RUN_ID);
+    when(emrServerless.startJobRun(any())).thenReturn(response);
+
+    EmrServerlessClientImpl emrServerlessClient =
+        new EmrServerlessClientImpl(
+            emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, null, sparkResponse);
+
+    assertEquals(JOB_RUN_ID, emrServerlessClient.startJobRun(EMRS_JOB_NAME, QUERY));
+  }
+
+  /** The state carried by the returned job run must be surfaced unchanged. */
+  @Test
+  void testGetJobRunState() {
+    JobRun jobRun = new JobRun();
+    jobRun.setState("Running");
+    GetJobRunResult response = new GetJobRunResult();
+    response.setJobRun(jobRun);
+    when(emrServerless.getJobRun(any())).thenReturn(response);
+
+    EmrServerlessClientImpl emrServerlessClient =
+        new EmrServerlessClientImpl(
+            emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, "", sparkResponse);
+
+    assertEquals("Running", emrServerlessClient.getJobRunState(JOB_RUN_ID));
+  }
+
+  /** Cancelling must issue one cancel request for the given job run and application. */
+  @Test
+  void testCancelJobRun() {
+    when(emrServerless.cancelJobRun(any())).thenReturn(new CancelJobRunResult());
+
+    EmrServerlessClientImpl emrServerlessClient =
+        new EmrServerlessClientImpl(
+            emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, null, sparkResponse);
+
+    emrServerlessClient.cancelJobRun(JOB_RUN_ID);
+
+    verify(emrServerless)
+        .cancelJobRun(
+            new CancelJobRunRequest()
+                .withApplicationId(EMRS_APPLICATION_ID)
+                .withJobRunId(JOB_RUN_ID));
+  }
+}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java b/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java index 2b1020568a..e8a5c2c5cd 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java +++ b/spark/src/test/java/org/opensearch/sql/spark/constants/TestConstants.java @@ -8,4 +8,7 @@ public class TestConstants { public static final String QUERY = "select 1"; public static final String EMR_CLUSTER_ID = "j-123456789"; + public static final String EMRS_APPLICATION_ID = "xxxxx"; + public static final String EMRS_EXECUTION_ROLE = "execution_role"; + public static final String EMRS_JOB_NAME = "job_name"; }