Skip to content

Commit

Permalink
[SnapshotV2] Add timestamp of last successful fetch of pinned timesta…
Browse files Browse the repository at this point in the history
…mps in node stats (opensearch-project#15611)

---------

Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
  • Loading branch information
ltaragi authored and dk2k committed Oct 16, 2024
1 parent de0d4fa commit ee3631b
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.remotestore;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -20,6 +22,8 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";
Expand Down Expand Up @@ -180,4 +184,41 @@ public void onFailure(Exception e) {
assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2()));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}

public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
logger.info("Starting up cluster manager");
logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
logger.info("cluster.remote_store.pinned_timestamps.scheduler_interval set to minimum value of 1minute");
Settings pinnedTimestampEnabledSettings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m")
.build();
internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings);
String remoteNodeName = internalCluster().startDataOnlyNodes(1, pinnedTimestampEnabledSettings).get(0);
ensureStableCluster(2);
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
remoteNodeName
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

assertBusy(() -> {
long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L);
NodesStatsResponse nodesStatsResponse = internalCluster().client()
.admin()
.cluster()
.prepareNodesStats()
.addMetric(REMOTE_STORE.metricName())
.execute()
.actionGet();
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps();
assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps);
}
});

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.repositories.RepositoriesStats;
import org.opensearch.script.ScriptCacheStats;
Expand Down Expand Up @@ -162,6 +163,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private NodeCacheStats nodeCacheStats;

@Nullable
private RemoteStoreNodeStats remoteStoreNodeStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -243,6 +247,12 @@ public NodeStats(StreamInput in) throws IOException {
} else {
nodeCacheStats = null;
}
// TODO: change version to V_2_18_0
if (in.getVersion().onOrAfter(Version.CURRENT)) {
remoteStoreNodeStats = in.readOptionalWriteable(RemoteStoreNodeStats::new);
} else {
remoteStoreNodeStats = null;
}
}

public NodeStats(
Expand Down Expand Up @@ -274,7 +284,8 @@ public NodeStats(
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats,
@Nullable AdmissionControlStats admissionControlStats,
@Nullable NodeCacheStats nodeCacheStats
@Nullable NodeCacheStats nodeCacheStats,
@Nullable RemoteStoreNodeStats remoteStoreNodeStats
) {
super(node);
this.timestamp = timestamp;
Expand Down Expand Up @@ -305,6 +316,7 @@ public NodeStats(
this.repositoriesStats = repositoriesStats;
this.admissionControlStats = admissionControlStats;
this.nodeCacheStats = nodeCacheStats;
this.remoteStoreNodeStats = remoteStoreNodeStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -467,6 +479,11 @@ public NodeCacheStats getNodeCacheStats() {
return nodeCacheStats;
}

@Nullable
public RemoteStoreNodeStats getRemoteStoreNodeStats() {
return remoteStoreNodeStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -525,6 +542,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeOptionalWriteable(nodeCacheStats);
}
// TODO: change version to V_2_18_0
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalWriteable(remoteStoreNodeStats);
}
}

@Override
Expand Down Expand Up @@ -631,6 +652,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getNodeCacheStats() != null) {
getNodeCacheStats().toXContent(builder, params);
}
if (getRemoteStoreNodeStats() != null) {
getRemoteStoreNodeStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public enum Metric {
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories"),
ADMISSION_CONTROL("admission_control"),
CACHE_STATS("caches");
CACHE_STATS("caches"),
REMOTE_STORE("remote_store");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics),
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.REMOTE_STORE.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -241,7 +242,8 @@ public NodeStats stats(
boolean segmentReplicationTrackerStats,
boolean repositoriesStats,
boolean admissionControl,
boolean cacheService
boolean cacheService,
boolean remoteStoreNodeStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -274,7 +276,8 @@ public NodeStats stats(
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats() : null,
cacheService ? this.cacheService.stats(indices) : null
cacheService ? this.cacheService.stats(indices) : null,
remoteStoreNodeStats ? new RemoteStoreNodeStats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node.remotestore;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Node level remote store stats
* @opensearch.internal
*/
public class RemoteStoreNodeStats implements Writeable, ToXContentFragment {

public static final String STATS_NAME = "remote_store";
public static final String LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS = "last_successful_fetch_of_pinned_timestamps";

/**
* Time stamp for the last successful fetch of pinned timestamps by the {@linkplain RemoteStorePinnedTimestampService}
*/
private final long lastSuccessfulFetchOfPinnedTimestamps;

public RemoteStoreNodeStats() {
this.lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
}

public long getLastSuccessfulFetchOfPinnedTimestamps() {
return this.lastSuccessfulFetchOfPinnedTimestamps;
}

public RemoteStoreNodeStats(StreamInput in) throws IOException {
this.lastSuccessfulFetchOfPinnedTimestamps = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(this.lastSuccessfulFetchOfPinnedTimestamps);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(STATS_NAME);
builder.field(LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS, this.lastSuccessfulFetchOfPinnedTimestamps);
return builder.endObject();
}

@Override
public String toString() {
return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}";
}

@Override
public boolean equals(Object o) {
if (o == null) {
return false;
}
if (o.getClass() != RemoteStoreNodeStats.class) {
return false;
}
RemoteStoreNodeStats other = (RemoteStoreNodeStats) o;
return this.lastSuccessfulFetchOfPinnedTimestamps == other.lastSuccessfulFetchOfPinnedTimestamps;
}

@Override
public int hashCode() {
return Objects.hash(lastSuccessfulFetchOfPinnedTimestamps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.opensearch.node.NodeResourceUsageStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
Expand Down Expand Up @@ -614,6 +615,14 @@ public void testSerialization() throws IOException {
} else {
assertEquals(nodeCacheStats, deserializedNodeCacheStats);
}

RemoteStoreNodeStats remoteStoreNodeStats = nodeStats.getRemoteStoreNodeStats();
RemoteStoreNodeStats deserializedRemoteStoreNodeStats = deserializedNodeStats.getRemoteStoreNodeStats();
if (remoteStoreNodeStats == null) {
assertNull(deserializedRemoteStoreNodeStats);
} else {
assertEquals(remoteStoreNodeStats, deserializedRemoteStoreNodeStats);
}
}
}
}
Expand Down Expand Up @@ -996,6 +1005,16 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
nodeCacheStats = new NodeCacheStats(cacheStatsMap, flags);
}

RemoteStoreNodeStats remoteStoreNodeStats = null;
if (frequently()) {
remoteStoreNodeStats = new RemoteStoreNodeStats() {
@Override
public long getLastSuccessfulFetchOfPinnedTimestamps() {
return 123456L;
}
};
}

// TODO: Only remote_store based aspects of NodeIndicesStats are being tested here.
// It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now
return new NodeStats(
Expand Down Expand Up @@ -1027,7 +1046,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
segmentReplicationRejectionStats,
null,
admissionControlStats,
nodeCacheStats
nodeCacheStats,
remoteStoreNodeStats
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ private ClusterStatsNodeResponse createClusterStatsNodeResponse(
null,
null,
null,
null,
null
);
if (defaultBehavior) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -226,6 +227,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -257,6 +259,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -319,6 +322,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -350,6 +354,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -381,6 +386,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
)
);
Expand Down
Loading

0 comments on commit ee3631b

Please sign in to comment.