diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java new file mode 100644 index 0000000000000..0bb53309f7a78 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Set; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase { + static final String INDEX_NAME = "remote-store-test-idx-1"; + + ActionListener noOpActionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) {} + }; + + public void testTimestampPinUnpin() throws Exception { + prepareCluster(1, 1, INDEX_NAME, 0, 2); + ensureGreen(INDEX_NAME); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + Tuple> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1(); + assertEquals(-1L, lastFetchTimestamp); + assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2()); + + assertThrows( + IllegalArgumentException.class, + () -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener) + ); + + long timestamp1 = System.currentTimeMillis() + 30000L; + long timestamp2 = System.currentTimeMillis() + 60000L; + long timestamp3 = System.currentTimeMillis() + 900000L; + remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener); + remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener); + remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener); + + remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1)); + + assertBusy(() -> { + Tuple> pinnedTimestampWithFetchTimestamp_2 = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + long lastFetchTimestamp_2 = pinnedTimestampWithFetchTimestamp_2.v1(); + assertTrue(lastFetchTimestamp_2 != -1); + assertEquals(Set.of(timestamp1, timestamp2, timestamp3), pinnedTimestampWithFetchTimestamp_2.v2()); + }); + + remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3)); + + // This should be a no-op as pinning entity is different + remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener); + // Unpinning already pinned entity + remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener); + // Adding different entity to already pinned timestamp + remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener); + + remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1)); + + assertBusy(() -> { + Tuple> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + long lastFetchTimestamp_3 = pinnedTimestampWithFetchTimestamp_3.v1(); + assertTrue(lastFetchTimestamp_3 != -1); + assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2()); + }); + + remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3)); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index a73e5d44b7e02..7baae17dd77cd 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -142,6 +142,7 @@ import org.opensearch.node.Node.DiscoverySettings; import org.opensearch.node.NodeRoleSettings; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; import org.opensearch.persistent.PersistentTasksClusterService; import org.opensearch.persistent.decider.EnableAssignmentDecider; @@ -760,6 +761,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA, SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, + RemoteStorePinnedTimestampService.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL, + // Composite index settings CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java new file mode 100644 index 0000000000000..030491cf8b7b9 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -0,0 +1,144 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.remote.RemoteWriteableBlobEntity; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.compress.Compressor; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; + +/** + * Wrapper class for uploading/downloading {@link RemotePinnedTimestamps} to/from remote blob store + * + * @opensearch.internal + */ +public class RemotePinnedTimestamps extends RemoteWriteableBlobEntity { + private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class); + + /** + * Represents a collection of pinned timestamps and their associated pinning entities. + * This class is thread-safe and implements the Writeable interface for serialization. + */ + public static class PinnedTimestamps implements Writeable { + private final Map> pinnedTimestampPinningEntityMap; + + public PinnedTimestamps(Map> pinnedTimestampPinningEntityMap) { + this.pinnedTimestampPinningEntityMap = new ConcurrentHashMap<>(pinnedTimestampPinningEntityMap); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(pinnedTimestampPinningEntityMap, StreamOutput::writeLong, StreamOutput::writeStringCollection); + } + + public static PinnedTimestamps readFrom(StreamInput in) throws IOException { + return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList)); + } + + /** + * Pins a timestamp against a pinning entity. + * + * @param timestamp The timestamp to pin. + * @param pinningEntity The entity pinning the timestamp. + */ + public void pin(Long timestamp, String pinningEntity) { + logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity); + pinnedTimestampPinningEntityMap.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(pinningEntity); + } + + /** + * Unpins a timestamp for a specific pinning entity. + * + * @param timestamp The timestamp to unpin. + * @param pinningEntity The entity unpinning the timestamp. + */ + public void unpin(Long timestamp, String pinningEntity) { + logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity); + if (pinnedTimestampPinningEntityMap.containsKey(timestamp) == false + || pinnedTimestampPinningEntityMap.get(timestamp).contains(pinningEntity) == false) { + logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity); + } + pinnedTimestampPinningEntityMap.compute(timestamp, (k, v) -> { + v.remove(pinningEntity); + return v.isEmpty() ? null : v; + }); + } + + public Map> getPinnedTimestampPinningEntityMap() { + return new HashMap<>(pinnedTimestampPinningEntityMap); + } + } + + public static final String PINNED_TIMESTAMPS = "pinned_timestamps"; + public static final ChecksumWritableBlobStoreFormat PINNED_TIMESTAMPS_FORMAT = new ChecksumWritableBlobStoreFormat<>( + PINNED_TIMESTAMPS, + PinnedTimestamps::readFrom + ); + + private PinnedTimestamps pinnedTimestamps; + + public RemotePinnedTimestamps(String clusterUUID, Compressor compressor) { + super(clusterUUID, compressor); + pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(PINNED_TIMESTAMPS), PINNED_TIMESTAMPS); + } + + @Override + public String getType() { + return PINNED_TIMESTAMPS; + } + + @Override + public String generateBlobFileName() { + return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis())); + } + + @Override + public InputStream serialize() throws IOException { + return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput(); + } + + @Override + public PinnedTimestamps deserialize(InputStream inputStream) throws IOException { + return PINNED_TIMESTAMPS_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); + } + + public void setBlobFileName(String blobFileName) { + this.blobFileName = blobFileName; + } + + public void setPinnedTimestamps(PinnedTimestamps pinnedTimestamps) { + this.pinnedTimestamps = pinnedTimestamps; + } + + public PinnedTimestamps getPinnedTimestamps() { + return pinnedTimestamps; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java new file mode 100644 index 0000000000000..2a65dd993d0af --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.remote.RemoteWriteableBlobEntity; +import org.opensearch.common.remote.RemoteWriteableEntityBlobStore; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * Extends the RemoteClusterStateBlobStore to support {@link RemotePinnedTimestamps} + */ +public class RemoteStorePinnedTimestampsBlobStore extends RemoteWriteableEntityBlobStore< + RemotePinnedTimestamps.PinnedTimestamps, + RemotePinnedTimestamps> { + + public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps"; + private final BlobStoreRepository blobStoreRepository; + + public RemoteStorePinnedTimestampsBlobStore( + BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, + String clusterName, + ThreadPool threadPool, + String executor + ) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor, PINNED_TIMESTAMPS_PATH_TOKEN); + this.blobStoreRepository = blobStoreRepository; + } + + @Override + public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity obj) { + return blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 409f84354a8b1..1a9b233b387b2 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -185,6 +185,7 @@ import org.opensearch.monitor.fs.FsProbe; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.node.resource.tracker.NodeResourceUsageTracker; import org.opensearch.persistent.PersistentTasksClusterService; import org.opensearch.persistent.PersistentTasksExecutor; @@ -810,6 +811,18 @@ protected Node( remoteIndexPathUploader = null; remoteClusterStateCleanupManager = null; } + final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService; + if (isRemoteStoreAttributePresent(settings)) { + remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService( + repositoriesServiceReference::get, + settings, + threadPool, + clusterService + ); + resourcesToClose.add(remoteStorePinnedTimestampService); + } else { + remoteStorePinnedTimestampService = null; + } // collect engine factory providers from plugins final Collection enginePlugins = pluginsService.filterPlugins(EnginePlugin.class); @@ -1173,7 +1186,8 @@ protected Node( clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, - actionModule.getActionFilters() + actionModule.getActionFilters(), + remoteStorePinnedTimestampService ); SnapshotShardsService snapshotShardsService = new SnapshotShardsService( settings, @@ -1426,6 +1440,7 @@ protected Node( b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader); + b.bind(RemoteStorePinnedTimestampService.class).toProvider(() -> remoteStorePinnedTimestampService); b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); @@ -1581,6 +1596,12 @@ public Node start() throws NodeValidationException { if (remoteIndexPathUploader != null) { remoteIndexPathUploader.start(); } + final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = injector.getInstance( + RemoteStorePinnedTimestampService.class + ); + if (remoteStorePinnedTimestampService != null) { + remoteStorePinnedTimestampService.start(); + } // Load (and maybe upgrade) the metadata stored on disk final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class); gatewayMetaState.start( diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java new file mode 100644 index 0000000000000..35730a75a8142 --- /dev/null +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -0,0 +1,321 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.node.remotestore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.model.RemotePinnedTimestamps; +import org.opensearch.gateway.remote.model.RemotePinnedTimestamps.PinnedTimestamps; +import org.opensearch.gateway.remote.model.RemoteStorePinnedTimestampsBlobStore; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.node.Node; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * Service for managing pinned timestamps in a remote store. + * This service handles pinning and unpinning of timestamps, as well as periodic updates of the pinned timestamps set. + * + * @opensearch.internal + */ +public class RemoteStorePinnedTimestampService implements Closeable { + private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); + private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + public static final int PINNED_TIMESTAMP_FILES_TO_KEEP = 5; + + private final Supplier repositoriesService; + private final Settings settings; + private final ThreadPool threadPool; + private final ClusterService clusterService; + private BlobStoreRepository blobStoreRepository; + private BlobStoreTransferService blobStoreTransferService; + private RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore; + private AsyncUpdatePinnedTimestampTask asyncUpdatePinnedTimestampTask; + private volatile TimeValue pinnedTimestampsSchedulerInterval; + private final Semaphore updateTimetampPinningSemaphore = new Semaphore(1); + + /** + * Controls pinned timestamp scheduler interval + */ + public static final Setting CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL = Setting.timeSetting( + "cluster.remote_store.pinned_timestamps.scheduler_interval", + TimeValue.timeValueMinutes(3), + TimeValue.timeValueMinutes(1), + Setting.Property.NodeScope + ); + + /** + * Controls allowed timestamp values to be pinned from past + */ + public static final Setting CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL = Setting.timeSetting( + "cluster.remote_store.pinned_timestamps.lookback_interval", + TimeValue.timeValueMinutes(1), + TimeValue.timeValueMinutes(1), + TimeValue.timeValueMinutes(5), + Setting.Property.NodeScope + ); + + public RemoteStorePinnedTimestampService( + Supplier repositoriesService, + Settings settings, + ThreadPool threadPool, + ClusterService clusterService + ) { + this.repositoriesService = repositoriesService; + this.settings = settings; + this.threadPool = threadPool; + this.clusterService = clusterService; + + pinnedTimestampsSchedulerInterval = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.get(settings); + } + + /** + * Starts the RemoteStorePinnedTimestampService. + * This method validates the remote store configuration, initializes components, + * and starts the asynchronous update task. + */ + public void start() { + validateRemoteStoreConfiguration(); + initializeComponents(); + startAsyncUpdateTask(); + } + + private void validateRemoteStoreConfiguration() { + final String remoteStoreRepo = settings.get( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + assert remoteStoreRepo != null : "Remote Segment Store repository is not configured"; + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } + + private void initializeComponents() { + String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); + blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), this.threadPool); + pinnedTimestampsBlobStore = new RemoteStorePinnedTimestampsBlobStore( + blobStoreTransferService, + blobStoreRepository, + clusterName, + this.threadPool, + ThreadPool.Names.REMOTE_STATE_READ + ); + } + + private void startAsyncUpdateTask() { + asyncUpdatePinnedTimestampTask = new AsyncUpdatePinnedTimestampTask(logger, threadPool, pinnedTimestampsSchedulerInterval, true); + } + + /** + * Pins a timestamp in the remote store. + * + * @param timestamp The timestamp to be pinned + * @param pinningEntity The entity responsible for pinning the timestamp + * @param listener A listener to be notified when the pinning operation completes + * @throws IllegalArgumentException If the timestamp is less than the current time minus one second + */ + public void pinTimestamp(long timestamp, String pinningEntity, ActionListener listener) { + // If a caller uses current system time to pin the timestamp, following check will almost always fail. + // So, we allow pinning timestamp in the past upto some buffer + long lookbackIntervalInMills = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL.get(settings).millis(); + if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - lookbackIntervalInMills) { + throw new IllegalArgumentException( + "Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval" + ); + } + updatePinning(pinnedTimestamps -> pinnedTimestamps.pin(timestamp, pinningEntity), listener); + } + + /** + * Unpins a timestamp from the remote store. + * + * @param timestamp The timestamp to be unpinned + * @param pinningEntity The entity responsible for unpinning the timestamp + * @param listener A listener to be notified when the unpinning operation completes + */ + public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener listener) { + updatePinning(pinnedTimestamps -> pinnedTimestamps.unpin(timestamp, pinningEntity), listener); + } + + private void updatePinning(Consumer updateConsumer, ActionListener listener) { + RemotePinnedTimestamps remotePinnedTimestamps = new RemotePinnedTimestamps( + clusterService.state().metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ); + BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps); + try { + if (updateTimetampPinningSemaphore.tryAcquire(10, TimeUnit.MINUTES)) { + ActionListener semaphoreAwareListener = ActionListener.runBefore(listener, updateTimetampPinningSemaphore::release); + ActionListener> listCallResponseListener = getListenerForListCallResponse( + remotePinnedTimestamps, + updateConsumer, + semaphoreAwareListener + ); + blobStoreTransferService.listAllInSortedOrder( + path, + remotePinnedTimestamps.getType(), + Integer.MAX_VALUE, + listCallResponseListener + ); + } else { + throw new TimeoutException("Timed out while waiting to acquire lock in updatePinning"); + } + } catch (InterruptedException | TimeoutException e) { + listener.onFailure(e); + } + } + + private ActionListener> getListenerForListCallResponse( + RemotePinnedTimestamps remotePinnedTimestamps, + Consumer updateConsumer, + ActionListener listener + ) { + return ActionListener.wrap(blobMetadata -> { + PinnedTimestamps pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); + if (blobMetadata.isEmpty() == false) { + pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); + } + updateConsumer.accept(pinnedTimestamps); + remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); + ActionListener writeCallResponseListener = getListenerForWriteCallResponse( + remotePinnedTimestamps, + blobMetadata, + listener + ); + pinnedTimestampsBlobStore.writeAsync(remotePinnedTimestamps, writeCallResponseListener); + }, listener::onFailure); + } + + private ActionListener getListenerForWriteCallResponse( + RemotePinnedTimestamps remotePinnedTimestamps, + List blobMetadata, + ActionListener listener + ) { + return ActionListener.wrap(unused -> { + // Delete older pinnedTimestamp files + if (blobMetadata.size() > PINNED_TIMESTAMP_FILES_TO_KEEP) { + List oldFilesToBeDeleted = blobMetadata.subList(PINNED_TIMESTAMP_FILES_TO_KEEP, blobMetadata.size()) + .stream() + .map(BlobMetadata::name) + .collect(Collectors.toList()); + try { + blobStoreTransferService.deleteBlobs( + pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps), + oldFilesToBeDeleted + ); + } catch (IOException e) { + logger.error("Exception while deleting stale pinned timestamps", e); + } + } + listener.onResponse(null); + }, listener::onFailure); + } + + private PinnedTimestamps readExistingPinnedTimestamps(String blobFilename, RemotePinnedTimestamps remotePinnedTimestamps) { + remotePinnedTimestamps.setBlobFileName(blobFilename); + remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps)); + try { + return pinnedTimestampsBlobStore.read(remotePinnedTimestamps); + } catch (IOException e) { + throw new RuntimeException("Failed to read existing pinned timestamps", e); + } + } + + @Override + public void close() throws IOException { + asyncUpdatePinnedTimestampTask.close(); + } + + // Visible for testing + public void setPinnedTimestampsSchedulerInterval(TimeValue pinnedTimestampsSchedulerInterval) { + this.pinnedTimestampsSchedulerInterval = pinnedTimestampsSchedulerInterval; + rescheduleAsyncUpdatePinnedTimestampTask(); + } + + private void rescheduleAsyncUpdatePinnedTimestampTask() { + if (pinnedTimestampsSchedulerInterval != null) { + pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + asyncUpdatePinnedTimestampTask.close(); + startAsyncUpdateTask(); + } + } + + public static Tuple> getPinnedTimestamps() { + return pinnedTimestampsSet; + } + + /** + * Inner class for asynchronously updating the pinned timestamp set. + */ + private final class AsyncUpdatePinnedTimestampTask extends AbstractAsyncTask { + private AsyncUpdatePinnedTimestampTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) { + super(logger, threadPool, interval, autoReschedule); + rescheduleIfNecessary(); + } + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + long triggerTimestamp = System.currentTimeMillis(); + RemotePinnedTimestamps remotePinnedTimestamps = new RemotePinnedTimestamps( + clusterService.state().metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ); + BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps); + blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.isEmpty()) { + return; + } + PinnedTimestamps pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); + logger.debug( + "Fetched pinned timestamps from remote store: {} - {}", + triggerTimestamp, + pinnedTimestamps.getPinnedTimestampPinningEntityMap().keySet() + ); + pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps.getPinnedTimestampPinningEntityMap().keySet()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Exception while listing pinned timestamp files", e); + } + }); + } + } +} diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index acc2dc83749cd..5e49208465dbb 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -92,6 +92,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -180,6 +181,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; private final TransportService transportService; + private final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService; private final OngoingRepositoryOperations repositoryOperations = new OngoingRepositoryOperations(); @@ -208,7 +210,8 @@ public SnapshotsService( IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, TransportService transportService, - ActionFilters actionFilters + ActionFilters actionFilters, + @Nullable RemoteStorePinnedTimestampService remoteStorePinnedTimestampService ) { this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -216,6 +219,7 @@ public SnapshotsService( this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService); this.threadPool = transportService.getThreadPool(); this.transportService = transportService; + this.remoteStorePinnedTimestampService = remoteStorePinnedTimestampService; // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction( diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java new file mode 100644 index 0000000000000..309263a634265 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePinnedTimestampsTests.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RemotePinnedTimestampsTests extends OpenSearchTestCase { + + private RemotePinnedTimestamps remotePinnedTimestamps; + + @Before + public void setup() { + Compressor compressor = new DeflateCompressor(); + remotePinnedTimestamps = new RemotePinnedTimestamps("testClusterUUID", compressor); + } + + public void testGenerateBlobFileName() { + String fileName = remotePinnedTimestamps.generateBlobFileName(); + assertTrue(fileName.startsWith(RemotePinnedTimestamps.PINNED_TIMESTAMPS)); + assertEquals(fileName, remotePinnedTimestamps.getBlobFileName()); + } + + public void testSerializeAndDeserialize() throws IOException { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + pinnedTimestamps.pin(1000L, "entity1"); + pinnedTimestamps.pin(2000L, "entity2"); + remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); + + InputStream serialized = remotePinnedTimestamps.serialize(); + RemotePinnedTimestamps.PinnedTimestamps deserialized = remotePinnedTimestamps.deserialize(serialized); + + assertEquals(pinnedTimestamps.getPinnedTimestampPinningEntityMap(), deserialized.getPinnedTimestampPinningEntityMap()); + } + + public void testSetAndGetPinnedTimestamps() { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); + assertEquals(pinnedTimestamps, remotePinnedTimestamps.getPinnedTimestamps()); + } + + public void testPinnedTimestampsPin() { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + pinnedTimestamps.pin(1000L, "entity1"); + pinnedTimestamps.pin(1000L, "entity2"); + pinnedTimestamps.pin(2000L, "entity3"); + + Map> expected = new HashMap<>(); + expected.put(1000L, Arrays.asList("entity1", "entity2")); + expected.put(2000L, List.of("entity3")); + + assertEquals(expected, pinnedTimestamps.getPinnedTimestampPinningEntityMap()); + } + + public void testPinnedTimestampsUnpin() { + RemotePinnedTimestamps.PinnedTimestamps pinnedTimestamps = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + pinnedTimestamps.pin(1000L, "entity1"); + pinnedTimestamps.pin(1000L, "entity2"); + pinnedTimestamps.pin(2000L, "entity3"); + + pinnedTimestamps.unpin(1000L, "entity1"); + pinnedTimestamps.unpin(2000L, "entity3"); + + Map> expected = new HashMap<>(); + expected.put(1000L, List.of("entity2")); + + assertEquals(expected, pinnedTimestamps.getPinnedTimestampPinningEntityMap()); + } + + public void testPinnedTimestampsReadFromAndWriteTo() throws IOException { + RemotePinnedTimestamps.PinnedTimestamps original = new RemotePinnedTimestamps.PinnedTimestamps(new HashMap<>()); + original.pin(1000L, "entity1"); + original.pin(2000L, "entity2"); + + BytesStreamOutput out = new BytesStreamOutput(); + original.writeTo(out); + + StreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); + RemotePinnedTimestamps.PinnedTimestamps deserialized = RemotePinnedTimestamps.PinnedTimestamps.readFrom(in); + + assertEquals(original.getPinnedTimestampPinningEntityMap(), deserialized.getPinnedTimestampPinningEntityMap()); + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9c58fc8fde084..769dfeb37ff8d 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2015,7 +2015,8 @@ public void onFailure(final Exception e) { indexNameExpressionResolver, repositoriesService, transportService, - actionFilters + actionFilters, + null ); nodeEnv = new NodeEnvironment(settings, environment); final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList());