Skip to content

Commit

Permalink
Minor fix in dropping covering index (#2226)
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings authored Oct 5, 2023
1 parent a83482d commit 5f76037
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 55 deletions.
4 changes: 2 additions & 2 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImplEMR;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
Expand Down Expand Up @@ -325,7 +325,7 @@ private EMRServerlessClient createEMRServerlessClient() {
.withRegion(sparkExecutionEngineConfig.getRegion())
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImplEMR(awsemrServerless);
return new EmrServerlessClientImpl(awsemrServerless);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
/**
* Client Interface for spark Job Submissions. Can have multiple implementations based on the
* underlying spark infrastructure. Currently, we have one for EMRServerless {@link
* EmrServerlessClientImplEMR}
* EmrServerlessClientImpl}
*/
public interface EMRServerlessClient {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class EmrServerlessClientImplEMR implements EMRServerlessClient {
public class EmrServerlessClientImpl implements EMRServerlessClient {

private final AWSEMRServerless emrServerless;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImplEMR.class);
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);

public EmrServerlessClientImplEMR(AWSEMRServerless emrServerless) {
public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {

@Override
public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = getIndexName(indexDetails).toLowerCase();
String indexName = getIndexName(indexDetails);
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
Expand All @@ -34,27 +34,31 @@ public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
private String getIndexName(IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) {
return "flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getName();
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) {
return "flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getName();
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", indexDetails.getIndexType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,19 @@

package org.opensearch.sql.spark.flint;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Enum for FlintIndex Type. */
public enum FlintIndexType {
SKIPPING("skipping_index"),
COVERING("covering_index"),
COVERING("index"),
MATERIALIZED("materialized_view");

private final String name;
private static final Map<String, FlintIndexType> ENUM_MAP;

FlintIndexType(String name) {
this.name = name;
}

public String getName() {
return this.name;
}
private final String suffix;

static {
Map<String, FlintIndexType> map = new HashMap<>();
for (FlintIndexType instance : FlintIndexType.values()) {
map.put(instance.getName().toLowerCase(), instance);
}
ENUM_MAP = Collections.unmodifiableMap(map);
FlintIndexType(String suffix) {
this.suffix = suffix;
}

public static FlintIndexType get(String name) {
return ENUM_MAP.get(name.toLowerCase());
public String getSuffix() {
return this.suffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void testStartJobRun() {
StartJobRunResult response = new StartJobRunResult();
when(emrServerless.startJobRun(any())).thenReturn(response);

EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.startJobRun(
new StartJobRequest(
QUERY,
Expand All @@ -54,7 +54,7 @@ void testStartJobRunResultIndex() {
StartJobRunResult response = new StartJobRunResult();
when(emrServerless.startJobRun(any())).thenReturn(response);

EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.startJobRun(
new StartJobRequest(
QUERY,
Expand All @@ -74,15 +74,15 @@ void testGetJobRunState() {
GetJobRunResult response = new GetJobRunResult();
response.setJobRun(jobRun);
when(emrServerless.getJobRun(any())).thenReturn(response);
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123");
}

@Test
void testCancelJobRun() {
when(emrServerless.cancelJobRun(any()))
.thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID));
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
CancelJobRunResult cancelJobRunResult =
emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId());
Expand All @@ -91,7 +91,7 @@ void testCancelJobRun() {
@Test
void testCancelJobRunWithValidationException() {
doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any());
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
Expand Down

0 comments on commit 5f76037

Please sign in to comment.