Skip to content

Commit

Permalink
Extract metadata log operations from FlintClient into FlintMetadataLo…
Browse files Browse the repository at this point in the history
…gService (#379)

* metadata log service interface

Signed-off-by: Sean Kao <seankao@amazon.com>

* flint opensearch metadata log service impl

Signed-off-by: Sean Kao <seankao@amazon.com>

* reflection for metadata log service instantiation

Signed-off-by: Sean Kao <seankao@amazon.com>

* add comments

Signed-off-by: Sean Kao <seankao@amazon.com>

* undo using client builder in tests

FlintClientBuilder uses reflection to instantiate
FlintMetadataLogService, which cannot be checked at compile time.

Signed-off-by: Sean Kao <seankao@amazon.com>

* use metadata log service in transaction IT suites

Signed-off-by: Sean Kao <seankao@amazon.com>

* metadata log test suite

Signed-off-by: Sean Kao <seankao@amazon.com>

* flint client builder test

Signed-off-by: Sean Kao <seankao@amazon.com>

* scalafmtAll

Signed-off-by: Sean Kao <seankao@amazon.com>

* undo reflection

Signed-off-by: Sean Kao <seankao@amazon.com>

* rename descriptive symbols

Signed-off-by: Sean Kao <seankao@amazon.com>

* descriptive parameter name and update comment

Signed-off-by: Sean Kao <seankao@amazon.com>

* remove init from getMetadataLog interface

Signed-off-by: Sean Kao <seankao@amazon.com>

* remove startTransaction from FlintClient

Signed-off-by: Sean Kao <seankao@amazon.com>

* rm flint os client dep on metadata log service

Signed-off-by: Sean Kao <seankao@amazon.com>

* rename function and remove unused flint client

Signed-off-by: Sean Kao <seankao@amazon.com>

* remove test case for init in getMetadataLog

Signed-off-by: Sean Kao <seankao@amazon.com>

* abstract recordHeartbeat method

Signed-off-by: Sean Kao <seankao@amazon.com>

* consistent var naming for metadataLogIndexName

Signed-off-by: Sean Kao <seankao@amazon.com>

---------

Signed-off-by: Sean Kao <seankao@amazon.com>
  • Loading branch information
seankao-az authored Jun 18, 2024
1 parent 80d8f6e commit d0caa7b
Show file tree
Hide file tree
Showing 12 changed files with 372 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

package org.opensearch.flint.core;

import java.util.List;
import java.util.Map;

import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

Expand All @@ -18,24 +17,6 @@
*/
public interface FlintClient {

/**
* Start a new optimistic transaction.
*
* @param indexName index name
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName);

/**
*
* Start a new optimistic transaction.
*
* @param indexName index name
* @param forceInit forceInit create empty translog if not exist.
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit);

/**
* Create a Flint index with the metadata given.
*
Expand All @@ -56,9 +37,10 @@ public interface FlintClient {
* Retrieve all metadata for Flint index whose name matches the given pattern.
*
* @param indexNamePattern index name pattern
* @return all matched index metadata
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
List<FlintMetadata> getAllIndexMetadata(String indexNamePattern);
Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern);

/**
* Retrieve metadata in a Flint index.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import java.util.Optional;

/**
 * Storage-agnostic API for the metadata log related operations of a Flint index.
 * Implementations decide where and how the metadata log is persisted.
 */
public interface FlintMetadataLogService {

  /**
   * Start a new optimistic transaction without forcing initialization.
   * This is a convenience overload that delegates to
   * {@link #startTransaction(String, boolean)} with {@code forceInit} set to {@code false}.
   *
   * @param indexName index name
   * @return transaction handle
   */
  default <T> OptimisticTransaction<T> startTransaction(String indexName) {
    final boolean forceInit = false;
    return startTransaction(indexName, forceInit);
  }

  /**
   * Start a new optimistic transaction.
   *
   * @param indexName index name
   * @param forceInit when {@code true}, force init the transaction and create an empty
   *                  metadata log if it does not exist yet
   * @return transaction handle
   */
  <T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit);

  /**
   * Look up the metadata log that belongs to the given index.
   *
   * @param indexName index name
   * @return the metadata log wrapped in an {@link Optional}
   */
  Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName);

  /**
   * Record a heartbeat timestamp for the streaming job of the given index.
   *
   * @param indexName index name
   */
  void recordHeartbeat(String indexName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService;

/**
 * Factory for {@link FlintMetadataLogService} instances.
 */
public class FlintMetadataLogServiceBuilder {

  /**
   * Build a metadata log service from the given Flint options.
   *
   * @param options Flint options handed to the service constructor
   * @return a {@link FlintOpenSearchMetadataLogService} created with {@code options}
   */
  public static FlintMetadataLogService build(FlintOptions options) {
    FlintMetadataLogService service = new FlintOpenSearchMetadataLogService(options);
    return service;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand All @@ -33,16 +32,12 @@
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;
import scala.Option;
import scala.Some;

/**
* Flint client implementation for OpenSearch storage.
Expand All @@ -67,47 +62,10 @@ public class FlintOpenSearchClient implements FlintClient {
private final static Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

/**
* Metadata log index name prefix
*/
public final static String META_LOG_NAME_PREFIX = ".query_execution_request";

private final FlintOptions options;
private final String dataSourceName;
private final String metaLogIndexName;

public FlintOpenSearchClient(FlintOptions options) {
this.options = options;
this.dataSourceName = options.getDataSourceName();
this.metaLogIndexName = constructMetaLogIndexName();
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit) {
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
try (IRestHighLevelClient client = createClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
} else {
if (forceInit) {
createIndex(metaLogIndexName, FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(),
Some.apply(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS()));
} else {
String errorMsg = "Metadata log index not found " + metaLogIndexName;
LOG.warning(errorMsg);
throw new IllegalStateException(errorMsg);
}
}
return new DefaultOptimisticTransaction<>(dataSourceName,
new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName));
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName) {
return startTransaction(indexName, false);
}

@Override
Expand Down Expand Up @@ -143,19 +101,21 @@ public boolean exists(String indexName) {
}

@Override
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
.map(index -> constructFlintMetadata(
index,
response.getMappings().get(index).source().toString(),
response.getSettings().get(index).toString()))
.collect(Collectors.toList());
.collect(Collectors.toMap(
index -> index,
index -> FlintMetadata.apply(
response.getMappings().get(index).source().toString(),
response.getSettings().get(index).toString()
)
));
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e);
}
Expand All @@ -171,7 +131,7 @@ public FlintMetadata getIndexMetadata(String indexName) {

MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return constructFlintMetadata(indexName, mapping.source().string(), settings.toString());
return FlintMetadata.apply(mapping.source().string(), settings.toString());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
}
Expand Down Expand Up @@ -241,34 +201,6 @@ public IRestHighLevelClient createClient() {
return OpenSearchClientUtils.createClient(options);
}

/*
* Constructs Flint metadata with latest metadata log entry attached if it's available.
* It relies on FlintOptions to provide data source name.
*/
private FlintMetadata constructFlintMetadata(String indexName, String mapping, String settings) {
String dataSourceName = options.getDataSourceName();
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
Optional<FlintMetadataLogEntry> latest = Optional.empty();

try (IRestHighLevelClient client = createClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
FlintOpenSearchMetadataLog metadataLog =
new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName);
latest = metadataLog.getLatest();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}

if (latest.isEmpty()) {
return FlintMetadata.apply(mapping, settings);
} else {
return FlintMetadata.apply(mapping, settings, latest.get());
}
}

