Skip to content

Commit

Permalink
fix format
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo committed Oct 3, 2023
1 parent ab3096a commit e8d32de
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_REGION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY;
Expand Down Expand Up @@ -56,9 +55,7 @@
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;

/**
* Define Spark Submit Parameters.
*/
/** Define Spark Submit Parameters. */
@RequiredArgsConstructor
public class SparkSubmitParameters {
public static final String SPACE = " ";
Expand Down Expand Up @@ -107,8 +104,9 @@ public Builder dataSource(DataSourceMetadata metadata) {
config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);

URI uri = parseUri(metadata.getProperties().get("glue.indexstore.opensearch.uri"),
metadata.getName());
URI uri =
parseUri(
metadata.getProperties().get("glue.indexstore.opensearch.uri"), metadata.getName());
flintConfig(
metadata,
uri.getHost(),
Expand All @@ -119,21 +117,19 @@ public Builder dataSource(DataSourceMetadata metadata) {
}
throw new UnsupportedOperationException(
String.format(
"UnSupported datasource type for async queries:: %s",
metadata.getConnector()));
"UnSupported datasource type for async queries:: %s", metadata.getConnector()));
}

private void flintConfig(DataSourceMetadata metadata, String host, String port, String scheme,
String auth) {
private void flintConfig(
DataSourceMetadata metadata, String host, String port, String scheme, String auth) {
config.put(FLINT_INDEX_STORE_HOST_KEY, host);
config.put(FLINT_INDEX_STORE_PORT_KEY, port);
config.put(FLINT_INDEX_STORE_SCHEME_KEY, scheme);
setFlintIndexStoreAuthProperties(metadata, auth);
}

private void setFlintIndexStoreAuthProperties(
DataSourceMetadata dataSourceMetadata,
String authType) {
DataSourceMetadata dataSourceMetadata, String authType) {
if (AuthenticationType.get(authType).equals(AuthenticationType.BASICAUTH)) {
config.put(FLINT_INDEX_STORE_AUTH_KEY, authType);
String username =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ public class StartJobRequest {
private final String sparkSubmitParams;
private final Map<String, String> tags;

/**
* true if it is Spark Structured Streaming job.
*/
/** true if it is Spark Structured Streaming job. */
private final boolean isStructuredStreaming;

public Long executionTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,13 @@ private StartJobRequest getStartJobRequestForNonIndexQueries(
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.dataSource(dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()))
.build().toString(),
tags, false);
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.build()
.toString(),
tags,
false);
return startJobRequest;
}

Expand All @@ -128,10 +132,14 @@ private StartJobRequest getStartJobRequestForIndexRequest(
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.dataSource(dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()))
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming()
.build().toString(),
tags, true);
.build()
.toString(),
tags,
true);
return startJobRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void testStartJobRun() {
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
SPARK_SUBMIT_PARAMETERS,
new HashMap<>(), false));
new HashMap<>(),
false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ void testDispatchSelectQuery() {
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags, false)))
tags,
false)))
.thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
Expand Down Expand Up @@ -103,7 +104,8 @@ void testDispatchSelectQuery() {
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags, false));
tags,
false));
Assertions.assertEquals(EMR_JOB_ID, jobId);
}

Expand Down Expand Up @@ -133,7 +135,8 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password");
}
}),
tags, false)))
tags,
false)))
.thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithBasicAuth();
when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
Expand Down Expand Up @@ -162,7 +165,8 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password");
}
}),
tags, false));
tags,
false));
Assertions.assertEquals(EMR_JOB_ID, jobId);
}

Expand Down Expand Up @@ -190,7 +194,8 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() {
{
}
}),
tags, false)))
tags,
false)))
.thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithNoAuth();
when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
Expand All @@ -217,7 +222,8 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() {
{
}
}),
tags, false));
tags,
false));
Assertions.assertEquals(EMR_JOB_ID, jobId);
}

Expand All @@ -244,14 +250,16 @@ void testDispatchIndexQuery() {
"TEST_CLUSTER:index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
withStructuredStreaming(constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags, true)))
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags,
true)))
.thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
Expand All @@ -272,14 +280,16 @@ void testDispatchIndexQuery() {
"TEST_CLUSTER:index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
withStructuredStreaming(constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags, true));
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags,
true));
Assertions.assertEquals(EMR_JOB_ID, jobId);
}

Expand Down Expand Up @@ -308,7 +318,8 @@ void testDispatchWithPPLQuery() {
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags, false)))
tags,
false)))
.thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
Expand Down Expand Up @@ -336,7 +347,8 @@ void testDispatchWithPPLQuery() {
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags, false));
tags,
false));
Assertions.assertEquals(EMR_JOB_ID, jobId);
}

Expand Down Expand Up @@ -365,7 +377,8 @@ void testDispatchQueryWithoutATableAndDataSourceName() {
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags, false)))
tags,
false)))
.thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
Expand Down Expand Up @@ -393,7 +406,8 @@ void testDispatchQueryWithoutATableAndDataSourceName() {
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
}),
tags, false));
tags,
false));
Assertions.assertEquals(EMR_JOB_ID, jobId);
}

Expand All @@ -420,14 +434,16 @@ void testDispatchIndexQueryWithoutADatasourceName() {
"TEST_CLUSTER:index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
withStructuredStreaming(constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags, true)))
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags,
true)))
.thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
Expand All @@ -448,14 +464,16 @@ void testDispatchIndexQueryWithoutADatasourceName() {
"TEST_CLUSTER:index-query",
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
withStructuredStreaming(constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags, true));
withStructuredStreaming(
constructExpectedSparkSubmitParameterString(
"sigv4",
new HashMap<>() {
{
put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
}
})),
tags,
true));
Assertions.assertEquals(EMR_JOB_ID, jobId);
}

Expand Down

0 comments on commit e8d32de

Please sign in to comment.