Skip to content

Commit

Permalink
Add timestamp pinning service and scheduler to update in-memory state (
Browse files Browse the repository at this point in the history
…#15180)

---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale and Sachin Kale committed Aug 15, 2024
1 parent 01acf1c commit 1717b55
Show file tree
Hide file tree
Showing 9 changed files with 727 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Set;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
    static final String INDEX_NAME = "remote-store-test-idx-1";

    // Listener that deliberately ignores both outcomes; the test observes the effect of
    // pin/unpin calls through RemoteStorePinnedTimestampService.getPinnedTimestamps() instead.
    ActionListener<Void> noOpActionListener = new ActionListener<>() {
        @Override
        public void onResponse(Void unused) {}

        @Override
        public void onFailure(Exception e) {}
    };

    /**
     * Pins and unpins timestamps through the service and checks that the state returned by
     * {@code getPinnedTimestamps()} reflects the changes once the scheduler interval is shortened.
     */
    public void testTimestampPinUnpin() throws Exception {
        prepareCluster(1, 1, INDEX_NAME, 0, 2);
        ensureGreen(INDEX_NAME);

        final RemoteStorePinnedTimestampService pinningService = internalCluster().getInstance(
            RemoteStorePinnedTimestampService.class,
            primaryNodeName(INDEX_NAME)
        );

        // Initial state: fetch timestamp is -1 and nothing is pinned.
        final Tuple<Long, Set<Long>> initialState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
        final long initialFetchTimestamp = initialState.v1();
        assertEquals(-1L, initialFetchTimestamp);
        assertEquals(Set.of(), initialState.v2());

        // The service is expected to reject this timestamp with an IllegalArgumentException.
        assertThrows(IllegalArgumentException.class, () -> pinningService.pinTimestamp(1234L, "ss1", noOpActionListener));

        final long firstTimestamp = System.currentTimeMillis() + 30000L;
        final long secondTimestamp = System.currentTimeMillis() + 60000L;
        final long thirdTimestamp = System.currentTimeMillis() + 900000L;
        pinningService.pinTimestamp(firstTimestamp, "ss2", noOpActionListener);
        pinningService.pinTimestamp(secondTimestamp, "ss3", noOpActionListener);
        pinningService.pinTimestamp(thirdTimestamp, "ss4", noOpActionListener);

        // Shorten the scheduler interval while waiting for the three pins to show up.
        pinningService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));
        assertPinnedTimestampsEventually(Set.of(firstTimestamp, secondTimestamp, thirdTimestamp));
        pinningService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));

        // This should be a no-op as pinning entity is different
        pinningService.unpinTimestamp(firstTimestamp, "no-snapshot", noOpActionListener);
        // Unpinning already pinned entity
        pinningService.unpinTimestamp(secondTimestamp, "ss3", noOpActionListener);
        // Adding different entity to already pinned timestamp
        pinningService.pinTimestamp(thirdTimestamp, "ss5", noOpActionListener);

        pinningService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));
        assertPinnedTimestampsEventually(Set.of(firstTimestamp, thirdTimestamp));
        pinningService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));
    }

    /**
     * Retries via {@code assertBusy} until a fetch has happened (fetch timestamp is no longer -1)
     * and the pinned timestamps equal {@code expectedTimestamps}.
     */
    private void assertPinnedTimestampsEventually(Set<Long> expectedTimestamps) throws Exception {
        assertBusy(() -> {
            final Tuple<Long, Set<Long>> currentState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
            final long fetchTimestamp = currentState.v1();
            assertTrue(fetchTimestamp != -1);
            assertEquals(expectedTimestamps, currentState.v2());
        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.decider.EnableAssignmentDecider;
Expand Down Expand Up @@ -760,6 +761,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

RemoteStorePinnedTimestampService.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,

// Composite index settings
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.common.remote.RemoteWriteableBlobEntity;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.compress.Compressor;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;

/**
 * Wrapper class for uploading/downloading {@link RemotePinnedTimestamps.PinnedTimestamps} to/from remote blob store
 *
 * @opensearch.internal
 */
public class RemotePinnedTimestamps extends RemoteWriteableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> {
    private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class);

    /**
     * Represents a collection of pinned timestamps and their associated pinning entities.
     * Implements the Writeable interface for serialization.
     * <p>
     * The mapping is held in a {@link ConcurrentHashMap}; {@link #pin} and {@link #unpin} modify the
     * per-timestamp entity list inside the map's atomic {@code compute} call, so concurrent updates to
     * the same timestamp do not interleave. The lists themselves are plain lists: the constructor and
     * {@link #getPinnedTimestampPinningEntityMap()} copy only the map, not the lists it contains.
     */
    public static class PinnedTimestamps implements Writeable {
        private final Map<Long, List<String>> pinnedTimestampPinningEntityMap;

        /**
         * @param pinnedTimestampPinningEntityMap initial timestamp to pinning-entities mapping; the map is
         *                                        copied, the value lists are shared with the caller
         */
        public PinnedTimestamps(Map<Long, List<String>> pinnedTimestampPinningEntityMap) {
            this.pinnedTimestampPinningEntityMap = new ConcurrentHashMap<>(pinnedTimestampPinningEntityMap);
        }

        /**
         * Writes the mapping as a map of long keys to string collections.
         */
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeMap(pinnedTimestampPinningEntityMap, StreamOutput::writeLong, StreamOutput::writeStringCollection);
        }

        /**
         * Reads a mapping previously written by {@link #writeTo(StreamOutput)}.
         */
        public static PinnedTimestamps readFrom(StreamInput in) throws IOException {
            return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList));
        }

        /**
         * Pins a timestamp against a pinning entity.
         *
         * @param timestamp The timestamp to pin.
         * @param pinningEntity The entity pinning the timestamp.
         */
        public void pin(Long timestamp, String pinningEntity) {
            logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
            // Add inside compute() so the list is never modified outside the map's atomic section;
            // otherwise a concurrent unpin could drop the list from the map while we append to it.
            pinnedTimestampPinningEntityMap.compute(timestamp, (key, entities) -> {
                final List<String> updated = entities == null ? new ArrayList<>() : entities;
                updated.add(pinningEntity);
                return updated;
            });
        }

        /**
         * Unpins a timestamp for a specific pinning entity. If the timestamp is not pinned by the
         * given entity, a warning is logged and the mapping is left as it was. When the last entity
         * of a timestamp is removed, the timestamp itself is removed from the mapping.
         *
         * @param timestamp The timestamp to unpin.
         * @param pinningEntity The entity unpinning the timestamp.
         */
        public void unpin(Long timestamp, String pinningEntity) {
            logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity);
            pinnedTimestampPinningEntityMap.compute(timestamp, (key, entities) -> {
                // compute() passes null when the timestamp has no mapping; guard before touching the list.
                if (entities == null || entities.remove(pinningEntity) == false) {
                    logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity);
                }
                // Returning null removes (or keeps absent) the mapping for this timestamp.
                return (entities == null || entities.isEmpty()) ? null : entities;
            });
        }

        /**
         * @return a new {@link HashMap} holding the current mapping; the value lists are not copied
         */
        public Map<Long, List<String>> getPinnedTimestampPinningEntityMap() {
            return new HashMap<>(pinnedTimestampPinningEntityMap);
        }
    }

    public static final String PINNED_TIMESTAMPS = "pinned_timestamps";
    public static final ChecksumWritableBlobStoreFormat<PinnedTimestamps> PINNED_TIMESTAMPS_FORMAT = new ChecksumWritableBlobStoreFormat<>(
        PINNED_TIMESTAMPS,
        PinnedTimestamps::readFrom
    );

    private PinnedTimestamps pinnedTimestamps;

    /**
     * Creates an entity that starts with an empty set of pinned timestamps.
     */
    public RemotePinnedTimestamps(String clusterUUID, Compressor compressor) {
        super(clusterUUID, compressor);
        pinnedTimestamps = new PinnedTimestamps(new HashMap<>());
    }

    @Override
    public BlobPathParameters getBlobPathParameters() {
        return new BlobPathParameters(List.of(PINNED_TIMESTAMPS), PINNED_TIMESTAMPS);
    }

    @Override
    public String getType() {
        return PINNED_TIMESTAMPS;
    }

    /**
     * Builds a blob file name from {@link #PINNED_TIMESTAMPS} and the inverted current time,
     * stores it in {@code blobFileName} and returns it.
     */
    @Override
    public String generateBlobFileName() {
        // NOTE(review): the inverted timestamp presumably makes newer blobs sort first - confirm against RemoteStoreUtils.
        return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis()));
    }

    /**
     * Serializes the current pinned timestamps. Note that this calls {@link #generateBlobFileName()},
     * so every serialization assigns a new blob file name.
     */
    @Override
    public InputStream serialize() throws IOException {
        return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput();
    }

    /**
     * Reads the whole stream and decodes it with {@link #PINNED_TIMESTAMPS_FORMAT}.
     */
    @Override
    public PinnedTimestamps deserialize(InputStream inputStream) throws IOException {
        // NOTE(review): blobName is not declared in this class; presumably inherited from the superclass - confirm.
        return PINNED_TIMESTAMPS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
    }

    public void setBlobFileName(String blobFileName) {
        this.blobFileName = blobFileName;
    }

    public void setPinnedTimestamps(PinnedTimestamps pinnedTimestamps) {
        this.pinnedTimestamps = pinnedTimestamps;
    }

    public PinnedTimestamps getPinnedTimestamps() {
        return pinnedTimestamps;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.remote.RemoteWriteableBlobEntity;
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

/**
 * A {@link RemoteWriteableEntityBlobStore} specialization that handles {@link RemotePinnedTimestamps}
 */
public class RemoteStorePinnedTimestampsBlobStore extends RemoteWriteableEntityBlobStore<
    RemotePinnedTimestamps.PinnedTimestamps,
    RemotePinnedTimestamps> {

    public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps";
    private final BlobStoreRepository blobStoreRepository;

    /**
     * Passes all arguments, together with {@link #PINNED_TIMESTAMPS_PATH_TOKEN}, to the superclass and
     * keeps the repository for resolving the upload path.
     */
    public RemoteStorePinnedTimestampsBlobStore(
        BlobStoreTransferService blobStoreTransferService,
        BlobStoreRepository blobStoreRepository,
        String clusterName,
        ThreadPool threadPool,
        String executor
    ) {
        super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor, PINNED_TIMESTAMPS_PATH_TOKEN);
        this.blobStoreRepository = blobStoreRepository;
    }

    /**
     * Returns the repository base path extended with {@link #PINNED_TIMESTAMPS_PATH_TOKEN}.
     * The entity argument is not consulted.
     */
    @Override
    public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> obj) {
        final BlobPath repositoryBasePath = blobStoreRepository.basePath();
        return repositoryBasePath.add(PINNED_TIMESTAMPS_PATH_TOKEN);
    }
}
23 changes: 22 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.node.resource.tracker.NodeResourceUsageTracker;
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.PersistentTasksExecutor;
Expand Down Expand Up @@ -810,6 +811,18 @@ protected Node(
remoteIndexPathUploader = null;
remoteClusterStateCleanupManager = null;
}
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService;
if (isRemoteStoreAttributePresent(settings)) {
remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService(
repositoriesServiceReference::get,
settings,
threadPool,
clusterService
);
resourcesToClose.add(remoteStorePinnedTimestampService);
} else {
remoteStorePinnedTimestampService = null;
}

// collect engine factory providers from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
Expand Down Expand Up @@ -1173,7 +1186,8 @@ protected Node(
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
actionModule.getActionFilters()
actionModule.getActionFilters(),
remoteStorePinnedTimestampService
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
Expand Down Expand Up @@ -1426,6 +1440,7 @@ protected Node(
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
b.bind(RemoteStorePinnedTimestampService.class).toProvider(() -> remoteStorePinnedTimestampService);
b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
Expand Down Expand Up @@ -1581,6 +1596,12 @@ public Node start() throws NodeValidationException {
if (remoteIndexPathUploader != null) {
remoteIndexPathUploader.start();
}
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = injector.getInstance(
RemoteStorePinnedTimestampService.class
);
if (remoteStorePinnedTimestampService != null) {
remoteStorePinnedTimestampService.start();
}
// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(
Expand Down
Loading

0 comments on commit 1717b55

Please sign in to comment.