/*
* Because OpenSearch requires all lowercase letters in index name, we have to
* lowercase all letters in the given Flint index name.
Expand Down Expand Up @@ -305,8 +237,4 @@ private String sanitizeIndexName(String indexName) {
String encoded = percentEncode(indexName);
return toLowercase(encoded);
}

private String constructMetaLogIndexName() {
return dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog<FlintMetadat
/**
* Reuse query request index as Flint metadata log store
*/
private final String metaLogIndexName;
private final String metadataLogIndexName;

/**
* Doc id for latest log entry (Naming rule is static so no need to query Flint index metadata)
*/
private final String latestId;

public FlintOpenSearchMetadataLog(FlintOptions options, String flintIndexName, String metaLogIndexName) {
public FlintOpenSearchMetadataLog(FlintOptions options, String flintIndexName, String metadataLogIndexName) {
this.options = options;
this.metaLogIndexName = metaLogIndexName;
this.metadataLogIndexName = metadataLogIndexName;
this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes());
}

Expand All @@ -62,7 +62,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {
// TODO: use a single doc for now. This will always be an append in the future.
FlintMetadataLogEntry latest;
if (!exists()) {
String errorMsg = "Flint Metadata Log index not found " + metaLogIndexName;
String errorMsg = "Flint Metadata Log index not found " + metadataLogIndexName;
LOG.log(SEVERE, errorMsg);
throw new IllegalStateException(errorMsg);
}
Expand All @@ -79,7 +79,7 @@ public Optional<FlintMetadataLogEntry> getLatest() {
LOG.info("Fetching latest log entry with id " + latestId);
try (IRestHighLevelClient client = createOpenSearchClient()) {
GetResponse response =
client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
client.get(new GetRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT);

if (response.isExists()) {
FlintMetadataLogEntry latest = new FlintMetadataLogEntry(
Expand All @@ -105,7 +105,7 @@ public void purge() {
try (IRestHighLevelClient client = createOpenSearchClient()) {
DeleteResponse response =
client.delete(
new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
new DeleteRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT);

LOG.info("Purged log entry with result " + response.getResult());
} catch (Exception e) {
Expand All @@ -129,7 +129,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
return writeLogEntry(logEntryWithId,
client -> client.index(
new IndexRequest()
.index(metaLogIndexName)
.index(metadataLogIndexName)
.id(logEntryWithId.id())
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.source(logEntryWithId.toJson(), XContentType.JSON),
Expand All @@ -140,7 +140,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Updating log entry " + logEntry);
return writeLogEntry(logEntry,
client -> client.update(
new UpdateRequest(metaLogIndexName, logEntry.id())
new UpdateRequest(metadataLogIndexName, logEntry.id())
.doc(logEntry.toJson(), XContentType.JSON)
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.setIfSeqNo(logEntry.seqNo())
Expand Down Expand Up @@ -173,11 +173,11 @@ private FlintMetadataLogEntry writeLogEntry(
}

private boolean exists() {
LOG.info("Checking if Flint index exists " + metaLogIndexName);
LOG.info("Checking if Flint index exists " + metadataLogIndexName);
try (IRestHighLevelClient client = createOpenSearchClient()) {
return client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT);
return client.doesIndexExist(new GetIndexRequest(metadataLogIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e);
throw new IllegalStateException("Failed to check if Flint index exists " + metadataLogIndexName, e);
}
}

Expand Down
Loading

0 comments on commit d0caa7b

Please sign in to comment.