Skip to content

Commit

Permalink
Add Flint Index Purging Logic (#2372) (#2389)
Browse files Browse the repository at this point in the history
* Add Flint Index Purging Logic

- Introduce dynamic settings for enabling/disabling purging and controlling index TTL.
- Reuse default result index name as a common prefix for all result indices.
- Change result index to a non-hidden index for better user experience.
- Allow custom result index specification in the data source.
- Move default result index name from spark to core package to avoid cross-package references.
- Add validation for provided result index name in the data source.
- Use pattern prefix + data source name for default result index naming.

Testing:
- Verified old documents are purged in a cluster setup.
- Checked result index naming with and without custom names, ensuring validation is applied.

Note: Tests will be added in a subsequent PR.



* address comments



---------


(cherry picked from commit 1bcacd1)

Signed-off-by: Kaituo Li <kaituo@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 37e010f commit dd48b9b
Show file tree
Hide file tree
Showing 24 changed files with 553 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ public enum Key {
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"),
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit");
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"),
SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"),
RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"),
AUTO_INDEX_MANAGEMENT_ENABLED(
"plugins.query.executionengine.spark.auto_index_management.enabled");

@Getter private final String keyValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import java.util.function.Function;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -25,11 +25,24 @@

@Getter
@Setter
@AllArgsConstructor
@EqualsAndHashCode
@JsonIgnoreProperties(ignoreUnknown = true)
public class DataSourceMetadata {

public static final String DEFAULT_RESULT_INDEX = "query_execution_result";
public static final int MAX_RESULT_INDEX_NAME_SIZE = 255;
// OS doesn’t allow uppercase: https://tinyurl.com/yse2xdbx
public static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+";
public static String INVALID_RESULT_INDEX_NAME_SIZE =
"Result index name size must contains less than "
+ MAX_RESULT_INDEX_NAME_SIZE
+ " characters";
public static String INVALID_CHAR_IN_RESULT_INDEX_NAME =
"Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and"
+ " _(underscore)";
public static String INVALID_RESULT_INDEX_PREFIX =
"Result index must start with " + DEFAULT_RESULT_INDEX;

@JsonProperty private String name;

@JsonProperty private String description;
Expand All @@ -44,18 +57,31 @@ public class DataSourceMetadata {

@JsonProperty private String resultIndex;

public static Function<String, String> DATASOURCE_TO_RESULT_INDEX =
datasourceName -> String.format("%s_%s", DEFAULT_RESULT_INDEX, datasourceName);

public DataSourceMetadata(
String name,
String description,
DataSourceType connector,
List<String> allowedRoles,
Map<String, String> properties,
String resultIndex) {
this.name = name;
String errorMessage = validateCustomResultIndex(resultIndex);
if (errorMessage != null) {
throw new IllegalArgumentException(errorMessage);
}
if (resultIndex == null) {
this.resultIndex = fromNameToCustomResultIndex();
} else {
this.resultIndex = resultIndex;
}

this.connector = connector;
this.description = StringUtils.EMPTY;
this.description = description;
this.properties = properties;
this.allowedRoles = allowedRoles;
this.resultIndex = resultIndex;
}

public DataSourceMetadata() {
Expand All @@ -71,9 +97,56 @@ public DataSourceMetadata() {
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
return new DataSourceMetadata(
DEFAULT_DATASOURCE_NAME,
StringUtils.EMPTY,
DataSourceType.OPENSEARCH,
Collections.emptyList(),
ImmutableMap.of(),
null);
}

public String validateCustomResultIndex(String resultIndex) {
if (resultIndex == null) {
return null;
}
if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
return INVALID_RESULT_INDEX_NAME_SIZE;
}
if (!resultIndex.matches(RESULT_INDEX_NAME_PATTERN)) {
return INVALID_CHAR_IN_RESULT_INDEX_NAME;
}
if (resultIndex != null && !resultIndex.startsWith(DEFAULT_RESULT_INDEX)) {
return INVALID_RESULT_INDEX_PREFIX;
}
return null;
}

/**
* Since we are using datasource name to create result index, we need to make sure that the final
* name is valid
*
* @param resultIndex result index name
* @return valid result index name
*/
private String convertToValidResultIndex(String resultIndex) {
// Limit Length
if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE);
}

// Pattern Matching: Remove characters that don't match the pattern
StringBuilder validChars = new StringBuilder();
for (char c : resultIndex.toCharArray()) {
if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) {
validChars.append(c);
}
}
return validChars.toString();
}

public String fromNameToCustomResultIndex() {
if (name == null) {
throw new IllegalArgumentException("Datasource name cannot be null");
}
return convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
Expand Down Expand Up @@ -197,6 +198,7 @@ public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSource
ds ->
new DataSourceMetadata(
ds.getName(),
StringUtils.EMPTY,
ds.getConnectorType(),
Collections.emptyList(),
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -62,6 +63,7 @@ void testIterator() {
dataSource ->
new DataSourceMetadata(
dataSource.getName(),
StringUtils.EMPTY,
dataSource.getConnectorType(),
Collections.emptyList(),
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -382,6 +383,7 @@ void testRemovalOfAuthorizationInfo() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand All @@ -407,6 +409,7 @@ void testRemovalOfAuthorizationInfoForAccessKeyAndSecretKye() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand Down Expand Up @@ -434,6 +437,7 @@ void testRemovalOfAuthorizationInfoForGlueWithRoleARN() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testGlue",
StringUtils.EMPTY,
DataSourceType.S3GLUE,
Collections.singletonList("glue_access"),
properties,
Expand Down Expand Up @@ -498,6 +502,7 @@ void testGetRawDataSourceMetadata() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testToDataSourceMetadataFromJson() {
dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS);
dataSourceMetadata.setAllowedRoles(List.of("prometheus_access"));
dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090"));
dataSourceMetadata.setResultIndex("query_execution_result2");
Gson gson = new Gson();
String json = gson.toJson(dataSourceMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -103,6 +104,7 @@ public void updateDataSourceAPITest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"update_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand All @@ -116,6 +118,7 @@ public void updateDataSourceAPITest() {
DataSourceMetadata updateDSM =
new DataSourceMetadata(
"update_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://randomtest.com:9090"),
Expand Down Expand Up @@ -175,6 +178,7 @@ public void deleteDataSourceTest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"delete_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand Down Expand Up @@ -214,6 +218,7 @@ public void getAllDataSourceTest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"get_all_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -44,6 +45,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -44,6 +45,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.opensearch.setting;

import static org.opensearch.common.settings.Settings.EMPTY;
import static org.opensearch.common.unit.TimeValue.timeValueDays;
import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -25,6 +26,7 @@
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.MemorySizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.common.setting.LegacySettings;
import org.opensearch.sql.common.setting.Settings;

Expand Down Expand Up @@ -149,6 +151,27 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> SESSION_INDEX_TTL_SETTING =
Setting.positiveTimeSetting(
Key.SESSION_INDEX_TTL.getKeyValue(),
timeValueDays(14),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> RESULT_INDEX_TTL_SETTING =
Setting.positiveTimeSetting(
Key.RESULT_INDEX_TTL.getKeyValue(),
timeValueDays(60),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Boolean> AUTO_INDEX_MANAGEMENT_ENABLED_SETTING =
Setting.boolSetting(
Key.AUTO_INDEX_MANAGEMENT_ENABLED.getKeyValue(),
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -231,6 +254,24 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_SESSION_LIMIT,
SPARK_EXECUTION_SESSION_LIMIT_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT));
register(
settingBuilder,
clusterSettings,
Key.SESSION_INDEX_TTL,
SESSION_INDEX_TTL_SETTING,
new Updater(Key.SESSION_INDEX_TTL));
register(
settingBuilder,
clusterSettings,
Key.RESULT_INDEX_TTL,
RESULT_INDEX_TTL_SETTING,
new Updater(Key.RESULT_INDEX_TTL));
register(
settingBuilder,
clusterSettings,
Key.AUTO_INDEX_MANAGEMENT_ENABLED,
AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
new Updater(Key.AUTO_INDEX_MANAGEMENT_ENABLED));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -298,6 +339,9 @@ public static List<Setting<?>> pluginSettings() {
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
.add(SESSION_INDEX_TTL_SETTING)
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.build();
}

Expand Down
Loading

0 comments on commit dd48b9b

Please sign in to comment.