Extract metadata log operations from FlintClient into FlintMetadataLogService #379

Merged
merged 20 commits into from
Jun 18, 2024
Changes from 13 commits
@@ -31,7 +31,7 @@ public interface FlintClient {
* Start a new optimistic transaction.
*
* @param indexName index name
- * @param forceInit forceInit create empty translog if not exist.
+ * @param forceInit force init transaction and create empty translog if not exist
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit);
@@ -6,13 +6,14 @@
package org.opensearch.flint.core;

import org.opensearch.flint.core.storage.FlintOpenSearchClient;
+ import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService;

/**
* {@link FlintClient} builder.
*/
public class FlintClientBuilder {

public static FlintClient build(FlintOptions options) {
- return new FlintOpenSearchClient(options);
+ return new FlintOpenSearchClient(options, new FlintOpenSearchMetadataLogService(options));
}
}
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import java.util.Optional;

/**
* Flint metadata log service provides API for metadata log related operations on a Flint index
* regardless of concrete storage.
*/
public interface FlintMetadataLogService {

/**
* Start a new optimistic transaction.
*
* @param indexName index name
* @param forceInit force init transaction and create empty metadata log if not exist
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit);

/**
* Start a new optimistic transaction.
*
* @param indexName index name
* @return transaction handle
*/
default <T> OptimisticTransaction<T> startTransaction(String indexName) {
return startTransaction(indexName, false);
}

/**
* Get metadata log for index.
*
* @param indexName index name
* @param initIfNotExist create empty metadata log if not exist
* @return optional metadata log
*/
Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName, boolean initIfNotExist);

/**
* Get metadata log for index.
*
* @param indexName index name
* @return optional metadata log
*/
default Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName) {
return getIndexMetadataLog(indexName, false);
}
}
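
The single-argument overloads in the interface above are default methods that delegate with the flag set to false. A minimal sketch of that delegation follows. It assumes a simplified, hypothetical `LogService` interface that returns a `String` instead of an `OptimisticTransaction`; `RecordingService` is likewise hypothetical and is not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultOverloadSketch {
    // Hypothetical, simplified stand-in for FlintMetadataLogService.
    interface LogService {
        String startTransaction(String indexName, boolean forceInit);

        // Same shape as the default overload in the diff: delegate with forceInit = false.
        default String startTransaction(String indexName) {
            return startTransaction(indexName, false);
        }
    }

    // Hypothetical implementation that records each call so the delegation is observable.
    static class RecordingService implements LogService {
        final List<String> calls = new ArrayList<>();

        @Override
        public String startTransaction(String indexName, boolean forceInit) {
            calls.add(indexName + ":" + forceInit);
            return "tx-" + indexName;
        }
    }

    public static void main(String[] args) {
        RecordingService service = new RecordingService();
        service.startTransaction("flint_test_index");
        service.startTransaction("flint_test_index", true);
        System.out.println(service.calls); // [flint_test_index:false, flint_test_index:true]
    }
}
```

An implementation only has to provide the two-argument method; callers that never force initialization can use the shorter overload.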
@@ -33,16 +33,16 @@
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
- import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
+ import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
+ import org.opensearch.flint.core.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;
import scala.Option;
- import scala.Some;

/**
* Flint client implementation for OpenSearch storage.
@@ -67,47 +67,26 @@ public class FlintOpenSearchClient implements FlintClient {
private final static Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

- /**
- * Metadata log index name prefix
- */
- public final static String META_LOG_NAME_PREFIX = ".query_execution_request";
-
private final FlintOptions options;
- private final String dataSourceName;
- private final String metaLogIndexName;
+ private final FlintMetadataLogService metadataLogService;

- public FlintOpenSearchClient(FlintOptions options) {
+ public FlintOpenSearchClient(FlintOptions options, FlintMetadataLogService metadataLogService) {
Collaborator

We should rename FlintOpenSearchClient, because it now depends on FlintMetadataLogService rather than on OpenSearch. Alternatively:

  1. Remove the FlintMetadataLogService operations from FlintOpenSearchClient
  2. Modify the callers of FlintOpenSearchClient to call FlintMetadataLogService directly

Member

@vamsimanohar Jun 13, 2024

+1 to the comment above.
Also, what value does FlintOpenSearchClient add if it passes everything down to FlintMetadataLogService?

Can we replace FlintOpenSearchClient with FlintMetadataLogService wherever we operate on FlintMetadataLog?

Collaborator Author

@seankao-az Jun 13, 2024

For startTransaction, I agree that we can make the callers use FlintMetadataLogService instead. However, getIndexMetadata in FlintOpenSearchClient still needs FlintMetadataLogService to get the metadata log.

I will try to move that call out of getIndexMetadata and into its caller (FlintSpark describeIndex).

Collaborator

@dai-chen Jun 13, 2024

Previously we only initialized the request index when starting a transaction in createIndex. Does the current change initialize the request index whenever the new getIndexMetadataLog API is called? I'm not sure about the impact, or whether it benefits the code structure.

Collaborator Author

@seankao-az Jun 14, 2024

Does the current change initialize the request index whenever the new getIndexMetadataLog API is called?

Sure, init is indeed not required. I changed it so that the public FlintMetadataLogService interface for getIndexMetadataLog cannot initialize the request index. It only returns the metadata log if one is available.
initIfNotExist is now used only by a private method of FlintOpenSearchMetadataLogService, as part of startTransaction with forceInit.
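
The behavior described in this reply can be sketched as follows. This is only an illustration under stated assumptions: `InitOnTransactionSketch`, its in-memory `existingIndexes` set, and the `String` return types are hypothetical stand-ins for the OpenSearch-backed service, and the diff shown on this page still exposes the `initIfNotExist` overload publicly.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class InitOnTransactionSketch {
    // Hypothetical in-memory stand-in for OpenSearch: names of indexes that exist.
    private final Set<String> existingIndexes = new HashSet<>();
    private final String metaLogIndexName;

    public InitOnTransactionSketch(String metaLogIndexName) {
        this.metaLogIndexName = metaLogIndexName;
    }

    // Public lookup: returns the log if the metadata log index exists, never creates it.
    public Optional<String> getIndexMetadataLog(String indexName) {
        return getIndexMetadataLog(indexName, false);
    }

    // Transaction start is the only path that may create the metadata log index.
    public String startTransaction(String indexName, boolean forceInit) {
        return getIndexMetadataLog(indexName, forceInit)
            .map(log -> "transaction on " + log)
            .orElseThrow(() -> new IllegalStateException(
                "Metadata log index not found " + metaLogIndexName));
    }

    // Private overload: initIfNotExist is not reachable from outside the class.
    private Optional<String> getIndexMetadataLog(String indexName, boolean initIfNotExist) {
        if (!existingIndexes.contains(metaLogIndexName)) {
            if (!initIfNotExist) {
                return Optional.empty();
            }
            existingIndexes.add(metaLogIndexName);
        }
        return Optional.of(metaLogIndexName + "/" + indexName);
    }

    public boolean metaLogIndexExists() {
        return existingIndexes.contains(metaLogIndexName);
    }

    public static void main(String[] args) {
        InitOnTransactionSketch service =
            new InitOnTransactionSketch(".query_execution_request_myglue");
        System.out.println(service.getIndexMetadataLog("flint_index").isPresent()); // false
        System.out.println(service.metaLogIndexExists());                            // false
        service.startTransaction("flint_index", true);
        System.out.println(service.getIndexMetadataLog("flint_index").isPresent()); // true
    }
}
```

With this shape, a read-only caller such as a describe operation cannot create the request index as a side effect.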

Collaborator Author

@seankao-az Jun 14, 2024

Removed the FlintMetadataLogService operations from FlintOpenSearchClient.

this.options = options;
- this.dataSourceName = options.getDataSourceName();
- this.metaLogIndexName = constructMetaLogIndexName();
+ this.metadataLogService = metadataLogService;
}

+ public FlintOpenSearchClient(FlintOptions options) {
+ this(options, new FlintOpenSearchMetadataLogService(options));
+ }
+
@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit) {
- LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
- try (IRestHighLevelClient client = createClient()) {
- if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
- LOG.info("Found metadata log index " + metaLogIndexName);
- } else {
- if (forceInit) {
- createIndex(metaLogIndexName, FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(),
- Some.apply(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS()));
- } else {
- String errorMsg = "Metadata log index not found " + metaLogIndexName;
- LOG.warning(errorMsg);
- throw new IllegalStateException(errorMsg);
- }
- }
- return new DefaultOptimisticTransaction<>(dataSourceName,
- new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName));
- } catch (IOException e) {
- throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
- }
+ return metadataLogService.startTransaction(indexName, forceInit);
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName) {
- return startTransaction(indexName, false);
+ return metadataLogService.startTransaction(indexName);
}

@Override
@@ -243,30 +222,13 @@ public IRestHighLevelClient createClient() {

/*
* Constructs Flint metadata with latest metadata log entry attached if it's available.
- * It relies on FlintOptions to provide data source name.
*/
private FlintMetadata constructFlintMetadata(String indexName, String mapping, String settings) {
- String dataSourceName = options.getDataSourceName();
- String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
- : META_LOG_NAME_PREFIX + "_" + dataSourceName;
- Optional<FlintMetadataLogEntry> latest = Optional.empty();
-
- try (IRestHighLevelClient client = createClient()) {
- if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
- LOG.info("Found metadata log index " + metaLogIndexName);
- FlintOpenSearchMetadataLog metadataLog =
- new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName);
- latest = metadataLog.getLatest();
- }
- } catch (IOException e) {
- throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
- }
-
- if (latest.isEmpty()) {
- return FlintMetadata.apply(mapping, settings);
- } else {
- return FlintMetadata.apply(mapping, settings, latest.get());
- }
+ Optional<FlintMetadataLogEntry> latest = metadataLogService.getIndexMetadataLog(indexName)
+ .flatMap(FlintMetadataLog::getLatest);
+ return latest
+ .map(entry -> FlintMetadata.apply(mapping, settings, entry))
+ .orElseGet(() -> FlintMetadata.apply(mapping, settings));
}

/*
@@ -305,8 +267,4 @@ private String sanitizeIndexName(String indexName) {
String encoded = percentEncode(indexName);
return toLowercase(encoded);
}
-
- private String constructMetaLogIndexName() {
- return dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName;
- }
}
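
The rewritten constructFlintMetadata in this file replaces an explicit isEmpty check with an Optional chain. The sketch below shows how that chain behaves in the three possible cases. It assumes a hypothetical `Log` interface and plain strings in place of FlintMetadataLog, FlintMetadataLogEntry, and FlintMetadata.

```java
import java.util.Optional;

public class OptionalChainSketch {
    // Hypothetical stand-in for FlintMetadataLog: a log that may have a latest entry.
    interface Log {
        Optional<String> getLatest();
    }

    // Same shape as the rewritten method: flatMap to the latest entry, then map or fall back.
    static String describe(Optional<Log> log, String mapping) {
        return log.flatMap(Log::getLatest)
            .map(entry -> mapping + " with entry " + entry)
            .orElseGet(() -> mapping + " without entry");
    }

    public static void main(String[] args) {
        Log emptyLog = () -> Optional.empty();
        Log activeLog = () -> Optional.of("active");
        System.out.println(describe(Optional.empty(), "m"));       // m without entry
        System.out.println(describe(Optional.of(emptyLog), "m"));  // m without entry
        System.out.println(describe(Optional.of(activeLog), "m")); // m with entry active
    }
}
```

A missing log and a log with no latest entry both take the fallback branch, which matches the two-argument FlintMetadata.apply call in the diff.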
@@ -0,0 +1,93 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import java.io.IOException;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;

/**
* Flint metadata log service implementation for OpenSearch storage.
*/
public class FlintOpenSearchMetadataLogService implements FlintMetadataLogService {

private static final Logger LOG = Logger.getLogger(FlintOpenSearchMetadataLogService.class.getName());

public final static String METADATA_LOG_INDEX_NAME_PREFIX = ".query_execution_request";

private final FlintOptions options;
private final String dataSourceName;
private final String metaLogIndexName;

public FlintOpenSearchMetadataLogService(FlintOptions options) {
this.options = options;
this.dataSourceName = options.getDataSourceName();
this.metaLogIndexName = constructMetaLogIndexName();
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit) {
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
Optional<FlintMetadataLog<FlintMetadataLogEntry>> metadataLog = getIndexMetadataLog(indexName, forceInit);
if (metadataLog.isEmpty()) {
String errorMsg = "Metadata log index not found " + metaLogIndexName;
throw new IllegalStateException(errorMsg);
}
return new DefaultOptimisticTransaction<>(dataSourceName, metadataLog.get());
}

@Override
public Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName, boolean initIfNotExist) {
LOG.info("Getting metadata log for index " + indexName + " and data source " + dataSourceName);
try (IRestHighLevelClient client = createOpenSearchClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
} else {
if (initIfNotExist) {
initIndexMetadataLog();
} else {
String errorMsg = "Metadata log index not found " + metaLogIndexName;
LOG.warning(errorMsg);
return Optional.empty();
}
}
return Optional.of(new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName));
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}
}

private void initIndexMetadataLog() {
LOG.info("Initializing metadata log index " + metaLogIndexName);
try (IRestHighLevelClient client = createOpenSearchClient()) {
CreateIndexRequest request = new CreateIndexRequest(metaLogIndexName);
request.mapping(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(), XContentType.JSON);
request.settings(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS(), XContentType.JSON);
client.createIndex(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to initialize metadata log index " + metaLogIndexName, e);
}
}

private String constructMetaLogIndexName() {
return dataSourceName.isEmpty() ? METADATA_LOG_INDEX_NAME_PREFIX : METADATA_LOG_INDEX_NAME_PREFIX + "_" + dataSourceName;
}

private IRestHighLevelClient createOpenSearchClient() {
return OpenSearchClientUtils.createClient(options);
}
}
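
For reference, the naming rule in constructMetaLogIndexName can be tried in isolation. The sketch below copies the prefix value from the diff; the class `MetaLogIndexNames` and the static method `metaLogIndexName` are hypothetical names used only for illustration.

```java
public class MetaLogIndexNames {
    // Value copied from METADATA_LOG_INDEX_NAME_PREFIX in the diff.
    static final String PREFIX = ".query_execution_request";

    // Hypothetical standalone version of constructMetaLogIndexName():
    // the bare prefix for an empty data source name, otherwise prefix + "_" + name.
    static String metaLogIndexName(String dataSourceName) {
        return dataSourceName.isEmpty() ? PREFIX : PREFIX + "_" + dataSourceName;
    }

    public static void main(String[] args) {
        System.out.println(metaLogIndexName(""));       // .query_execution_request
        System.out.println(metaLogIndexName("myglue")); // .query_execution_request_myglue
    }
}
```

The second call corresponds to the `testMetaLogIndex` value built in the test suite for the `myglue` data source.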
@@ -19,7 +19,7 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
- import org.opensearch.flint.core.storage.FlintOpenSearchClient._
+ import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService.METADATA_LOG_INDEX_NAME_PREFIX
import org.opensearch.flint.spark.FlintSparkSuite

import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME
@@ -31,7 +31,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME
trait OpenSearchTransactionSuite extends FlintSparkSuite {

val testDataSourceName = "myglue"
- lazy val testMetaLogIndex: String = META_LOG_NAME_PREFIX + "_" + testDataSourceName
+ lazy val testMetaLogIndex: String = METADATA_LOG_INDEX_NAME_PREFIX + "_" + testDataSourceName

override def beforeAll(): Unit = {
super.beforeAll()