diff --git a/docs/index.md b/docs/index.md index cc030e0a3..2c93418fa 100644 --- a/docs/index.md +++ b/docs/index.md @@ -522,6 +522,8 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.auth.password`: basic auth password. - `spark.datasource.flint.region`: default is us-west-2. only been used when auth=sigv4 - `spark.datasource.flint.customAWSCredentialsProvider`: default is empty. +- `spark.datasource.flint.customFlintMetadataLogServiceClass`: default is empty. +- `spark.datasource.flint.customFlintIndexMetadataServiceClass`: default is empty. - `spark.datasource.flint.write.id_name`: no default value. - `spark.datasource.flint.ignore.id_column` : default value is true. - `spark.datasource.flint.write.batch_size`: "The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintVersion.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/FlintVersion.scala similarity index 93% rename from flint-core/src/main/scala/org/opensearch/flint/core/FlintVersion.scala rename to flint-commons/src/main/scala/org/opensearch/flint/common/FlintVersion.scala index 909d76ce5..4de6e5b7f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintVersion.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/FlintVersion.scala @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core +package org.opensearch.flint.common /** * Flint version. 
diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java new file mode 100644 index 000000000..b990998a9 --- /dev/null +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.common.metadata; + +import java.util.Map; + +/** + * Flint index metadata service provides API for index metadata related operations on a Flint index + * regardless of underlying storage. + *

+ * Custom implementations of this interface are expected to provide a public constructor with + * the signature {@code public MyCustomService(SparkConf sparkConf)} to be instantiated by + * the FlintIndexMetadataServiceBuilder. + */ +public interface FlintIndexMetadataService { + + /** + * Retrieve metadata for a Flint index. + * + * @param indexName index name + * @return index metadata + */ + FlintMetadata getIndexMetadata(String indexName); + + /** + * Retrieve all metadata for Flint index whose name matches the given pattern. + * + * @param indexNamePattern index name pattern + * @return map where the keys are the matched index names, and the values are + * corresponding index metadata + */ + Map getAllIndexMetadata(String... indexNamePattern); + + /** + * Update metadata for a Flint index. + * + * @param indexName index name + * @param metadata index metadata to update + */ + void updateIndexMetadata(String indexName, FlintMetadata metadata); + + /** + * Delete metadata for a Flint index. 
+ * + * @param indexName index name + */ + void deleteIndexMetadata(String indexName); +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala similarity index 52% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala rename to flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala index e4e94cc8c..219a0a831 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala @@ -3,14 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata +package org.opensearch.flint.common.metadata import java.util +import org.opensearch.flint.common.FlintVersion +import org.opensearch.flint.common.FlintVersion.current import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.FlintVersion -import org.opensearch.flint.core.FlintVersion.current -import org.opensearch.flint.core.metadata.FlintJsonHelper._ /** * Flint metadata follows Flint index specification and defines metadata for a Flint index @@ -35,7 +34,11 @@ case class FlintMetadata( schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], /** Optional latest metadata log entry id */ latestId: Option[String] = None, - /** Optional latest metadata log entry */ + /** + * Optional latest metadata log entry. TODO: remove? This was added for SHOW command to be + * fetched during get(All)IndexMetadata. Now describeIndex uses metadata log service to fetch + * log entry after get(All)IndexMetadata so this doesn't need to be part of FlintMetadata. + */ latestLogEntry: Option[FlintMetadataLogEntry] = None, /** Optional Flint index settings. TODO: move elsewhere? 
*/ indexSettings: Option[String]) { @@ -44,124 +47,10 @@ case class FlintMetadata( require(name != null, "name is required") require(kind != null, "kind is required") require(source != null, "source is required") - - /** - * Generate JSON content as index metadata. - * - * @return - * JSON content - */ - def getContent: String = { - try { - buildJson(builder => { - // Add _meta field - objectField(builder, "_meta") { - builder - .field("version", version.version) - .field("name", name) - .field("kind", kind) - .field("source", source) - .field("indexedColumns", indexedColumns) - - if (latestId.isDefined) { - builder.field("latestId", latestId.get) - } - optionalObjectField(builder, "options", options) - optionalObjectField(builder, "properties", properties) - } - - // Add properties (schema) field - builder.field("properties", schema) - }) - } catch { - case e: Exception => - throw new IllegalStateException("Failed to jsonify Flint metadata", e) - } - } } object FlintMetadata { - /** - * Construct Flint metadata with JSON content, index settings, and latest log entry. - * - * @param content - * JSON content - * @param settings - * index settings - * @param latestLogEntry - * latest metadata log entry - * @return - * Flint metadata - */ - def apply( - content: String, - settings: String, - latestLogEntry: FlintMetadataLogEntry): FlintMetadata = { - val metadata = FlintMetadata(content, settings) - metadata.copy(latestLogEntry = Option(latestLogEntry)) - } - - /** - * Construct Flint metadata with JSON content and index settings. - * - * @param content - * JSON content - * @param settings - * index settings - * @return - * Flint metadata - */ - def apply(content: String, settings: String): FlintMetadata = { - val metadata = FlintMetadata(content) - metadata.copy(indexSettings = Option(settings)) - } - - /** - * Parse the given JSON content and construct Flint metadata class. 
- * - * @param content - * JSON content - * @return - * Flint metadata - */ - def apply(content: String): FlintMetadata = { - try { - val builder = new FlintMetadata.Builder() - parseJson(content) { (parser, fieldName) => - { - fieldName match { - case "_meta" => - parseObjectField(parser) { (parser, innerFieldName) => - { - innerFieldName match { - case "version" => builder.version(FlintVersion.apply(parser.text())) - case "name" => builder.name(parser.text()) - case "kind" => builder.kind(parser.text()) - case "source" => builder.source(parser.text()) - case "indexedColumns" => - parseArrayField(parser) { - builder.addIndexedColumn(parser.map()) - } - case "options" => builder.options(parser.map()) - case "properties" => builder.properties(parser.map()) - case _ => // Handle other fields as needed - } - } - } - case "properties" => - builder.schema(parser.map()) - case _ => // Ignore other fields, for instance, dynamic. - } - } - } - builder.build() - } catch { - case e: Exception => - throw new IllegalStateException("Failed to parse metadata JSON", e) - } - } - def builder(): FlintMetadata.Builder = new Builder /** @@ -231,16 +120,6 @@ object FlintMetadata { this } - def schema(schema: String): this.type = { - parseJson(schema) { (parser, fieldName) => - fieldName match { - case "properties" => this.schema = parser.map() - case _ => // do nothing - } - } - this - } - def latestLogEntry(entry: FlintMetadataLogEntry): this.type = { this.latestId = Option(entry.id) this.latestLogEntry = Option(entry) diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala index a7391ed6a..982b7df23 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala @@ -43,8 +43,8 @@ 
case class FlintMetadataLogEntry( state: IndexState, entryVersion: JMap[String, Any], error: String, - storageContext: JMap[String, Any]) = { - this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext.asScala.toMap) + properties: JMap[String, Any]) = { + this(id, createTime, state, entryVersion.asScala.toMap, error, properties.asScala.toMap) } def this( @@ -53,8 +53,8 @@ case class FlintMetadataLogEntry( state: IndexState, entryVersion: JMap[String, Any], error: String, - storageContext: Map[String, Any]) = { - this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext) + properties: Map[String, Any]) = { + this(id, createTime, state, entryVersion.asScala.toMap, error, properties) } } diff --git a/flint-commons/src/test/scala/org/opensearch/flint/common/metadata/FlintMetadataSuite.scala b/flint-commons/src/test/scala/org/opensearch/flint/common/metadata/FlintMetadataSuite.scala new file mode 100644 index 000000000..ea69c69ef --- /dev/null +++ b/flint-commons/src/test/scala/org/opensearch/flint/common/metadata/FlintMetadataSuite.scala @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.common.metadata + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.flint.common.FlintVersion.current +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FlintMetadataSuite extends AnyFlatSpec with Matchers { + "builder" should "build FlintMetadata with provided fields" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + + val metadata = builder.build() + + metadata.version shouldBe current() + metadata.name 
shouldBe "test_index" + metadata.kind shouldBe "test_kind" + metadata.source shouldBe "test_source_table" + metadata.indexedColumns shouldBe Array(Map("test_field" -> "spark_type").asJava) + metadata.schema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 1a3775f0b..29b5f6de9 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -5,9 +5,7 @@ package org.opensearch.flint.core; -import java.util.Map; - -import org.opensearch.flint.core.metadata.FlintMetadata; +import org.opensearch.flint.common.metadata.FlintMetadata; import org.opensearch.flint.core.storage.FlintWriter; /** @@ -32,31 +30,6 @@ public interface FlintClient { */ boolean exists(String indexName); - /** - * Retrieve all metadata for Flint index whose name matches the given pattern. - * - * @param indexNamePattern index name pattern - * @return map where the keys are the matched index names, and the values are - * corresponding index metadata - */ - Map getAllIndexMetadata(String... indexNamePattern); - - /** - * Retrieve metadata in a Flint index. - * - * @param indexName index name - * @return index metadata - */ - FlintMetadata getIndexMetadata(String indexName); - - /** - * Update a Flint index with the metadata given. - * - * @param indexName index name - * @param metadata index metadata - */ - void updateIndex(String indexName, FlintMetadata metadata); - /** * Delete a Flint index. 
* diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 6eebf1598..6c3c02b9f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -95,7 +95,9 @@ public class FlintOptions implements Serializable { public static final String DEFAULT_BATCH_BYTES = "1mb"; - public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass"; + public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "customFlintMetadataLogServiceClass"; + + public static final String CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS = "customFlintIndexMetadataServiceClass"; public static final String SUPPORT_SHARD = "read.support_shard"; @@ -189,6 +191,10 @@ public String getCustomFlintMetadataLogServiceClass() { return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, ""); } + public String getCustomFlintIndexMetadataServiceClass() { + return options.getOrDefault(CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS, ""); + } + /** * FIXME, This is workaround for AWS OpenSearch Serverless (AOSS). AOSS does not support shard * operation, but shard info is exposed in index settings. Remove this setting when AOSS fix diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala b/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala index 98b7f8960..1ada5f2e3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.core -import org.opensearch.flint.core.metadata.FlintMetadata - /** * OpenSearch Table metadata. 
* @@ -18,11 +16,3 @@ import org.opensearch.flint.core.metadata.FlintMetadata * setting */ case class MetaData(name: String, properties: String, setting: String) - -object MetaData { - def apply(name: String, flintMetadata: FlintMetadata): MetaData = { - val properties = flintMetadata.getContent - val setting = flintMetadata.indexSettings.getOrElse("") - MetaData(name, properties, setting) - } -} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintIndexMetadataServiceBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintIndexMetadataServiceBuilder.java new file mode 100644 index 000000000..d6f88135f --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintIndexMetadataServiceBuilder.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata; + +import java.lang.reflect.Constructor; +import org.opensearch.flint.common.metadata.FlintIndexMetadataService; +import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService; + +/** + * {@link FlintIndexMetadataService} builder. + *

+ * Custom implementations of {@link FlintIndexMetadataService} are expected to provide a public + * constructor with no arguments to be instantiated by this builder. + */ +public class FlintIndexMetadataServiceBuilder { + public static FlintIndexMetadataService build(FlintOptions options) { + String className = options.getCustomFlintIndexMetadataServiceClass(); + if (className.isEmpty()) { + return new FlintOpenSearchIndexMetadataService(options); + } + + // Attempts to instantiate Flint index metadata service using reflection + try { + Class flintIndexMetadataServiceClass = Class.forName(className); + Constructor constructor = flintIndexMetadataServiceClass.getConstructor(); + return (FlintIndexMetadataService) constructor.newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate FlintIndexMetadataService: " + className, e); + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java index 9ec4ac2c4..fc89eea9f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java @@ -6,7 +6,6 @@ package org.opensearch.flint.core.metadata.log; import java.lang.reflect.Constructor; -import org.apache.spark.SparkConf; import org.opensearch.flint.common.metadata.log.FlintMetadataLogService; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService; @@ -15,21 +14,20 @@ * {@link FlintMetadataLogService} builder. *

* Custom implementations of {@link FlintMetadataLogService} are expected to provide a public - * constructor with the signature {@code public MyCustomService(SparkConf sparkConf)} to be - * instantiated by this builder. + * constructor with no arguments to be instantiated by this builder. */ public class FlintMetadataLogServiceBuilder { - public static FlintMetadataLogService build(FlintOptions options, SparkConf sparkConf) { + public static FlintMetadataLogService build(FlintOptions options) { String className = options.getCustomFlintMetadataLogServiceClass(); if (className.isEmpty()) { return new FlintOpenSearchMetadataLogService(options); } - // Attempts to instantiate Flint metadata log service with sparkConf using reflection + // Attempts to instantiate Flint metadata log service using reflection try { Class flintMetadataLogServiceClass = Class.forName(className); - Constructor constructor = flintMetadataLogServiceClass.getConstructor(SparkConf.class); - return (FlintMetadataLogService) constructor.newInstance(sparkConf); + Constructor constructor = flintMetadataLogServiceClass.getConstructor(); + return (FlintMetadataLogService) constructor.newInstance(); } catch (Exception e) { throw new RuntimeException("Failed to instantiate FlintMetadataLogService: " + className, e); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 1a7c976c2..affcd0e36 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -9,25 +9,15 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; -import org.opensearch.client.indices.GetIndexResponse; -import org.opensearch.client.indices.PutMappingRequest; 
-import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.common.metadata.FlintMetadata; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; -import org.opensearch.flint.core.metadata.FlintMetadata; import scala.Option; import java.io.IOException; -import java.util.Arrays; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.logging.Logger; -import java.util.stream.Collectors; /** * Flint client implementation for OpenSearch storage. @@ -36,13 +26,6 @@ public class FlintOpenSearchClient implements FlintClient { private static final Logger LOG = Logger.getLogger(FlintOpenSearchClient.class.getName()); - /** - * Invalid index name characters to percent-encode, - * excluding '*' because it's reserved for pattern matching. - */ - private final static Set INVALID_INDEX_NAME_CHARS = - Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<'); - private final FlintOptions options; public FlintOpenSearchClient(FlintOptions options) { @@ -52,7 +35,7 @@ public FlintOpenSearchClient(FlintOptions options) { @Override public void createIndex(String indexName, FlintMetadata metadata) { LOG.info("Creating Flint index " + indexName + " with metadata " + metadata); - createIndex(indexName, metadata.getContent(), metadata.indexSettings()); + createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings()); } protected void createIndex(String indexName, String mapping, Option settings) { @@ -81,58 +64,6 @@ public boolean exists(String indexName) { } } - @Override - public Map getAllIndexMetadata(String... 
indexNamePattern) { - LOG.info("Fetching all Flint index metadata for pattern " + String.join(",", indexNamePattern)); - String[] indexNames = - Arrays.stream(indexNamePattern).map(this::sanitizeIndexName).toArray(String[]::new); - try (IRestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(indexNames); - GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); - - return Arrays.stream(response.getIndices()) - .collect(Collectors.toMap( - index -> index, - index -> FlintMetadata.apply( - response.getMappings().get(index).source().toString(), - response.getSettings().get(index).toString() - ) - )); - } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + - String.join(",", indexNames), e); - } - } - - @Override - public FlintMetadata getIndexMetadata(String indexName) { - LOG.info("Fetching Flint index metadata for " + indexName); - String osIndexName = sanitizeIndexName(indexName); - try (IRestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(osIndexName); - GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); - - MappingMetadata mapping = response.getMappings().get(osIndexName); - Settings settings = response.getSettings().get(osIndexName); - return FlintMetadata.apply(mapping.source().string(), settings.toString()); - } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); - } - } - - @Override - public void updateIndex(String indexName, FlintMetadata metadata) { - LOG.info("Updating Flint index " + indexName + " with metadata " + metadata); - String osIndexName = sanitizeIndexName(indexName); - try (IRestHighLevelClient client = createClient()) { - PutMappingRequest request = new PutMappingRequest(osIndexName); - request.source(metadata.getContent(), XContentType.JSON); - client.updateIndexMapping(request, 
RequestOptions.DEFAULT); - } catch (Exception e) { - throw new IllegalStateException("Failed to update Flint index " + osIndexName, e); - } - } - @Override public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); @@ -157,40 +88,7 @@ public IRestHighLevelClient createClient() { return OpenSearchClientUtils.createClient(options); } - /* - * Because OpenSearch requires all lowercase letters in index name, we have to - * lowercase all letters in the given Flint index name. - */ - private String toLowercase(String indexName) { - Objects.requireNonNull(indexName); - - return indexName.toLowerCase(Locale.ROOT); - } - - /* - * Percent-encode invalid OpenSearch index name characters. - */ - private String percentEncode(String indexName) { - Objects.requireNonNull(indexName); - - StringBuilder builder = new StringBuilder(indexName.length()); - for (char ch : indexName.toCharArray()) { - if (INVALID_INDEX_NAME_CHARS.contains(ch)) { - builder.append(String.format("%%%02X", (int) ch)); - } else { - builder.append(ch); - } - } - return builder.toString(); - } - - /* - * Sanitize index name to comply with OpenSearch index name restrictions. 
- */ private String sanitizeIndexName(String indexName) { - Objects.requireNonNull(indexName); - - String encoded = percentEncode(indexName); - return toLowercase(encoded); + return OpenSearchClientUtils.sanitizeIndexName(indexName); } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala new file mode 100644 index 000000000..fad2f1b63 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala @@ -0,0 +1,203 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage + +import java.util + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.{GetIndexRequest, GetIndexResponse, PutMappingRequest} +import org.opensearch.common.xcontent.XContentType +import org.opensearch.flint.common.FlintVersion +import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} +import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.IRestHighLevelClient +import org.opensearch.flint.core.metadata.FlintJsonHelper._ + +import org.apache.spark.internal.Logging + +class FlintOpenSearchIndexMetadataService(options: FlintOptions) + extends FlintIndexMetadataService + with Logging { + + override def getIndexMetadata(indexName: String): FlintMetadata = { + logInfo(s"Fetching Flint index metadata for $indexName") + val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName) + var client: IRestHighLevelClient = null + try { + client = OpenSearchClientUtils.createClient(options) + val request = new GetIndexRequest(osIndexName) + val response = client.getIndex(request, RequestOptions.DEFAULT) + val mapping = response.getMappings.get(osIndexName) + val 
settings = response.getSettings.get(osIndexName) + FlintOpenSearchIndexMetadataService.deserialize(mapping.source.string, settings.toString) + } catch { + case e: Exception => + throw new IllegalStateException( + "Failed to get Flint index metadata for " + osIndexName, + e) + } finally + if (client != null) { + client.close() + } + } + + override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = { + logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePattern.mkString(",")}"); + val indexNames = indexNamePattern.map(OpenSearchClientUtils.sanitizeIndexName) + var client: IRestHighLevelClient = null + try { + client = OpenSearchClientUtils.createClient(options) + val request = new GetIndexRequest(indexNames: _*) + val response: GetIndexResponse = client.getIndex(request, RequestOptions.DEFAULT) + + response.getIndices + .map(index => + index -> FlintOpenSearchIndexMetadataService.deserialize( + response.getMappings.get(index).source().string(), + response.getSettings.get(index).toString)) + .toMap + .asJava + } catch { + case e: Exception => + throw new IllegalStateException( + s"Failed to get Flint index metadata for ${indexNames.mkString(",")}", + e) + } finally + if (client != null) { + client.close() + } + } + + override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = { + logInfo(s"Updating Flint index $indexName with metadata $metadata"); + val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName) + var client: IRestHighLevelClient = null + try { + client = OpenSearchClientUtils.createClient(options) + val request = new PutMappingRequest(osIndexName) + request.source(FlintOpenSearchIndexMetadataService.serialize(metadata), XContentType.JSON) + client.updateIndexMapping(request, RequestOptions.DEFAULT) + } catch { + case e: Exception => + throw new IllegalStateException(s"Failed to update Flint index $osIndexName", e) + } finally + if (client != null) { + client.close() + } + 
} + + // Do nothing. For OpenSearch, deleting the index will also delete its metadata + override def deleteIndexMetadata(indexName: String): Unit = {} +} + +object FlintOpenSearchIndexMetadataService { + + def serialize(metadata: FlintMetadata): String = { + serialize(metadata, true) + } + + /** + * Generate JSON content as index metadata. + * + * @param metadata + * Flint index metadata + * @param includeSpec + * Whether to include _meta field in the JSON content for Flint index specification + * @return + * JSON content + */ + def serialize(metadata: FlintMetadata, includeSpec: Boolean): String = { + try { + buildJson(builder => { + if (includeSpec) { + // Add _meta field + objectField(builder, "_meta") { + builder + .field("version", metadata.version.version) + .field("name", metadata.name) + .field("kind", metadata.kind) + .field("source", metadata.source) + .field("indexedColumns", metadata.indexedColumns) + + if (metadata.latestId.isDefined) { + builder.field("latestId", metadata.latestId.get) + } + optionalObjectField(builder, "options", metadata.options) + optionalObjectField(builder, "properties", metadata.properties) + } + } + + // Add properties (schema) field + builder.field("properties", metadata.schema) + }) + } catch { + case e: Exception => + throw new IllegalStateException("Failed to jsonify Flint metadata", e) + } + } + + /** + * Construct Flint metadata with JSON content and index settings. + * + * @param content + * JSON content + * @param settings + * index settings + * @return + * Flint metadata + */ + def deserialize(content: String, settings: String): FlintMetadata = { + val metadata = deserialize(content) + metadata.copy(indexSettings = Option(settings)) + } + + /** + * Parse the given JSON content and construct Flint metadata class. 
+ * + * @param content + * JSON content + * @return + * Flint metadata + */ + def deserialize(content: String): FlintMetadata = { + try { + val builder = new FlintMetadata.Builder() + parseJson(content) { (parser, fieldName) => + { + fieldName match { + case "_meta" => + parseObjectField(parser) { (parser, innerFieldName) => + { + innerFieldName match { + case "version" => builder.version(FlintVersion.apply(parser.text())) + case "name" => builder.name(parser.text()) + case "kind" => builder.kind(parser.text()) + case "source" => builder.source(parser.text()) + case "indexedColumns" => + parseArrayField(parser) { + builder.addIndexedColumn(parser.map()) + } + case "options" => builder.options(parser.map()) + case "properties" => builder.properties(parser.map()) + case _ => // Handle other fields as needed + } + } + } + case "properties" => + builder.schema(parser.map()) + case _ => // Ignore other fields, for instance, dynamic. + } + } + } + builder.build() + } catch { + case e: Exception => + throw new IllegalStateException("Failed to parse metadata JSON", e) + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 7944de5ae..8c327b664 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -40,7 +40,7 @@ * - entryVersion: * - seqNo (Long): OpenSearch sequence number * - primaryTerm (Long): OpenSearch primary term - * - storageContext: + * - properties: * - dataSourceName (String): OpenSearch data source associated */ public class FlintOpenSearchMetadataLog implements FlintMetadataLog { @@ -67,7 +67,8 @@ public FlintOpenSearchMetadataLog(FlintOptions options, String flintIndexName, S this.options = options; this.metadataLogIndexName = metadataLogIndexName; 
this.dataSourceName = options.getDataSourceName(); - this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes()); + String osIndexName = OpenSearchClientUtils.sanitizeIndexName(flintIndexName); + this.latestId = Base64.getEncoder().encodeToString(osIndexName.getBytes()); } @Override diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java index a29c9f94e..0f80d07c9 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java @@ -8,6 +8,9 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import java.lang.reflect.Constructor; +import java.util.Locale; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -35,6 +38,13 @@ public class OpenSearchClientUtils { */ public final static String META_LOG_NAME_PREFIX = ".query_execution_request"; + /** + * Invalid index name characters to percent-encode, + * excluding '*' because it's reserved for pattern matching. + */ + private final static Set INVALID_INDEX_NAME_CHARS = + Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<'); + /** * Used in IT. */ @@ -62,6 +72,43 @@ public static IRestHighLevelClient createClient(FlintOptions options) { BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options)); } + /** + * Sanitize index name to comply with OpenSearch index name restrictions. 
+ */ + public static String sanitizeIndexName(String indexName) { + Objects.requireNonNull(indexName); + + String encoded = percentEncode(indexName); + return toLowercase(encoded); + } + + /** + * Because OpenSearch requires all lowercase letters in index name, we have to + * lowercase all letters in the given Flint index name. + */ + private static String toLowercase(String indexName) { + Objects.requireNonNull(indexName); + + return indexName.toLowerCase(Locale.ROOT); + } + + /** + * Percent-encode invalid OpenSearch index name characters. + */ + private static String percentEncode(String indexName) { + Objects.requireNonNull(indexName); + + StringBuilder builder = new StringBuilder(indexName.length()); + for (char ch : indexName.toCharArray()) { + if (INVALID_INDEX_NAME_CHARS.contains(ch)) { + builder.append(String.format("%%%02X", (int) ch)); + } else { + builder.append(ch); + } + } + return builder.toString(); + } + private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClientBuilder, FlintOptions options) { // Use DefaultAWSCredentialsProviderChain by default. 
final AtomicReference customAWSCredentialsProvider =
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchCluster.java b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchCluster.java
new file mode 100644
index 000000000..2736177cc
--- /dev/null
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchCluster.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.table;
+
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.indices.GetIndexRequest;
+import org.opensearch.client.indices.GetIndexResponse;
+import org.opensearch.flint.core.FlintOptions;
+import org.opensearch.flint.core.IRestHighLevelClient;
+import org.opensearch.flint.core.MetaData;
+import org.opensearch.flint.core.storage.OpenSearchClientUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+public class OpenSearchCluster {
+
+  private static final Logger LOG = Logger.getLogger(OpenSearchCluster.class.getName());
+
+  /**
+   * Creates a list of OpenSearchIndexTable instances for indices in the OpenSearch domain.
+   *
+   * @param indexName
+   *   index name; supports (1) a single index name, (2) a wildcard index name, (3) comma-separated index names.
+   * @param options
+   *   The options for Flint.
+   * @return
+   *   A list of OpenSearchIndexTable instances.
+   */
+  public static List apply(String indexName, FlintOptions options) {
+    return getAllOpenSearchTableMetadata(options, indexName.split(","))
+        .stream()
+        .map(metadata -> new OpenSearchIndexTable(metadata, options))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Retrieves all metadata for OpenSearch tables whose names match the given pattern.
+   *
+   * @param options The options for Flint.
+ * @param indexNamePattern index name pattern + * @return list of OpenSearch table metadata + */ + public static List getAllOpenSearchTableMetadata(FlintOptions options, String... indexNamePattern) { + LOG.info("Fetching all OpenSearch table metadata for pattern " + String.join(",", indexNamePattern)); + String[] indexNames = + Arrays.stream(indexNamePattern).map(OpenSearchClientUtils::sanitizeIndexName).toArray(String[]::new); + try (IRestHighLevelClient client = OpenSearchClientUtils.createClient(options)) { + GetIndexRequest request = new GetIndexRequest(indexNames); + GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); + + return Arrays.stream(response.getIndices()) + .map(index -> new MetaData( + index, + response.getMappings().get(index).source().string(), + response.getSettings().get(index).toString())) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new IllegalStateException("Failed to get OpenSearch table metadata for " + + String.join(",", indexNames), e); + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala index 783163687..57c770eb8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/table/OpenSearchIndexTable.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.core.table -import scala.collection.JavaConverters._ - import org.json4s.{Formats, NoTypeHints} import org.json4s.JsonAST.JString import org.json4s.jackson.JsonMethods @@ -146,28 +144,3 @@ object OpenSearchIndexTable { */ val maxSplitSizeBytes = 10 * 1024 * 1024 } - -object OpenSearchCluster { - - /** - * Creates list of OpenSearchIndexTable instance of indices in OpenSearch domain. - * - * @param indexName - * tableName support (1) single index name. (2) wildcard index name. (3) comma sep index name. 
- * @param options - * The options for Flint. - * @return - * An list of OpenSearchIndexTable instance. - */ - def apply(indexName: String, options: FlintOptions): Seq[OpenSearchIndexTable] = { - val client = FlintClientBuilder.build(options) - client - .getAllIndexMetadata(indexName.split(","): _*) - .asScala - .toMap - .map(entry => { - new OpenSearchIndexTable(MetaData.apply(entry._1, entry._2), options) - }) - .toSeq - } -} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala similarity index 65% rename from flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala rename to flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala index 01b4e266c..f1ed09531 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataServiceSuite.scala @@ -3,16 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.metadata +package org.opensearch.flint.core.storage import scala.collection.JavaConverters.mapAsJavaMapConverter import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson -import org.opensearch.flint.core.FlintVersion.current +import org.opensearch.flint.common.FlintVersion.current +import org.opensearch.flint.common.metadata.FlintMetadata import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -class FlintMetadataSuite extends AnyFlatSpec with Matchers { +class FlintOpenSearchIndexMetadataServiceSuite extends AnyFlatSpec with Matchers { /** Test Flint index meta JSON string */ val testMetadataJson: String = s""" @@ -60,6 +61,16 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { | } |""".stripMargin + val testNoSpec: String = 
s""" + | { + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + val testIndexSettingsJson: String = """ | { "number_of_shards": 3 } @@ -67,7 +78,8 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { "constructor" should "deserialize the given JSON and assign parsed value to field" in { Seq(testMetadataJson, testDynamic).foreach(mapping => { - val metadata = FlintMetadata(mapping, testIndexSettingsJson) + val metadata = + FlintOpenSearchIndexMetadataService.deserialize(mapping, testIndexSettingsJson) metadata.version shouldBe current() metadata.name shouldBe "test_index" metadata.kind shouldBe "test_kind" @@ -77,15 +89,27 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { }) } - "getContent" should "serialize all fields to JSON" in { + "serialize" should "serialize all fields to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) + + val metadata = builder.build() + FlintOpenSearchIndexMetadataService.serialize(metadata) should matchJson(testMetadataJson) + } + + "serialize without spec" should "serialize all fields to JSON without adding _meta field" in { val builder = new FlintMetadata.Builder builder.name("test_index") builder.kind("test_kind") builder.source("test_source_table") - builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava); - builder.schema("""{"properties": {"test_field": {"type": "os_type"}}}""") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) + builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) val metadata = builder.build() - metadata.getContent should matchJson(testMetadataJson) + 
FlintOpenSearchIndexMetadataService.serialize(metadata, false) should matchJson(testNoSpec) } } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchClientUtilsSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchClientUtilsSuite.scala new file mode 100644 index 000000000..abcf9edf8 --- /dev/null +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchClientUtilsSuite.scala @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class OpenSearchClientUtilsSuite extends AnyFlatSpec with Matchers { + + "sanitizeIndexName" should "percent-encode invalid OpenSearch index name characters and lowercase all characters" in { + val indexName = "TEST :\"+/\\|?#><" + val sanitizedIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName) + sanitizedIndexName shouldBe "test%20%3a%22%2b%2f%5c%7c%3f%23%3e%3c" + } +} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala index ed6902841..e9f6f5ea1 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala @@ -7,6 +7,8 @@ package org.apache.spark.sql.flint import java.util +import scala.collection.JavaConverters._ + import org.opensearch.flint.core.table.OpenSearchCluster import org.apache.spark.sql.SparkSession @@ -39,7 +41,7 @@ class FlintReadOnlyTable( lazy val name: String = flintSparkConf.tableName() lazy val tables: Seq[org.opensearch.flint.core.Table] = - OpenSearchCluster.apply(name, flintSparkConf.flintOptions()) + OpenSearchCluster.apply(name, 
flintSparkConf.flintOptions()).asScala lazy val resolvedTablesSchema: StructType = tables.headOption .map(tbl => FlintDataType.deserialize(tbl.schema().asJson())) diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index b1351e205..1d12d004e 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -193,10 +193,16 @@ object FlintSparkConf { .doc("data source name") .createOptional() val CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = - FlintConfig(FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS) + FlintConfig(s"spark.datasource.flint.${FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS}") .datasourceOption() .doc("custom Flint metadata log service class") .createOptional() + val CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS = + FlintConfig( + s"spark.datasource.flint.${FlintOptions.CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS}") + .datasourceOption() + .doc("custom Flint index metadata service class") + .createOptional() val QUERY = FlintConfig("spark.flint.job.query") .doc("Flint query for batch and streaming job") @@ -298,6 +304,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable RETRYABLE_EXCEPTION_CLASS_NAMES, DATA_SOURCE_NAME, CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, + CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS, SESSION_ID, REQUEST_INDEX, METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index e9913ccd7..3eb36010e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -9,12 +9,13 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization +import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.common.metadata.log.FlintMetadataLogService import org.opensearch.flint.common.metadata.log.OptimisticTransaction import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} -import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ @@ -47,10 +48,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w /** Flint client for low-level index operation */ private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions()) + private val flintIndexMetadataService: FlintIndexMetadataService = { + FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions()) + } + override protected val flintMetadataLogService: FlintMetadataLogService = { - FlintMetadataLogServiceBuilder.build( - flintSparkConf.flintOptions(), - spark.sparkContext.getConf) + FlintMetadataLogServiceBuilder.build(flintSparkConf.flintOptions()) } /** Required by json4s parse function */ @@ -114,9 +117,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .commit(latest => if (latest == null) { // in case transaction capability is disabled flintClient.createIndex(indexName, metadata) + 
flintIndexMetadataService.updateIndexMetadata(indexName, metadata) } else { logInfo(s"Creating index with metadata log entry ID ${latest.id}") flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id))) + flintIndexMetadataService + .updateIndexMetadata(indexName, metadata.copy(latestId = Some(latest.id))) }) } } @@ -163,7 +169,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = { logInfo(s"Describing indexes with pattern $indexNamePattern") if (flintClient.exists(indexNamePattern)) { - flintClient + flintIndexMetadataService .getAllIndexMetadata(indexNamePattern) .asScala .map { case (indexName, metadata) => @@ -187,7 +193,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w def describeIndex(indexName: String): Option[FlintSparkIndex] = { logInfo(s"Describing index name $indexName") if (flintClient.exists(indexName)) { - val metadata = flintClient.getIndexMetadata(indexName) + val metadata = flintIndexMetadataService.getIndexMetadata(indexName) val metadataWithEntry = attachLatestLogEntry(indexName, metadata) FlintSparkIndexFactory.create(metadataWithEntry) } else { @@ -267,6 +273,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .finalLog(_ => NO_LOG_ENTRY) .commit(_ => { flintClient.deleteIndex(indexName) + flintIndexMetadataService.deleteIndexMetadata(indexName) true }) } else { @@ -428,7 +435,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .transientLog(latest => latest.copy(state = UPDATING)) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(_ => { - flintClient.updateIndex(indexName, index.metadata) + flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) logInfo("Update index options complete") flintIndexMonitor.stopMonitor(indexName) stopRefreshingJob(indexName) @@ -453,7 +460,7 @@ class FlintSpark(val 
spark: SparkSession) extends FlintSparkTransactionSupport w latest.copy(state = REFRESHING) }) .commit(_ => { - flintClient.updateIndex(indexName, index.metadata) + flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) logInfo("Update index options complete") indexRefresh.start(spark, flintSparkConf) }) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 34c2ae452..44ea5188f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -5,10 +5,11 @@ package org.opensearch.flint.spark -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} +import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.FlintJsonHelper._ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.flint.datatype.FlintDataType @@ -176,4 +177,18 @@ object FlintSparkIndex { val structType = StructType.fromDDL(catalogDDL) FlintDataType.serialize(structType) } + + def generateSchema(allFieldTypes: Map[String, String]): Map[String, AnyRef] = { + val schemaJson = generateSchemaJSON(allFieldTypes) + var schemaMap: Map[String, AnyRef] = Map.empty + + parseJson(schemaJson) { (parser, fieldName) => + fieldName match { + case "properties" => schemaMap = parser.map().asScala.toMap + case _ => // do nothing + } + } + + schemaMap + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index aa3c23360..6c34e00e1 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -9,7 +9,7 @@ import java.util.Collections import scala.collection.JavaConverters.mapAsScalaMapConverter -import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE import org.opensearch.flint.spark.mv.FlintSparkMaterializedView diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 0fade2ee7..8748bf874 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -7,10 +7,10 @@ package org.opensearch.flint.spark.covering import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark._ -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName} +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchema, metadataBuilder, quotedTableName} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import 
org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} @@ -53,13 +53,13 @@ case class FlintSparkCoveringIndex( Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava }.toArray } - val schemaJson = generateSchemaJSON(indexedColumns) + val schema = generateSchema(indexedColumns).asJava val builder = metadataBuilder(this) .name(indexName) .source(tableName) .indexedColumns(indexColumnMaps) - .schema(schemaJson) + .schema(schema) // Add optional index properties filterCondition.map(builder.addProperty("filterCondition", _)) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 48dfee50a..caa75be75 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -10,10 +10,10 @@ import java.util.Locale import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.collection.convert.ImplicitConversions.`map AsScala` +import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh} +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchema, metadataBuilder, StreamingRefresh} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.function.TumbleFunction import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, 
MV_INDEX_TYPE} @@ -59,13 +59,13 @@ case class FlintSparkMaterializedView( outputSchema.map { case (colName, colType) => Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava }.toArray - val schemaJson = generateSchemaJSON(outputSchema) + val schema = generateSchema(outputSchema).asJava metadataBuilder(this) .name(mvName) .source(query) .indexedColumns(indexColumnMaps) - .schema(schemaJson) + .schema(schema) .build() } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index da73ea01e..b6f21e455 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -7,8 +7,8 @@ package org.opensearch.flint.spark.skipping import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex._ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty @@ -65,13 +65,13 @@ case class FlintSparkSkippingIndex( indexedColumns .flatMap(_.outputSchema()) .toMap + (FILE_PATH_COLUMN -> "string") - val schemaJson = generateSchemaJSON(fieldTypes) + val schema = generateSchema(fieldTypes).asJava metadataBuilder(this) .name(name()) .source(tableName) .indexedColumns(indexColumnMaps) - .schema(schemaJson) + .schema(schema) .build() } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala 
b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index 3a6802704..2c5518778 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -7,13 +7,14 @@ package org.opensearch.flint.spark.covering import scala.collection.JavaConverters._ -import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchers.{any, eq => mockitoEq} import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS} import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState} -import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient} -import org.opensearch.flint.core.storage.OpenSearchClientUtils +import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, MetaData} +import org.opensearch.flint.core.table.OpenSearchCluster import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.FlintSparkIndex.generateSchemaJSON import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.{Matcher, MatchResult} import org.scalatest.matchers.should.Matchers @@ -34,9 +35,8 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { private val clientBuilder = mockStatic(classOf[FlintClientBuilder]) private val client = mock[FlintClient](RETURNS_DEEP_STUBS) - /** Mock IRestHighLevelClient to avoid looking for real OpenSearch cluster */ - private val clientUtils = mockStatic(classOf[OpenSearchClientUtils]) - private val openSearchClient = mock[IRestHighLevelClient](RETURNS_DEEP_STUBS) + /** Mock OpenSearchCluster to avoid looking for real OpenSearch cluster */ + private 
val openSearchCluster = mockStatic(classOf[OpenSearchCluster]) /** Mock FlintSpark which is required by the rule. Deep stub required to replace spark val. */ private val flint = mock[FlintSpark](RETURNS_DEEP_STUBS) @@ -59,16 +59,17 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { clientBuilder .when(() => FlintClientBuilder.build(any(classOf[FlintOptions]))) .thenReturn(client) - when(flint.spark).thenReturn(spark) // Mock static - clientUtils - .when(() => OpenSearchClientUtils.createClient(any(classOf[FlintOptions]))) - .thenReturn(openSearchClient) + openSearchCluster + .when(() => OpenSearchCluster.apply(any(classOf[String]), any(classOf[FlintOptions]))) + .thenCallRealMethod() + when(flint.spark).thenReturn(spark) } override protected def afterAll(): Unit = { sql(s"DROP TABLE $testTable") clientBuilder.close() + openSearchCluster.close() super.afterAll() } @@ -274,8 +275,18 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { }) indexes.foreach { index => - when(client.getAllIndexMetadata(index.name())) - .thenReturn(Map.apply(index.name() -> index.metadata()).asJava) + { + openSearchCluster + .when(() => + OpenSearchCluster + .getAllOpenSearchTableMetadata(any(classOf[FlintOptions]), mockitoEq(index.name))) + .thenReturn( + Seq( + MetaData( + index.name, + generateSchemaJSON(index.indexedColumns), + index.metadata.indexSettings.getOrElse(""))).asJava) + } } rule.apply(plan) } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 6772eb8f3..1b332660e 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -11,7 +11,8 @@ import 
scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.native.JsonMethods.parse import org.mockito.Mockito.when -import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.common.metadata.FlintMetadata +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind @@ -263,7 +264,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { } private def schemaShouldMatch(metadata: FlintMetadata, expected: String): Unit = { - val actual = parse(metadata.getContent) \ "properties" + val actual = parse(FlintOpenSearchIndexMetadataService.serialize(metadata)) \ "properties" assert(actual == parse(expected)) } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index eadc5031a..9aeba7512 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -19,8 +19,7 @@ import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers -import org.apache.spark.SparkConf -import org.apache.spark.sql.flint.config.FlintSparkConf.{CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, DATA_SOURCE_NAME} +import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { @@ -46,18 +45,19 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { test("should build 
metadata log service") { val customOptions = - openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "org.opensearch.flint.core.TestMetadataLogService") + openSearchOptions + (FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS -> "org.opensearch.flint.core.TestMetadataLogService") val customFlintOptions = new FlintOptions(customOptions.asJava) val customFlintMetadataLogService = - FlintMetadataLogServiceBuilder.build(customFlintOptions, sparkConf) + FlintMetadataLogServiceBuilder.build(customFlintOptions) customFlintMetadataLogService shouldBe a[TestMetadataLogService] } test("should fail to build metadata log service if class name doesn't exist") { - val options = openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "dummy") + val options = + openSearchOptions + (FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS -> "dummy") val flintOptions = new FlintOptions(options.asJava) the[RuntimeException] thrownBy { - FlintMetadataLogServiceBuilder.build(flintOptions, sparkConf) + FlintMetadataLogServiceBuilder.build(flintOptions) } } @@ -118,7 +118,7 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { } } -case class TestMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLogService { +class TestMetadataLogService extends FlintMetadataLogService { override def startTransaction[T]( indexName: String, forceInit: Boolean): OptimisticTransaction[T] = { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 53188fb5a..2dc6016b2 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -10,23 +10,20 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import 
org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization -import org.mockito.Mockito.when import org.opensearch.flint.OpenSearchSuite -import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService} import org.opensearch.flint.core.table.OpenSearchCluster import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers { - lazy val options = new FlintOptions(openSearchOptions.asJava) - /** Lazy initialize after container started. */ - lazy val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + lazy val options = new FlintOptions(openSearchOptions.asJava) + lazy val flintClient = new FlintOpenSearchClient(options) + lazy val flintIndexMetadataService = new FlintOpenSearchIndexMetadataService(options) behavior of "Flint OpenSearch client" @@ -45,98 +42,39 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M | } |""".stripMargin - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn(content) - when(metadata.indexSettings).thenReturn(None) + val metadata = FlintOpenSearchIndexMetadataService.deserialize(content) flintClient.createIndex(indexName, metadata) + flintIndexMetadataService.updateIndexMetadata(indexName, metadata) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName).kind shouldBe "test_kind" + flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind" } it should "create index with settings" in { val indexName = "flint_test_with_settings" val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" - val 
metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(Some(indexSettings)) + val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}", indexSettings) flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true // OS uses full setting name ("index" prefix) and store as string implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings.get) + val settings = parse(flintIndexMetadataService.getIndexMetadata(indexName).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } - it should "update index successfully" in { - val indexName = "test_update" - val content = - """ { - | "_meta": { - | "kind": "test_kind" - | }, - | "properties": { - | "age": { - | "type": "integer" - | } - | } - | } - |""".stripMargin - - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn(content) - when(metadata.indexSettings).thenReturn(None) - flintClient.createIndex(indexName, metadata) - - val newContent = - """ { - | "_meta": { - | "kind": "test_kind", - | "name": "test_name" - | }, - | "properties": { - | "age": { - | "type": "integer" - | } - | } - | } - |""".stripMargin - - val newMetadata = mock[FlintMetadata] - when(newMetadata.getContent).thenReturn(newContent) - when(newMetadata.indexSettings).thenReturn(None) - flintClient.updateIndex(indexName, newMetadata) - - flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName).kind shouldBe "test_kind" - flintClient.getIndexMetadata(indexName).name shouldBe "test_name" - } - - it should "get all index metadata with the given index name pattern" in { - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(None) - 
flintClient.createIndex("flint_test_1_index", metadata) - flintClient.createIndex("flint_test_2_index", metadata) - - val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") - allMetadata should have size 2 - allMetadata.values.forEach(metadata => metadata.getContent should not be empty) - allMetadata.values.forEach(metadata => metadata.indexSettings should not be empty) - } - it should "convert index name to all lowercase" in { val indexName = "flint_ELB_logs_index" flintClient.createIndex( indexName, - FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) + FlintOpenSearchIndexMetadataService.deserialize( + """{"properties": {"test": { "type": "integer" } } }""")) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName) should not be null - flintClient.getAllIndexMetadata("flint_ELB_*") should not be empty + flintIndexMetadataService.getIndexMetadata(indexName) should not be null + flintIndexMetadataService.getAllIndexMetadata("flint_ELB_*") should not be empty // Read write test val writer = flintClient.createWriter(indexName) @@ -156,11 +94,12 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val indexName = "test :\"+/\\|?#><" flintClient.createIndex( indexName, - FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) + FlintOpenSearchIndexMetadataService.deserialize( + """{"properties": {"test": { "type": "integer" } } }""")) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName) should not be null - flintClient.getAllIndexMetadata("test *") should not be empty + flintIndexMetadataService.getIndexMetadata(indexName) should not be null + flintIndexMetadataService.getAllIndexMetadata("test *") should not be empty // Read write test val writer = flintClient.createWriter(indexName) @@ -268,6 +207,6 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M } def createTable(indexName: String, options: 
FlintOptions): Table = { - OpenSearchCluster.apply(indexName, options).head + OpenSearchCluster.apply(indexName, options).asScala.head } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala new file mode 100644 index 000000000..c5bd75951 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core + +import java.util + +import scala.collection.JavaConverters._ + +import org.opensearch.flint.OpenSearchSuite +import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} +import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder +import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FlintOpenSearchIndexMetadataServiceITSuite + extends AnyFlatSpec + with OpenSearchSuite + with Matchers { + + /** Lazy initialize after container started. 
*/ + lazy val options = new FlintOptions(openSearchOptions.asJava) + lazy val flintClient = new FlintOpenSearchClient(options) + lazy val flintIndexMetadataService = new FlintOpenSearchIndexMetadataService(options) + + behavior of "Flint index metadata service builder" + + it should "build index metadata service" in { + val customOptions = + openSearchOptions + (FlintOptions.CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS -> "org.opensearch.flint.core.TestIndexMetadataService") + val customFlintOptions = new FlintOptions(customOptions.asJava) + val customFlintIndexMetadataService = + FlintIndexMetadataServiceBuilder.build(customFlintOptions) + customFlintIndexMetadataService shouldBe a[TestIndexMetadataService] + } + + it should "fail to build index metadata service if class name doesn't exist" in { + val options = + openSearchOptions + (FlintOptions.CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS -> "dummy") + val flintOptions = new FlintOptions(options.asJava) + the[RuntimeException] thrownBy { + FlintIndexMetadataServiceBuilder.build(flintOptions) + } + } + + behavior of "Flint OpenSearch index metadata service" + + it should "get all index metadata with the given index name pattern" in { + val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}") + flintClient.createIndex("flint_test_1_index", metadata) + flintClient.createIndex("flint_test_2_index", metadata) + + val allMetadata = flintIndexMetadataService.getAllIndexMetadata("flint_*_index") + allMetadata should have size 2 + allMetadata.values.forEach(metadata => + FlintOpenSearchIndexMetadataService.serialize(metadata) should not be empty) + allMetadata.values.forEach(metadata => metadata.indexSettings should not be empty) + } + + it should "update index metadata successfully" in { + val indexName = "test_update" + val content = + """ { + | "_meta": { + | "kind": "test_kind" + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + + val metadata = 
FlintOpenSearchIndexMetadataService.deserialize(content) + flintClient.createIndex(indexName, metadata) + + flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe empty + + flintIndexMetadataService.updateIndexMetadata(indexName, metadata) + + flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind" + flintIndexMetadataService.getIndexMetadata(indexName).name shouldBe empty + + val newContent = + """ { + | "_meta": { + | "kind": "test_kind", + | "name": "test_name" + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + + val newMetadata = FlintOpenSearchIndexMetadataService.deserialize(newContent) + flintIndexMetadataService.updateIndexMetadata(indexName, newMetadata) + + flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind" + flintIndexMetadataService.getIndexMetadata(indexName).name shouldBe "test_name" + } +} + +class TestIndexMetadataService extends FlintIndexMetadataService { + override def getIndexMetadata(indexName: String): FlintMetadata = { + null + } + + override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = { + null + } + + override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = {} + + override def deleteIndexMetadata(indexName: String): Unit = {} +} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index 31b5c14b1..9c91a129e 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -8,7 +8,8 @@ package org.opensearch.flint.spark import java.util.Base64 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson -import org.opensearch.flint.core.FlintVersion.current 
+import org.opensearch.flint.common.FlintVersion.current +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -47,7 +48,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + FlintOpenSearchIndexMetadataService.serialize(index.get.metadata()) should matchJson(s"""{ | "_meta": { | "version": "${current()}", | "name": "name_and_age", diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index ffd956b1c..235cab4d2 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -12,7 +12,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined @@ -94,10 +94,12 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { |""".stripMargin) // Check if the index setting option is set to OS index setting - val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + val 
flintIndexMetadataService = + new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) + val settings = + parse(flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala index a5744271f..ce1cbb2ea 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala @@ -217,4 +217,32 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers { deleteTestIndex(testSkippingFlintIndex) } } + + test("show flint index with special characters") { + val testCoveringIndexSpecial = "test :\"+/\\|?#><" + val testCoveringFlintIndexSpecial = + FlintSparkCoveringIndex.getFlintIndexName(testCoveringIndexSpecial, testTableQualifiedName) + + flint + .coveringIndex() + .name(testCoveringIndexSpecial) + .onTable(testTableQualifiedName) + .addIndexColumns("name", "age") + .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true"))) + .create() + flint.refreshIndex(testCoveringFlintIndexSpecial) + + checkAnswer( + sql(s"SHOW FLINT INDEX IN spark_catalog"), + Seq( + Row( + testCoveringFlintIndexSpecial, + "covering", + "default", + testTableName, + testCoveringIndexSpecial, + true, + "refreshing"))) + deleteTestIndex(testCoveringFlintIndexSpecial) + } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala 
b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index f824aab73..605975af6 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -9,7 +9,8 @@ import java.sql.Timestamp import java.util.Base64 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson -import org.opensearch.flint.core.FlintVersion.current +import org.opensearch.flint.common.FlintVersion.current +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -59,7 +60,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s""" + FlintOpenSearchIndexMetadataService.serialize(index.get.metadata()) should matchJson(s""" | { | "_meta": { | "version": "${current()}", diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 439930486..66d6e0779 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -14,7 +14,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.FlintOpenSearchClient +import 
org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName import org.scalatest.matchers.must.Matchers.{defined, have} import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} @@ -152,10 +152,12 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { |""".stripMargin) // Check if the index setting option is set to OS index setting - val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + val flintIndexMetadataService = + new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) + val settings = + parse(flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 66e777dea..968f09345 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -10,7 +10,8 @@ import java.util.Base64 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ import org.opensearch.client.RequestOptions -import org.opensearch.flint.core.FlintVersion.current +import org.opensearch.flint.common.FlintVersion.current +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import 
org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName @@ -60,7 +61,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + FlintOpenSearchIndexMetadataService.serialize(index.get.metadata()) should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_skipping_test_skipping_index", | "version": "${current()}", @@ -155,7 +156,11 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined val optionJson = - compact(render(parse(index.get.metadata().getContent) \ "_meta" \ "options")) + compact( + render( + parse( + FlintOpenSearchIndexMetadataService.serialize( + index.get.metadata())) \ "_meta" \ "options")) optionJson should matchJson(s""" | { | "auto_refresh": "true", @@ -644,7 +649,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val index = flint.describeIndex(testIndex) index shouldBe defined - index.get.metadata().getContent should matchJson(s"""{ + FlintOpenSearchIndexMetadataService.serialize(index.get.metadata()) should matchJson(s"""{ | "_meta": { | "name": "flint_spark_catalog_default_data_type_table_skipping_index", | "version": "${current()}", diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 751a0a7b6..ff114b8e2 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -13,7 +13,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.{compact, parse, render} 
import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} @@ -150,7 +150,8 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit |""".stripMargin)).foreach { case (query, expectedParamJson) => test(s"create skipping index with bloom filter parameters $expectedParamJson") { sql(query) - val metadata = flint.describeIndex(testIndex).get.metadata().getContent + val metadata = FlintOpenSearchIndexMetadataService.serialize( + flint.describeIndex(testIndex).get.metadata()) val parameters = compact(render(parse(metadata) \\ "parameters")) parameters should matchJson(expectedParamJson) } @@ -187,10 +188,11 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit |""".stripMargin) // Check if the index setting option is set to OS index setting - val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + val flintIndexMetadataService = + new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings.get) + val settings = parse(flintIndexMetadataService.getIndexMetadata(testIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index 
c42822f71..f2ed92adc 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ import org.opensearch.client.RequestOptions +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.opensearch.index.query.QueryBuilders import org.opensearch.index.reindex.DeleteByQueryRequest @@ -57,7 +58,11 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { // Verify index after update val indexFinal = flint.describeIndex(testIndex).get val optionJson = - compact(render(parse(indexFinal.metadata().getContent) \ "_meta" \ "options")) + compact( + render( + parse( + FlintOpenSearchIndexMetadataService.serialize( + indexFinal.metadata())) \ "_meta" \ "options")) optionJson should matchJson(s""" | { | "auto_refresh": "true", diff --git a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala index 332dabc95..51618d487 100644 --- a/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala +++ b/ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.types.{DataType, StructType} class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface { /** OpenSearch (PPL) AST builder. 
*/ - private val planTrnasormer = new CatalystQueryPlanVisitor() + private val planTransformer = new CatalystQueryPlanVisitor() private val pplParser = new PPLSyntaxParser() @@ -55,7 +55,7 @@ class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface try { // if successful build ppl logical plan and translate to catalyst logical plan val context = new CatalystPlanContext - planTrnasormer.visit(plan(pplParser, sqlText, false), context) + planTransformer.visit(plan(pplParser, sqlText, false), context) context.getPlan } catch { // Fall back to Spark parse plan logic if flint cannot parse