Skip to content

Commit

Permalink
Bug fix: delete the OpenSearch index on DROP INDEX (#2250)
Browse files Browse the repository at this point in the history
* Bug fix: delete the OpenSearch index on DROP INDEX

Signed-off-by: Peng Huo <penghuo@gmail.com>

* address comments

Signed-off-by: Peng Huo <penghuo@gmail.com>

---------

Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo authored Oct 6, 2023
1 parent 55e8e84 commit 0c78105
Show file tree
Hide file tree
Showing 13 changed files with 492 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() {
this.dataSourceService,
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client));
new FlintIndexMetadataReaderImpl(client),
client);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}
Expand Down
2 changes: 1 addition & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "failed"
exceptionFormat "full"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public Optional<AsyncQueryJobMetadata> getJobMetadata(String jobId) {
createJobMetadataIndex();
return Optional.empty();
}
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId", jobId)).stream().findFirst();
return searchInJobMetadataIndex(QueryBuilders.termQuery("jobId.keyword", jobId)).stream()
.findFirst();
}

private void createJobMetadataIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRunState;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
Expand All @@ -28,6 +39,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.LangType;
Expand All @@ -37,6 +49,8 @@
@AllArgsConstructor
public class SparkQueryDispatcher {

private static final Logger LOG = LogManager.getLogger();

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String SCHEMA_TAG_KEY = "schema";
Expand All @@ -53,6 +67,8 @@ public class SparkQueryDispatcher {

private FlintIndexMetadataReader flintIndexMetadataReader;

private Client client;

public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
return handleSQLQuery(dispatchQueryRequest);
Expand All @@ -64,6 +80,11 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}

public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
// todo. refactor query process logic in plugin.
if (asyncQueryJobMetadata.isDropIndexQuery()) {
return DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()).result();
}

// either empty json when the result is not available or data with status
// Fetch from Result Index
JSONObject result =
Expand Down Expand Up @@ -186,11 +207,29 @@ private DispatchQueryResponse handleDropIndexQuery(
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobId = flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(indexDetails);
emrServerlessClient.cancelJobRun(dispatchQueryRequest.getApplicationId(), jobId);
String dropIndexDummyJobId = RandomStringUtils.randomAlphanumeric(16);
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
// If the index was created without auto refresh, there is no job to cancel.
String status = JobRunState.FAILED.toString();
try {
if (indexMetadata.isAutoRefresh()) {
emrServerlessClient.cancelJobRun(
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
}
} finally {
String indexName = indexDetails.openSearchIndexName();
try {
AcknowledgedResponse response =
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
if (!response.isAcknowledged()) {
LOG.error("failed to delete index");
}
status = JobRunState.SUCCESS.toString();
} catch (InterruptedException | ExecutionException e) {
LOG.error("failed to delete index");
}
}
return new DispatchQueryResponse(
dropIndexDummyJobId, true, dataSourceMetadata.getResultIndex());
new DropIndexResult(status).toJobId(), true, dataSourceMetadata.getResultIndex());
}

private static Map<String, String> getDefaultTagsForJobSubmission(
Expand All @@ -200,4 +239,39 @@ private static Map<String, String> getDefaultTagsForJobSubmission(
tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource());
return tags;
}

/**
 * Outcome of a DROP INDEX request, transported inside a synthetic job id.
 *
 * <p>The job id is the Base64 encoding of {@code PREFIX_LEN} random alphanumeric characters
 * followed by the status string, so {@link #fromJobId(String)} can recover the status from the
 * id alone.
 */
@Getter
@RequiredArgsConstructor
public static class DropIndexResult {
  /** Number of random characters placed in front of the status before encoding. */
  private static final int PREFIX_LEN = 10;

  private final String status;

