Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 0.5] Abstract service for accessing Flint index metadata #575

Merged
merged 1 commit into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema i
- `spark.datasource.flint.auth.password`: basic auth password.
- `spark.datasource.flint.region`: default is us-west-2. only been used when auth=sigv4
- `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
- `spark.datasource.flint.customFlintMetadataLogServiceClass`: default is empty.
- `spark.datasource.flint.customFlintIndexMetadataServiceClass`: default is empty.
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column` : default value is true.
- `spark.datasource.flint.write.batch_size`: "The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core
package org.opensearch.flint.common

/**
* Flint version.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.common.metadata;

import java.util.Map;

/**
 * Flint index metadata service provides API for index metadata related operations on a Flint index
 * regardless of underlying storage.
 * <p>
 * Custom implementations of this interface are expected to provide a public constructor with
 * the signature {@code public MyCustomService(SparkConf sparkConf)} to be instantiated by
 * the FlintIndexMetadataServiceBuilder.
 * <p>
 * NOTE(review): this interface does not specify behavior for a non-existent index (exception vs.
 * null vs. empty result) — implementations should document their choice; confirm with callers.
 */
public interface FlintIndexMetadataService {

  /**
   * Retrieve metadata for a Flint index.
   *
   * @param indexName index name
   * @return index metadata
   */
  FlintMetadata getIndexMetadata(String indexName);

