Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenSearchTable in flint core #479

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
Expand All @@ -30,6 +29,9 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
import org.opensearch.flint.core.logging.CustomLogging;
import org.opensearch.flint.core.logging.OperationMessage;
import org.opensearch.flint.core.metrics.MetricsUtil;
Expand Down Expand Up @@ -69,6 +71,8 @@ public interface IRestHighLevelClient extends Closeable {

DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;

/**
* Execute an indices stats request.
*
* @param request the indices stats request
* @return the indices stats response
* @throws IOException declared by the signature; failure modes depend on the implementation
*/
IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException;

CreatePitResponse createPit(CreatePitRequest request) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.indices.IndicesStatsRequest;
import org.opensearch.client.opensearch.indices.IndicesStatsResponse;
import org.opensearch.client.transport.rest_client.RestClientTransport;

import java.io.IOException;

import static org.opensearch.flint.core.metrics.MetricConstants.*;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX;

/**
* A wrapper class for RestHighLevelClient to facilitate OpenSearch operations
Expand Down Expand Up @@ -121,6 +124,17 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.update(updateRequest, options));
}

@Override
public IndicesStatsResponse stats(IndicesStatsRequest request) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX,
penghuo marked this conversation as resolved.
Show resolved Hide resolved
() -> {
OpenSearchClient openSearchClient =
new OpenSearchClient(new RestClientTransport(client.getLowLevelClient(),
new JacksonJsonpMapper()));
return openSearchClient.indices().stats(request);
});
}

@Override
public CreatePitResponse createPit(CreatePitRequest request) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> openSearchClient().createPit(request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.Map;

import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

/**
Expand Down Expand Up @@ -65,25 +64,6 @@ public interface FlintClient {
*/
void deleteIndex(String indexName);

/**
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param query DSL query. DSL query is null means match_all
* @return {@link FlintReader}.
*/
FlintReader createReader(String indexName, String query);

/**
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param shardId shard id.
* @param query DSL query. DSL query is null means match_all
* @return {@link FlintReader}.
*/
FlintReader createReader(String indexName, String shardId, String query);

/**
* Create {@link FlintWriter}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.Serializable;
import java.util.Map;
import java.util.Optional;

import org.apache.spark.network.util.ByteUnit;
import org.opensearch.flint.core.http.FlintRetryOptions;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class FlintOptions implements Serializable {
public static final String SYSTEM_INDEX_KEY_NAME = "spark.flint.job.requestIndex";

/**
* Used by {@link org.opensearch.flint.core.storage.OpenSearchScrollReader}
* The page size for OpenSearch REST requests.
*/
public static final String SCROLL_SIZE = "read.scroll_size";
public static final int DEFAULT_SCROLL_SIZE = 100;
Expand Down Expand Up @@ -92,6 +93,10 @@ public class FlintOptions implements Serializable {

public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";

public static final String SUPPORT_SHARD = "read.support_shard";

public static final String DEFAULT_SUPPORT_SHARD = "true";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
Expand All @@ -105,8 +110,12 @@ public int getPort() {
return Integer.parseInt(options.getOrDefault(PORT, "9200"));
}

public int getScrollSize() {
return Integer.parseInt(options.getOrDefault(SCROLL_SIZE, String.valueOf(DEFAULT_SCROLL_SIZE)));
/**
 * Returns the scroll size only when it has been explicitly configured.
 *
 * @return the parsed {@code SCROLL_SIZE} option, or an empty {@link Optional} when the option
 *         is absent or mapped to {@code null}
 * @throws NumberFormatException if the configured value is not a valid integer
 */
public Optional<Integer> getScrollSize() {
  // A single lookup; ofNullable also covers a key that is present but mapped to null,
  // which would otherwise make Integer.parseInt throw.
  return Optional.ofNullable(options.get(SCROLL_SIZE)).map(Integer::parseInt);
}

public int getScrollDuration() {
Expand Down Expand Up @@ -168,4 +177,9 @@ public int getBatchBytes() {
/**
 * Looks up the custom Flint metadata log service class option.
 *
 * @return the value stored under {@code CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS}, or an empty
 *         string when the option is not set
 */
public String getCustomFlintMetadataLogServiceClass() {
  final String serviceClass =
      options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, "");
  return serviceClass;
}

/**
 * Tells whether the {@code SUPPORT_SHARD} option is enabled.
 *
 * <p>The value is interpreted as a boolean on its own rather than by comparing it with the
 * default constant, so the result stays correct even if the default is ever changed.
 *
 * @return {@code true} if the option (or its default when unset) equals "true" ignoring case,
 *         {@code false} otherwise
 */
public boolean supportShard() {
  return Boolean.parseBoolean(options.getOrDefault(SUPPORT_SHARD, DEFAULT_SUPPORT_SHARD));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

/**
 * [[Schema]] backed by a JSON string in OpenSearch index mapping format.
 *
 * @param jsonSchema
 *   the schema JSON, returned unchanged by `asJson`
 */
case class JsonSchema(jsonSchema: String) extends Schema {

  /** @return the wrapped JSON schema string as-is. */
  override def asJson(): String = {
    jsonSchema
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

import org.opensearch.flint.core.metadata.FlintMetadata

/**
 * Metadata of an OpenSearch table.
 *
 * @param name
 *   table name
 * @param properties
 *   properties string; the companion factory takes it from `FlintMetadata.getContent`
 * @param setting
 *   setting string; the companion factory takes it from `FlintMetadata.indexSettings`, using an
 *   empty string when no settings are present
 */
case class MetaData(name: String, properties: String, setting: String)

object MetaData {

  /**
   * Build a [[MetaData]] for the named table from a [[FlintMetadata]].
   *
   * @param name
   *   table name
   * @param flintMetadata
   *   source of the properties content and the optional index settings
   * @return
   *   the assembled [[MetaData]]
   */
  def apply(name: String, flintMetadata: FlintMetadata): MetaData =
    MetaData(name, flintMetadata.getContent, flintMetadata.indexSettings.getOrElse(""))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

/**
* Schema of a table that can be rendered as JSON.
*/
trait Schema {

/**
* Return the table schema as a JSON string.
*
* @return
* the schema in JSON form.
*/
def asJson(): String
}
54 changes: 54 additions & 0 deletions flint-core/src/main/scala/org/opensearch/flint/core/Table.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

import org.opensearch.flint.core.storage.FlintReader

/**
* An OpenSearch Table.
*/
trait Table extends Serializable {

/**
* OpenSearch Table MetaData.
*
* @return
* the [[MetaData]] of this table.
*/
def metaData(): MetaData

/**
* Is OpenSearch Table splittable.
*
* @return
* true if splittable, otherwise false. The default implementation returns false.
*/
def isSplittable(): Boolean = false

/**
* Slice OpenSearch Table.
*
* @return
* a sequence of sliced OpenSearch Table
*/
def slice(): Seq[Table]

/**
* Create Flint Reader from DSL query.
*
* @param query
* OpenSearch DSL query.
* @return
* a [[FlintReader]] for the given query.
*/
def createReader(query: String): FlintReader

/**
* OpenSearch Table schema
*
* @return
* the [[Schema]] of this table.
*/
def schema(): Schema
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
import java.util.logging.Logger;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
import java.io.Serializable;

/**
* Flint options related to HTTP request retry.
*/
public class FlintRetryOptions {
public class FlintRetryOptions implements Serializable {

private static final Logger LOG = Logger.getLogger(FlintRetryOptions.class.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ object FlintMetadata {
}
case "properties" =>
builder.schema(parser.map())
case _ => // Ignore other fields, for instance, dynamic.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;
import scala.Option;

import java.io.IOException;
Expand All @@ -35,7 +34,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

Expand All @@ -48,11 +46,10 @@ public class FlintOpenSearchClient implements FlintClient {

private static final Logger LOG = Logger.getLogger(FlintOpenSearchClient.class.getName());


/**
* {@link NamedXContentRegistry} from {@link SearchModule}, used to construct a {@link QueryBuilder} from a DSL query string.
*/
private final static NamedXContentRegistry
public final static NamedXContentRegistry
xContentRegistry =
new NamedXContentRegistry(new SearchModule(Settings.builder().build(),
new ArrayList<>()).getNamedXContents());
Expand All @@ -64,15 +61,23 @@ public class FlintOpenSearchClient implements FlintClient {
private final static Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

private final static Function<String, String> SHARD_ID_PREFERENCE =
shardId -> shardId == null ? shardId : "_shards:"+shardId;

private final FlintOptions options;

/**
* Create a client that keeps the given options for later use.
*
* @param options Flint options stored on this client
*/
public FlintOpenSearchClient(FlintOptions options) {
this.options = options;
}

public static QueryBuilder queryBuilder(String query) throws IOException {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
QueryBuilder queryBuilder = new MatchAllQueryBuilder();
if (!Strings.isNullOrEmpty(query)) {
XContentParser
parser =
XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query);
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return queryBuilder;
}

@Override
public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
Expand Down Expand Up @@ -169,47 +174,6 @@ public void deleteIndex(String indexName) {
}
}

/**
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param query DSL query. DSL query is null means match_all.
* @return {@link FlintReader}.
*/
@Override
public FlintReader createReader(String indexName, String query) {
return createReader(indexName, query, null);
}

/**
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param query DSL query. DSL query is null means match_all
* @param shardId shardId
* @return
*/
@Override
public FlintReader createReader(String indexName, String query, String shardId) {
LOG.info("Creating Flint index reader for " + indexName + " with query " + query + " shardId " + shardId);
try {
QueryBuilder queryBuilder = new MatchAllQueryBuilder();
if (!Strings.isNullOrEmpty(query)) {
XContentParser
parser =
XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query);
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return new OpenSearchScrollReader(createClient(),
sanitizeIndexName(indexName),
new SearchSourceBuilder().query(queryBuilder),
options,
SHARD_ID_PREFERENCE.apply(shardId));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public FlintWriter createWriter(String indexName) {
LOG.info(String.format("Creating Flint index writer for %s, refresh_policy:%s, " +
"batch_bytes:%d", indexName, options.getRefreshPolicy(), options.getBatchBytes()));
Expand Down
Loading
Loading