Skip to content

Commit

Permalink
Create Job API
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Sep 8, 2023
1 parent f96505d commit 272c847
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 4 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
Expand Down
1 change: 1 addition & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {
Expand Down
3 changes: 2 additions & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ dependencies {

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobDriver;
import com.amazonaws.services.emrserverless.model.SparkSubmit;
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

public class EmrServerlessClientImpl implements SparkClient {

private final AWSEMRServerless emrServerless;
private final String applicationId;
private final String executionRoleArn;
private final FlintHelper flint;
private final String sparkApplicationJar;
private SparkResponse sparkResponse;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);

public EmrServerlessClientImpl(
AWSEMRServerless emrServerless,
String applicationId,
String executionRoleArn,
FlintHelper flint,
String sparkApplicationJar,
SparkResponse sparkResponse) {
this.emrServerless = emrServerless;
this.applicationId = applicationId;
this.executionRoleArn = executionRoleArn;
this.flint = flint;
this.sparkApplicationJar =
sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar;
this.sparkResponse = sparkResponse;
}

@Override
public JSONObject sql(String query) throws IOException {
// TODO: update/ remove for async approach
sparkResponse.setValue(startJobRun("temp", query));
return sparkResponse.getResultFromOpensearchIndex();

Check warning on line 55 in spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java#L54-L55

Added lines #L54 - L55 were not covered by tests
}

public String startJobRun(String jobName, String query) {
StartJobRunRequest request =
new StartJobRunRequest()
.withName(jobName)
.withApplicationId(applicationId)
.withExecutionRoleArn(executionRoleArn)
.withJobDriver(
new JobDriver()
.withSparkSubmit(
new SparkSubmit()
.withEntryPoint(sparkApplicationJar)
.withEntryPointArguments(query)
.withSparkSubmitParameters(
"--class org.opensearch.sql.SQLJob"
+ " --conf spark.driver.cores=1"
+ " --conf spark.driver.memory=1g"
+ " --conf spark.executor.cores=2"
+ " --conf spark.executor.memory=4g"
+ " --conf spark.jars="
+ flint.getFlintIntegrationJar()
+ " --conf spark.datasource.flint.host="
+ flint.getFlintHost()
+ " --conf spark.datasource.flint.port="
+ flint.getFlintPort()
+ " --conf spark.datasource.flint.scheme="
+ flint.getFlintScheme()
+ " --conf spark.datasource.flint.auth="
+ flint.getFlintAuth()
+ " --conf spark.datasource.flint.region="
+ flint.getFlintRegion()
+ " --conf"
+ " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf"
+ " spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf"
+ " spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")));
StartJobRunResult response = emrServerless.startJobRun(request);
return response.getJobRunId();
}

public String getJobRunState(String jobRunId) {
GetJobRunRequest request =
new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId);
GetJobRunResult response = emrServerless.getJobRun(request);
return response.getJobRun().getState();
}

public void cancelJobRun(String jobRunId) {
// Cancel the job run
emrServerless.cancelJobRun(
new CancelJobRunRequest().withApplicationId(applicationId).withJobRunId(jobRunId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

public class SparkConstants {
public static final String EMR = "emr";
public static final String EMRS = "emr-serverless";
public static final String STEP_ID_FIELD = "stepId.keyword";
public static final String SPARK_SQL_APPLICATION_JAR = "s3://spark-datasource/sql-job.jar";
public static final String SPARK_INDEX_NAME = ".query_execution_result";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class TransportCreateJobRequestAction
extends HandledTransportAction<CreateJobActionRequest, CreateJobActionResponse> {

public static final String NAME = "cluster:admin/opensearch/ql/jobs/write";
public static final String NAME = "cluster:admin/opensearch/ql/jobs/create";
public static final ActionType<CreateJobActionResponse> ACTION_TYPE =
new ActionType<>(NAME, CreateJobActionResponse::new);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_JOB_NAME;
import static org.opensearch.sql.spark.constants.TestConstants.QUERY;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
import com.amazonaws.services.emrserverless.model.StartJobRunResult;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

@ExtendWith(MockitoExtension.class)
public class EmrServerlessClientImplTest {
@Mock private AWSEMRServerless emrServerless;
@Mock private FlintHelper flint;
@Mock private SparkResponse sparkResponse;

@Test
void testStartJobRun() {
StartJobRunResult response = new StartJobRunResult();
when(emrServerless.startJobRun(any())).thenReturn(response);

EmrServerlessClientImpl emrServerlessClient =
new EmrServerlessClientImpl(
emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, null, sparkResponse);

emrServerlessClient.startJobRun(EMRS_JOB_NAME, QUERY);
}

@Test
void testGetJobRunState() {
JobRun jobRun = new JobRun();
jobRun.setState("Running");
GetJobRunResult response = new GetJobRunResult();
response.setJobRun(jobRun);
when(emrServerless.getJobRun(any())).thenReturn(response);

EmrServerlessClientImpl emrServerlessClient =
new EmrServerlessClientImpl(
emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, "", sparkResponse);

emrServerlessClient.getJobRunState("123");
}

@Test
void testCancelJobRun() {
when(emrServerless.cancelJobRun(any())).thenReturn(new CancelJobRunResult());

EmrServerlessClientImpl emrServerlessClient =
new EmrServerlessClientImpl(
emrServerless, EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, flint, null, sparkResponse);

emrServerlessClient.cancelJobRun("123");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@
public class TestConstants {
public static final String QUERY = "select 1";
public static final String EMR_CLUSTER_ID = "j-123456789";
public static final String EMRS_APPLICATION_ID = "xxxxx";
public static final String EMRS_EXECUTION_ROLE = "execution_role";
public static final String EMRS_JOB_NAME = "job_name";
}

0 comments on commit 272c847

Please sign in to comment.