  /**
   * Retrieve all metadata for Flint index whose name matches the given pattern.
   * Accepts one or more patterns (varargs); results from all patterns are merged into one map.
   *
   * @param indexNamePattern index name pattern
   * @return map where the keys are the matched index names, and the values are
   *         corresponding index metadata
   */
  Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);

  /**
   * Update metadata for a Flint index.
   *
   * @param indexName index name
   * @param metadata index metadata to update
   */
  void updateIndexMetadata(String indexName, FlintMetadata metadata);

  /**
   * Delete metadata for a Flint index.
   * NOTE(review): presumably deletes only the metadata, not the index data itself — verify
   * against implementations before relying on this.
   *
   * @param indexName index name
   */
  void deleteIndexMetadata(String indexName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata
package org.opensearch.flint.common.metadata

import java.util

import org.opensearch.flint.common.FlintVersion
import org.opensearch.flint.common.FlintVersion.current
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.FlintVersion.current
import org.opensearch.flint.core.metadata.FlintJsonHelper._

/**
* Flint metadata follows Flint index specification and defines metadata for a Flint index
Expand All @@ -35,7 +34,11 @@ case class FlintMetadata(
schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Optional latest metadata log entry id */
latestId: Option[String] = None,
/** Optional latest metadata log entry */
/**
* Optional latest metadata log entry. TODO: remove? This was added for SHOW command to be
* fetched during get(All)IndexMetadata. Now describeIndex uses metadata log service to fetch
* log entry after get(All)IndexMetadata so this doesn't need to be part of FlintMetadata.
*/
latestLogEntry: Option[FlintMetadataLogEntry] = None,
/** Optional Flint index settings. TODO: move elsewhere? */
indexSettings: Option[String]) {
Expand All @@ -44,124 +47,10 @@ case class FlintMetadata(
require(name != null, "name is required")
require(kind != null, "kind is required")
require(source != null, "source is required")

/**
* Generate JSON content as index metadata.
*
* @return
* JSON content
*/
def getContent: String = {
try {
buildJson(builder => {
// Add _meta field
objectField(builder, "_meta") {
builder
.field("version", version.version)
.field("name", name)
.field("kind", kind)
.field("source", source)
.field("indexedColumns", indexedColumns)

if (latestId.isDefined) {
builder.field("latestId", latestId.get)
}
optionalObjectField(builder, "options", options)
optionalObjectField(builder, "properties", properties)
}

// Add properties (schema) field
builder.field("properties", schema)
})
} catch {
case e: Exception =>
throw new IllegalStateException("Failed to jsonify Flint metadata", e)
}
}
}

object FlintMetadata {

/**
* Construct Flint metadata with JSON content, index settings, and latest log entry.
*
* @param content
* JSON content
* @param settings
* index settings
* @param latestLogEntry
* latest metadata log entry
* @return
* Flint metadata
*/
def apply(
content: String,
settings: String,
latestLogEntry: FlintMetadataLogEntry): FlintMetadata = {
val metadata = FlintMetadata(content, settings)
metadata.copy(latestLogEntry = Option(latestLogEntry))
}

/**
* Construct Flint metadata with JSON content and index settings.
*
* @param content
* JSON content
* @param settings
* index settings
* @return
* Flint metadata
*/
def apply(content: String, settings: String): FlintMetadata = {
val metadata = FlintMetadata(content)
metadata.copy(indexSettings = Option(settings))
}

/**
* Parse the given JSON content and construct Flint metadata class.
*
* @param content
* JSON content
* @return
* Flint metadata
*/
def apply(content: String): FlintMetadata = {
try {
val builder = new FlintMetadata.Builder()
parseJson(content) { (parser, fieldName) =>
{
fieldName match {
case "_meta" =>
parseObjectField(parser) { (parser, innerFieldName) =>
{
innerFieldName match {
case "version" => builder.version(FlintVersion.apply(parser.text()))
case "name" => builder.name(parser.text())
case "kind" => builder.kind(parser.text())
case "source" => builder.source(parser.text())
case "indexedColumns" =>
parseArrayField(parser) {
builder.addIndexedColumn(parser.map())
}
case "options" => builder.options(parser.map())
case "properties" => builder.properties(parser.map())
case _ => // Handle other fields as needed
}
}
}
case "properties" =>
builder.schema(parser.map())
case _ => // Ignore other fields, for instance, dynamic.
}
}
}
builder.build()
} catch {
case e: Exception =>
throw new IllegalStateException("Failed to parse metadata JSON", e)
}
}

def builder(): FlintMetadata.Builder = new Builder

/**
Expand Down Expand Up @@ -231,16 +120,6 @@ object FlintMetadata {
this
}

def schema(schema: String): this.type = {
parseJson(schema) { (parser, fieldName) =>
fieldName match {
case "properties" => this.schema = parser.map()
case _ => // do nothing
}
}
this
}

def latestLogEntry(entry: FlintMetadataLogEntry): this.type = {
this.latestId = Option(entry.id)
this.latestLogEntry = Option(entry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ case class FlintMetadataLogEntry(
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
storageContext: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext.asScala.toMap)
properties: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties.asScala.toMap)
}

def this(
Expand All @@ -53,8 +53,8 @@ case class FlintMetadataLogEntry(
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
storageContext: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext)
properties: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.common.metadata

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.FlintVersion.current
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class FlintMetadataSuite extends AnyFlatSpec with Matchers {
  "builder" should "build FlintMetadata with provided fields" in {
    // Expected values, pre-converted to the Java collections FlintMetadata stores.
    val indexedColumn = Map[String, AnyRef]("test_field" -> "spark_type").asJava
    val osSchema = Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava

    // Exercise the fluent builder API in a single chain.
    val metadata = (new FlintMetadata.Builder)
      .name("test_index")
      .kind("test_kind")
      .source("test_source_table")
      .addIndexedColumn(indexedColumn)
      .schema(osSchema)
      .build()

    metadata.version shouldBe current()
    metadata.name shouldBe "test_index"
    metadata.kind shouldBe "test_kind"
    metadata.source shouldBe "test_source_table"
    metadata.indexedColumns shouldBe Array(indexedColumn)
    metadata.schema shouldBe osSchema
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

package org.opensearch.flint.core;

import java.util.Map;

import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.common.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintWriter;

/**
Expand All @@ -32,31 +30,6 @@ public interface FlintClient {
*/
boolean exists(String indexName);

/**
* Retrieve all metadata for Flint index whose name matches the given pattern.
*
* @param indexNamePattern index name pattern
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);

/**
* Retrieve metadata in a Flint index.
*
* @param indexName index name
* @return index metadata
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Update a Flint index with the metadata given.
*
* @param indexName index name
* @param metadata index metadata
*/
void updateIndex(String indexName, FlintMetadata metadata);

/**
* Delete a Flint index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public class FlintOptions implements Serializable {

public static final String DEFAULT_BATCH_BYTES = "1mb";

public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";
public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "customFlintMetadataLogServiceClass";

public static final String CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS = "customFlintIndexMetadataServiceClass";

public static final String SUPPORT_SHARD = "read.support_shard";

Expand Down Expand Up @@ -189,6 +191,10 @@ public String getCustomFlintMetadataLogServiceClass() {
return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, "");
}

public String getCustomFlintIndexMetadataServiceClass() {
return options.getOrDefault(CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS, "");
}

/**
* FIXME, This is workaround for AWS OpenSearch Serverless (AOSS). AOSS does not support shard
* operation, but shard info is exposed in index settings. Remove this setting when AOSS fix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.flint.core

import org.opensearch.flint.core.metadata.FlintMetadata

/**
* OpenSearch Table metadata.
*
Expand All @@ -18,11 +16,3 @@ import org.opensearch.flint.core.metadata.FlintMetadata
* setting
*/
case class MetaData(name: String, properties: String, setting: String)

object MetaData {
def apply(name: String, flintMetadata: FlintMetadata): MetaData = {
val properties = flintMetadata.getContent
val setting = flintMetadata.indexSettings.getOrElse("")
MetaData(name, properties, setting)
}
}
Loading
Loading