  /**
   * Recovers the result encoded by {@link #toJobId()}.
   *
   * @param jobId Base64 text produced by {@link #toJobId()}.
   * @return result carrying the status that was embedded in {@code jobId}.
   * @throws IllegalArgumentException if {@code jobId} is not valid Base64.
   * @throws StringIndexOutOfBoundsException if the decoded text is shorter than the prefix.
   */
  public static DropIndexResult fromJobId(String jobId) {
    // Decode with the same charset toJobId() encodes with; the no-charset String constructor
    // would fall back to the platform default.
    String decoded = new String(Base64.getDecoder().decode(jobId), StandardCharsets.UTF_8);
    return new DropIndexResult(decoded.substring(PREFIX_LEN));
  }

  /**
   * Encodes this result as a job id.
   *
   * @return Base64 encoding of a random {@code PREFIX_LEN}-character prefix followed by the
   *     status.
   */
  public String toJobId() {
    String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + status;
    return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Builds the JSON response for this result.
   *
   * @return object holding the status plus either placeholder data (when the status equals
   *     {@code JobRunState.SUCCESS}, ignoring case) or an error message (any other status).
   */
  public JSONObject result() {
    JSONObject result = new JSONObject();
    // The status is reported in both the success and the failure response.
    result.put(STATUS_FIELD, status);
    if (JobRunState.SUCCESS.toString().equalsIgnoreCase(status)) {
      // todo. refactor response handling.
      JSONObject dummyData = new JSONObject();
      dummyData.put("result", new JSONArray());
      dummyData.put("schema", new JSONArray());
      dummyData.put("applicationId", "fakeDropIndexApplicationId");
      result.put(DATA_FIELD, dummyData);
    } else {
      result.put(ERROR_FIELD, "failed to drop index");
    }
    return result;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,38 @@ public class IndexDetails {
private Boolean autoRefresh = false;
private boolean isDropIndex;
private FlintIndexType indexType;

/**
 * Builds the name of the OpenSearch index that corresponds to these index details.
 *
 * <p>The name is {@code flint_<datasource>_<schema>_<table>_<suffix>} for a skipping index and
 * {@code flint_<datasource>_<schema>_<table>_<indexName>_<suffix>} for a covering index, lower
 * cased.
 *
 * @return lower-cased OpenSearch index name.
 * @throws UnsupportedOperationException if the index type is neither skipping nor covering.
 */
public String openSearchIndexName() {
  FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName();
  FlintIndexType type = getIndexType();
  boolean covering = FlintIndexType.COVERING.equals(type);
  if (!covering && !FlintIndexType.SKIPPING.equals(type)) {
    throw new UnsupportedOperationException(
        String.format("Unsupported Index Type : %s", type));
  }

  StringBuilder indexName =
      new StringBuilder("flint")
          .append("_")
          .append(fullyQualifiedTableName.getDatasourceName())
          .append("_")
          .append(fullyQualifiedTableName.getSchemaName())
          .append("_")
          .append(fullyQualifiedTableName.getTableName())
          .append("_");
  if (covering) {
    // Only covering indices carry a user-supplied index name segment.
    indexName.append(getIndexName()).append("_");
  }
  indexName.append(type.getSuffix());

  // Locale.ROOT keeps the result independent of the JVM default locale; a locale-sensitive
  // toLowerCase() can map 'I' to a dotless 'ı' and produce a different index name.
  return indexName.toString().toLowerCase(java.util.Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import java.util.Locale;
import java.util.Map;
import lombok.Data;

/** Job id and auto-refresh flag extracted from the metadata map of a Flint index. */
@Data
public class FlintIndexMetadata {
  public static final String PROPERTIES_KEY = "properties";
  public static final String ENV_KEY = "env";
  public static final String OPTIONS_KEY = "options";

  public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID";
  public static final String AUTO_REFRESH = "auto_refresh";
  public static final String AUTO_REFRESH_DEFAULT = "false";

  private final String jobId;
  private final boolean autoRefresh;

  /**
   * Reads the job id from {@code properties.env.SERVERLESS_EMR_JOB_ID} and the auto-refresh flag
   * from {@code options.auto_refresh} of the given map.
   *
   * <p>Auto refresh is reported as enabled for any option value other than {@code "false"}
   * (compared ignoring case); a missing option counts as {@code "false"}.
   *
   * @param metaMap metadata map containing the {@code properties} and {@code options} entries.
   * @return metadata holding the job id and the auto-refresh flag.
   * @throws NullPointerException if {@code metaMap}, its {@code properties}, {@code env} or
   *     {@code options} entry, or the {@code auto_refresh} value is {@code null}.
   */
  @SuppressWarnings("unchecked")
  public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) {
    Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
    Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
    Map<String, Object> options = (Map<String, Object>) metaMap.get(OPTIONS_KEY);
    String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID);

    // toString() rather than a (String) cast: a value deserialized as a Boolean is then accepted
    // instead of failing with ClassCastException. A null value still raises NullPointerException.
    Object autoRefreshOption = options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT);
    String normalizedOption = autoRefreshOption.toString().toLowerCase(Locale.ROOT);
    boolean autoRefresh = !normalizedOption.equalsIgnoreCase(AUTO_REFRESH_DEFAULT);
    return new FlintIndexMetadata(jobId, autoRefresh);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface FlintIndexMetadataReader {
* Given index details, get the Flint index metadata.
*
* @param indexDetails indexDetails.
* @return jobId.
* @return FlintIndexMetadata.
*/
String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails);
FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,25 @@
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;

/** Implementation of {@link FlintIndexMetadataReader} */
@AllArgsConstructor
public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {

protected static final String META_KEY = "_meta";
protected static final String PROPERTIES_KEY = "properties";
protected static final String ENV_KEY = "env";
protected static final String JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID";

private final Client client;

@Override
public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = getIndexName(indexDetails);
public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = indexDetails.openSearchIndexName();
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName);
Map<String, Object> mappingSourceMap = mappingMetadata.getSourceAsMap();
Map<String, Object> metaMap = (Map<String, Object>) mappingSourceMap.get(META_KEY);
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
Map<String, Object> envMap = (Map<String, Object>) propertiesMap.get(ENV_KEY);
return (String) envMap.get(JOB_ID_KEY);
return FlintIndexMetadata.fromMetatdata((Map<String, Object>) mappingSourceMap.get("_meta"));
} catch (NullPointerException npe) {
throw new IllegalArgumentException("Provided Index doesn't exist");
}
}

private String getIndexName(IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", indexDetails.getIndexType()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;

import com.amazonaws.services.emrserverless.model.JobRunState;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;

/** Round-trip tests for the job id encoding of {@code SparkQueryDispatcher.DropIndexResult}. */
public class DropIndexResultTest {
  // todo, remove this UT after response refactor.
  @Test
  public void successRespEncodeDecode() {
    String expectedStatus = JobRunState.SUCCESS.toString();

    JSONObject decoded = encodeThenDecode(expectedStatus);

    assertEquals(expectedStatus, decoded.get(STATUS_FIELD));
    assertEquals(
        "{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropIndexApplicationId\"}",
        decoded.get(DATA_FIELD).toString());
  }

  // todo, remove this UT after response refactor.
  @Test
  public void failedRespEncodeDecode() {
    String expectedStatus = JobRunState.FAILED.toString();

    JSONObject decoded = encodeThenDecode(expectedStatus);

    assertEquals(expectedStatus, decoded.get(STATUS_FIELD));
    assertEquals("failed to drop index", decoded.get(ERROR_FIELD));
  }

  /** Encodes the status into a job id, decodes it again and returns the resulting response. */
  private static JSONObject encodeThenDecode(String status) {
    String encodedJobId = new SparkQueryDispatcher.DropIndexResult(status).toJobId();
    return SparkQueryDispatcher.DropIndexResult.fromJobId(encodedJobId).result();
  }
}
Loading

0 comments on commit 0c78105

Please sign in to comment.