Handle Describe, Refresh and Show Queries Properly #2357

Merged: 1 commit, Oct 25, 2023
spark/src/main/antlr/FlintSparkSqlExtensions.g4 (5 additions, 0 deletions)
@@ -79,6 +79,7 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
+| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
@@ -90,6 +91,10 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

+refreshMaterializedViewStatement
+: REFRESH MATERIALIZED VIEW mvName=multipartIdentifier
+;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;
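For reference, the only statement shape the new refreshMaterializedViewStatement rule admits is REFRESH MATERIALIZED VIEW followed by a multipart name. A hedged sketch of how the dispatcher-side utilities from this PR would classify it (the query string is invented; SQLQueryUtils, IndexQueryDetails, and IndexQueryActionType are the names introduced in the Java changes below):

    // Example classification of the new statement; the mv name is made up.
    String query = "REFRESH MATERIALIZED VIEW mys3.default.http_count_view";
    if (SQLQueryUtils.isFlintExtensionQuery(query)) {
      IndexQueryDetails details = SQLQueryUtils.extractIndexDetails(query);
      // Expected: details.getIndexQueryActionType() == IndexQueryActionType.REFRESH
    }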
spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
@@ -36,10 +36,7 @@
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
-import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
-import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
-import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
-import org.opensearch.sql.spark.dispatcher.model.JobType;
+import org.opensearch.sql.spark.dispatcher.model.*;
import org.opensearch.sql.spark.execution.session.CreateSessionRequest;
import org.opensearch.sql.spark.execution.session.Session;
import org.opensearch.sql.spark.execution.session.SessionId;
@@ -56,11 +53,10 @@
public class SparkQueryDispatcher {

private static final Logger LOG = LogManager.getLogger();

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";
-public static final String JOB_TYPE_TAG_KEY = "job_type";
+public static final String JOB_TYPE_TAG_KEY = "type";

private EMRServerlessClient emrServerlessClient;

@@ -107,15 +103,18 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
}

private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryRequest) {
-if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) {
-IndexDetails indexDetails =
+if (SQLQueryUtils.isFlintExtensionQuery(dispatchQueryRequest.getQuery())) {
+IndexQueryDetails indexQueryDetails =
SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
-fillMissingDetails(dispatchQueryRequest, indexDetails);
+fillMissingDetails(dispatchQueryRequest, indexQueryDetails);

-if (indexDetails.isDropIndex()) {
-return handleDropIndexQuery(dispatchQueryRequest, indexDetails);
+// TODO: refactor this code properly.
+if (IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())) {
+return handleDropIndexQuery(dispatchQueryRequest, indexQueryDetails);
+} else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())) {
+return handleStreamingQueries(dispatchQueryRequest, indexQueryDetails);
} else {
-return handleIndexQuery(dispatchQueryRequest, indexDetails);
+return handleFlintNonStreamingQueries(dispatchQueryRequest, indexQueryDetails);
}
}
} else {
return handleNonIndexQuery(dispatchQueryRequest);
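Reviewer note: the TODO above invites a cleaner dispatch. One possible follow-up shape, purely a sketch and not part of this PR, is an exhaustive switch over the new IndexQueryActionType enum, routing DESCRIBE/SHOW/REFRESH through the non-streaming path exactly as the if/else chain does:

    // Sketch only, assuming the same handler methods as above.
    private DispatchQueryResponse dispatchIndexQuery(
        DispatchQueryRequest request, IndexQueryDetails details) {
      switch (details.getIndexQueryActionType()) {
        case DROP:
          return handleDropIndexQuery(request, details);
        case CREATE:
          return handleStreamingQueries(request, details);
        case REFRESH:
        case DESCRIBE:
        case SHOW:
          return handleFlintNonStreamingQueries(request, details);
        default:
          throw new IllegalArgumentException(
              "Unhandled action: " + details.getIndexQueryActionType());
      }
    }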
@@ -127,24 +126,59 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR
// Spark Assumes the datasource to be catalog.
// This is required to handle drop index case properly when datasource name is not provided.
private static void fillMissingDetails(
-DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
-if (indexDetails.getFullyQualifiedTableName() != null
-&& indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
-indexDetails
+DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
+if (indexQueryDetails.getFullyQualifiedTableName() != null
+&& indexQueryDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
+indexQueryDetails
.getFullyQualifiedTableName()
.setDatasourceName(dispatchQueryRequest.getDatasource());
}
}
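To see why this fill-in matters: a statement may omit the datasource prefix, and openSearchIndexName() later needs a fully qualified name to locate the Flint index. A hedged walk-through (the query text and datasource name are invented):

    // extractIndexDetails leaves the datasource null for an unqualified name;
    // fillMissingDetails copies it from the submitting request.
    IndexQueryDetails details =
        SQLQueryUtils.extractIndexDetails("DROP SKIPPING INDEX ON default.http_logs");
    // details.getFullyQualifiedTableName().getDatasourceName() == null here;
    // after fillMissingDetails(request, details) it equals
    // request.getDatasource(), e.g. "my_glue".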

-private DispatchQueryResponse handleIndexQuery(
-DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
+private DispatchQueryResponse handleStreamingQueries(
+DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
+DataSourceMetadata dataSourceMetadata =
+this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
+dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
+String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
+Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
+tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName());
+if (indexQueryDetails.isAutoRefresh()) {
+tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
+}
+StartJobRequest startJobRequest =
+new StartJobRequest(
+dispatchQueryRequest.getQuery(),
+jobName,
+dispatchQueryRequest.getApplicationId(),
+dispatchQueryRequest.getExecutionRoleARN(),
+SparkSubmitParameters.Builder.builder()
+.dataSource(
+dataSourceService.getRawDataSourceMetadata(
+dispatchQueryRequest.getDatasource()))
+.structuredStreaming(indexQueryDetails.isAutoRefresh())
+.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
+.build()
+.toString(),
+tags,
+indexQueryDetails.isAutoRefresh(),
+dataSourceMetadata.getResultIndex());
+String jobId = emrServerlessClient.startJobRun(startJobRequest);
+return new DispatchQueryResponse(
+AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
+jobId,
+false,
+dataSourceMetadata.getResultIndex(),
+null);
+}

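The key behavioral point in handleStreamingQueries is that the streaming job type tag and structured streaming mode apply only when the index query is auto-refresh. A standalone sketch of that decision with the tag constants inlined (it assumes JobType.STREAMING.getText() returns "streaming"):

    import java.util.HashMap;
    import java.util.Map;

    class JobTagSketch {
      // Mirrors the tagging above: every index query is tagged with its target
      // Flint index; only auto-refresh queries are additionally tagged streaming.
      static Map<String, String> tagsFor(String openSearchIndexName, boolean autoRefresh) {
        Map<String, String> tags = new HashMap<>();
        tags.put("index", openSearchIndexName);  // INDEX_TAG_KEY
        if (autoRefresh) {
          tags.put("type", "streaming");         // JOB_TYPE_TAG_KEY, renamed above
        }
        return tags;
      }
    }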
+private DispatchQueryResponse handleFlintNonStreamingQueries(
+DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
-tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName());
-tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
@@ -155,12 +189,11 @@ private DispatchQueryResponse handleIndexQuery(
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
-.structuredStreaming(indexDetails.isAutoRefresh())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
-indexDetails.isAutoRefresh(),
+indexQueryDetails.isAutoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(
@@ -242,11 +275,12 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ
}

private DispatchQueryResponse handleDropIndexQuery(
-DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
+DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
-FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
+FlintIndexMetadata indexMetadata =
+flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
try {
Expand All @@ -255,7 +289,7 @@ private DispatchQueryResponse handleDropIndexQuery(
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
}
} finally {
-String indexName = indexDetails.openSearchIndexName();
+String indexName = indexQueryDetails.openSearchIndexName();
try {
AcknowledgedResponse response =
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher.model;

/** Enum for Index Action in the given query.* */
public enum IndexQueryActionType {
CREATE,
REFRESH,
DESCRIBE,
SHOW,
DROP
}
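Hedged examples of the statements each constant is meant to cover, written in Flint SQL syntax (the query strings are illustrative, and the snippet assumes the classes from this PR are on the classpath):

    import java.util.Map;

    Map<String, IndexQueryActionType> expected =
        Map.of(
            "CREATE SKIPPING INDEX ON mys3.default.http_logs (status VALUE_SET)",
            IndexQueryActionType.CREATE,
            "REFRESH MATERIALIZED VIEW mys3.default.http_count_view",
            IndexQueryActionType.REFRESH,
            "DESC SKIPPING INDEX ON mys3.default.http_logs",
            IndexQueryActionType.DESCRIBE,
            "SHOW MATERIALIZED VIEW IN mys3.default",
            IndexQueryActionType.SHOW,
            "DROP SKIPPING INDEX ON mys3.default.http_logs",
            IndexQueryActionType.DROP);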
spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
@@ -5,7 +5,6 @@

package org.opensearch.sql.spark.dispatcher.model;

-import com.google.common.base.Preconditions;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
@@ -14,83 +13,67 @@
/** Index details in an async query. */
@Getter
@EqualsAndHashCode
-public class IndexDetails {
+public class IndexQueryDetails {

public static final String STRIP_CHARS = "`";

private String indexName;
private FullyQualifiedTableName fullyQualifiedTableName;
// by default, auto_refresh = false;
private boolean autoRefresh;
-private boolean isDropIndex;
+private IndexQueryActionType indexQueryActionType;
// materialized view special case where
// table name and mv name are combined.
private String mvName;
private FlintIndexType indexType;

-private IndexDetails() {}
+private IndexQueryDetails() {}

-public static IndexDetailsBuilder builder() {
-return new IndexDetailsBuilder();
+public static IndexQueryDetailsBuilder builder() {
+return new IndexQueryDetailsBuilder();
}

// Builder class
-public static class IndexDetailsBuilder {
-private final IndexDetails indexDetails;
+public static class IndexQueryDetailsBuilder {
+private final IndexQueryDetails indexQueryDetails;

-public IndexDetailsBuilder() {
-indexDetails = new IndexDetails();
+public IndexQueryDetailsBuilder() {
+indexQueryDetails = new IndexQueryDetails();
}

-public IndexDetailsBuilder indexName(String indexName) {
-indexDetails.indexName = indexName;
+public IndexQueryDetailsBuilder indexName(String indexName) {
+indexQueryDetails.indexName = indexName;
return this;
}

-public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
-indexDetails.fullyQualifiedTableName = tableName;
+public IndexQueryDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
+indexQueryDetails.fullyQualifiedTableName = tableName;
return this;
}

-public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) {
-indexDetails.autoRefresh = autoRefresh;
+public IndexQueryDetailsBuilder autoRefresh(Boolean autoRefresh) {
+indexQueryDetails.autoRefresh = autoRefresh;
return this;
}

-public IndexDetailsBuilder isDropIndex(boolean isDropIndex) {
-indexDetails.isDropIndex = isDropIndex;
+public IndexQueryDetailsBuilder indexQueryActionType(
+IndexQueryActionType indexQueryActionType) {
+indexQueryDetails.indexQueryActionType = indexQueryActionType;
return this;
}

-public IndexDetailsBuilder mvName(String mvName) {
-indexDetails.mvName = mvName;
+public IndexQueryDetailsBuilder mvName(String mvName) {
+indexQueryDetails.mvName = mvName;
return this;
}

-public IndexDetailsBuilder indexType(FlintIndexType indexType) {
-indexDetails.indexType = indexType;
+public IndexQueryDetailsBuilder indexType(FlintIndexType indexType) {
+indexQueryDetails.indexType = indexType;
return this;
}

-public IndexDetails build() {
-Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null");
-switch (indexDetails.indexType) {
-case COVERING:
-Preconditions.checkNotNull(
-indexDetails.indexName, "IndexName can't be null for Covering Index.");
-Preconditions.checkNotNull(
-indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index.");
-break;
-case SKIPPING:
-Preconditions.checkNotNull(
-indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index.");
-break;
-case MATERIALIZED_VIEW:
-Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null");
-break;
-}
-
-return indexDetails;
+public IndexQueryDetails build() {
+return indexQueryDetails;
}
}
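Reviewer note: build() no longer runs the Preconditions null checks, so an under-specified IndexQueryDetails now constructs silently and callers rely on the parser populating every relevant field. Minimal usage of the renamed builder (values invented):

    IndexQueryDetails details =
        IndexQueryDetails.builder()
            .mvName("mys3.default.http_count_view")
            .indexQueryActionType(IndexQueryActionType.REFRESH)
            .indexType(FlintIndexType.MATERIALIZED_VIEW)
            .build(); // no validation anymore; a missing indexType is not caught here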

spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java
@@ -34,6 +34,8 @@
public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

+public static final String SESSION_ID_TAG_KEY = "sid";

private final SessionId sessionId;
private final StateStore stateStore;
private final EMRServerlessClient serverlessClient;
@@ -46,6 +48,7 @@ public void open(CreateSessionRequest createSessionRequest) {
createSessionRequest
.getSparkSubmitParametersBuilder()
.sessionExecution(sessionId.getSessionId(), createSessionRequest.getDatasourceName());
+createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId.getSessionId());
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest());
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId();

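A hedged test-style sketch of the new behavior (the session and request scaffolding is assumed, not from this diff): after open(), the submitted job's tags should carry the session id under "sid".

    // Assumed scaffolding: 'session' wraps 'sessionId' and was opened with
    // 'createSessionRequest'.
    session.open(createSessionRequest);
    assertEquals(
        sessionId.getSessionId(),
        createSessionRequest.getTags().get(InteractiveSession.SESSION_ID_TAG_KEY));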
spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java
@@ -1,15 +1,15 @@
package org.opensearch.sql.spark.flint;

-import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
+import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;

/** Interface for FlintIndexMetadataReader */
public interface FlintIndexMetadataReader {

/**
* Given Index details, get the streaming job Id.
*
-* @param indexDetails indexDetails.
+* @param indexQueryDetails indexDetails.
* @return FlintIndexMetadata.
*/
-FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails);
+FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails);
}
spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java
@@ -5,7 +5,7 @@
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
-import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
+import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;

/** Implementation of {@link FlintIndexMetadataReader} */
@AllArgsConstructor
@@ -14,8 +14,8 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {
private final Client client;

@Override
-public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) {
-String indexName = indexDetails.openSearchIndexName();
+public FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails) {
+String indexName = indexQueryDetails.openSearchIndexName();
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
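For orientation, openSearchIndexName() is expected to follow the Flint index naming convention; the examples below reflect that convention as documented for Flint and should be verified against the implementation rather than taken as authoritative:

    // Assumed naming convention (illustrative, not from this diff):
    //   skipping index on mys3.default.http_logs
    //     -> "flint_mys3_default_http_logs_skipping_index"
    //   covering index idx_status on the same table
    //     -> "flint_mys3_default_http_logs_idx_status_index"
    //   materialized view mys3.default.http_count_view
    //     -> "flint_mys3_default_http_count_view"
    String indexName = indexQueryDetails.openSearchIndexName();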