Skip to content

Commit

Permalink
Add remote path settings to RemoteStoreSettings (#13225)
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 authored Apr 17, 2024
1 parent 1c208d5 commit 9c35a84
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.opensearch.indices.IndexCreationException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -191,7 +192,8 @@ public MetadataCreateIndexService(
final NamedXContentRegistry xContentRegistry,
final SystemIndices systemIndices,
final boolean forbidPrivateIndexSettings,
final AwarenessReplicaBalance awarenessReplicaBalance
final AwarenessReplicaBalance awarenessReplicaBalance,
final RemoteStoreSettings remoteStoreSettings
) {
this.settings = settings;
this.clusterService = clusterService;
Expand All @@ -211,7 +213,7 @@ public MetadataCreateIndexService(
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings(), minNodeVersionSupplier)
? new RemoteStorePathStrategyResolver(remoteStoreSettings, minNodeVersionSupplier)
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,8 +713,6 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
IndicesService.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,

// Admission Control Settings
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand All @@ -732,7 +730,9 @@ public void apply(Settings value, Settings current, Settings previous) {

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
package org.opensearch.index.remote;

import org.opensearch.Version;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;

import java.util.function.Supplier;

Expand All @@ -23,35 +22,21 @@
*/
public class RemoteStorePathStrategyResolver {

private volatile PathType type;

private volatile PathHashAlgorithm hashAlgorithm;

private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<Version> minNodeVersionSupplier;

public RemoteStorePathStrategyResolver(ClusterSettings clusterSettings, Supplier<Version> minNodeVersionSupplier) {
public RemoteStorePathStrategyResolver(RemoteStoreSettings remoteStoreSettings, Supplier<Version> minNodeVersionSupplier) {
this.remoteStoreSettings = remoteStoreSettings;
this.minNodeVersionSupplier = minNodeVersionSupplier;
type = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING);
hashAlgorithm = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING);
clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, this::setType);
clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, this::setHashAlgorithm);
}

public RemoteStorePathStrategy get() {
PathType pathType;
PathHashAlgorithm pathHashAlgorithm;
// Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it.
pathType = Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 ? type : PathType.FIXED;
pathType = Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 ? remoteStoreSettings.getPathType() : PathType.FIXED;
// If the path type is fixed, hash algorithm is not applicable.
pathHashAlgorithm = pathType == PathType.FIXED ? null : hashAlgorithm;
pathHashAlgorithm = pathType == PathType.FIXED ? null : remoteStoreSettings.getPathHashAlgorithm();
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

private void setType(PathType type) {
this.type = type;
}

private void setHashAlgorithm(PathHashAlgorithm hashAlgorithm) {
this.hashAlgorithm = hashAlgorithm;
}
}
30 changes: 0 additions & 30 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.service.CacheService;
Expand Down Expand Up @@ -125,8 +124,6 @@
import org.opensearch.index.query.QueryRewriteContext;
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.seqno.RetentionLeaseStats;
Expand Down Expand Up @@ -308,33 +305,6 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Final
);

/**
* This setting is used to set the remote store blob store path type strategy. This setting is effective only for
* remote store enabled cluster.
*/
@ExperimentalApi
public static final Setting<PathType> CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.index.path.type",
PathType.FIXED.toString(),
PathType::parseString,
Property.NodeScope,
Property.Dynamic
);

/**
* This setting is used to set the remote store blob store path hash algorithm strategy. This setting is effective only for
* remote store enabled cluster. This setting will come to effect if the {@link #CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING}
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
*/
@ExperimentalApi
public static final Setting<PathHashAlgorithm> CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING = new Setting<>(
"cluster.remote_store.index.path.hash_algorithm",
PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString(),
PathHashAlgorithm::parseString,
Property.NodeScope,
Property.Dynamic
);

/**
* The node's settings.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.indices;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteStoreEnums;

/**
* Settings for remote store
Expand Down Expand Up @@ -65,12 +67,41 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* This setting is used to set the remote store blob store path type strategy. This setting is effective only for
* remote store enabled cluster.
*/
@ExperimentalApi
public static final Setting<RemoteStoreEnums.PathType> CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.index.path.type",
RemoteStoreEnums.PathType.FIXED.toString(),
RemoteStoreEnums.PathType::parseString,
Property.NodeScope,
Property.Dynamic
);

/**
* This setting is used to set the remote store blob store path hash algorithm strategy. This setting is effective only for
* remote store enabled cluster. This setting will come to effect if the {@link #CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING}
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
*/
@ExperimentalApi
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING = new Setting<>(
"cluster.remote_store.index.path.hash_algorithm",
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString(),
RemoteStoreEnums.PathHashAlgorithm::parseString,
Property.NodeScope,
Property.Dynamic
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
private volatile RemoteStoreEnums.PathType pathType;
private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
this::setClusterRemoteTranslogBufferInterval
Expand All @@ -82,11 +113,17 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this::setMinRemoteSegmentMetadataFiles
);

this.clusterRemoteTranslogTransferTimeout = CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.get(settings);
clusterRemoteTranslogTransferTimeout = CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteTranslogTransferTimeout
);

pathType = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, this::setPathType);

pathHashAlgorithm = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, this::setPathHashAlgorithm);
}

public TimeValue getClusterRemoteTranslogBufferInterval() {
Expand All @@ -112,4 +149,22 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() {
private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}

@ExperimentalApi
public RemoteStoreEnums.PathType getPathType() {
return pathType;
}

@ExperimentalApi
public RemoteStoreEnums.PathHashAlgorithm getPathHashAlgorithm() {
return pathHashAlgorithm;
}

private void setPathType(RemoteStoreEnums.PathType pathType) {
this.pathType = pathType;
}

private void setPathHashAlgorithm(RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm) {
this.pathHashAlgorithm = pathHashAlgorithm;
}
}
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,8 @@ protected Node(
xContentRegistry,
systemIndices,
forbidPrivateIndexSettings,
awarenessReplicaBalance
awarenessReplicaBalance,
remoteStoreSettings
);
pluginsService.filterPlugins(Plugin.class)
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.mapper.MetadataFieldMapper;
import org.opensearch.index.mapper.RoutingFieldMapper;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.indices.ShardLimitValidator;
Expand Down Expand Up @@ -738,7 +739,8 @@ public void testRolloverClusterState() throws Exception {
null,
systemIndices,
false,
new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings())
new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()),
DefaultRemoteStoreSettings.INSTANCE
);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(
clusterService,
Expand Down Expand Up @@ -876,7 +878,8 @@ public void testRolloverClusterStateForDataStream() throws Exception {
null,
systemIndices,
false,
new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings())
new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()),
DefaultRemoteStoreSettings.INSTANCE
);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(
clusterService,
Expand Down Expand Up @@ -1054,7 +1057,8 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception {
null,
new SystemIndices(emptyMap()),
false,
new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings())
new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()),
DefaultRemoteStoreSettings.INSTANCE
);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(
clusterService,
Expand Down
Loading

0 comments on commit 9c35a84

Please sign in to comment.