Add cluster applier stats #77552

Merged

Changes from 10 commits

@@ -794,8 +794,8 @@ public DiscoveryStats stats() {
return new DiscoveryStats(
new PendingClusterStateStats(0, 0, 0),
publicationHandler.stats(),
getLocalNode().isMasterNode() ? masterService.getClusterStateUpdateStats() : null
);
getLocalNode().isMasterNode() ? masterService.getClusterStateUpdateStats() : null,
clusterApplier.getStats());
}

public void startInitialJoin() {
@@ -30,4 +30,7 @@ public interface ClusterApplier {
* themselves, typically using a more specific logger and at a less dramatic log level.
*/
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ActionListener<Void> listener);

ClusterApplierRecordingService.Stats getStats();

}
@@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.cluster.service;

import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Stats.Recording;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public final class ClusterApplierRecordingService {

private final Map<String, MeanMetric> recordedActions = new ConcurrentHashMap<>();

Contributor:
All the access to this is synchronized so I think we can just use a regular HashMap


synchronized Stats getStats() {

Member:
This seems mildly scary, though I'm having a hard time guessing the impact. But technically, a barrage of stats requests would cause the CS applier thread to slow down (potentially considerably?)

Contributor:
My thinking is that this should complete in microseconds => we'd need implausibly many concurrent stats requests to cause meaningful starvation here.

Member:
True, I guess it also runs on the management pool with somewhat limited concurrency 🤞 :)

return new Stats(
recordedActions.entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> new Recording(e.getValue().count(), e.getValue().sum())))
);
}

synchronized void updateStats(Recorder recorder) {
Set<String> seenActions = new HashSet<>();
for (Tuple<String, Long> entry : recorder.recordings) {
String action = entry.v1();
long timeSpentMS = entry.v2();

MeanMetric metric = recordedActions.computeIfAbsent(action, key -> new MeanMetric());
metric.inc(timeSpentMS);
seenActions.add(action);
}
recordedActions.entrySet().removeIf(entry -> seenActions.contains(entry.getKey()) == false);
}
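
A minimal sketch of the change suggested in the HashMap comment above. The class and method names are illustrative only (not part of this PR); the point is that, because every access happens under the same monitor, a plain HashMap works:

import org.elasticsearch.common.metrics.MeanMetric;

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the recording service: all reads and writes of the map
// go through synchronized methods, so no concurrent map is needed.
final class SynchronizedRecordingStore {

    private final Map<String, MeanMetric> recordedActions = new HashMap<>();

    synchronized void record(String action, long millisSpent) {
        recordedActions.computeIfAbsent(action, key -> new MeanMetric()).inc(millisSpent);
    }

    synchronized Map<String, Long> totalTimePerAction() {
        // copy under the lock before publishing the data to callers
        Map<String, Long> copy = new HashMap<>(recordedActions.size());
        recordedActions.forEach((action, metric) -> copy.put(action, metric.sum()));
        return copy;
    }
}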

static final class Recorder {

private String currentAction;
private long startTimeNS;
private boolean recording;
private final List<Tuple<String, Long>> recordings = new LinkedList<>();

Releasable record(String action) {
if (recording) {
throw new IllegalStateException("already recording");
}

this.recording = true;
this.currentAction = action;
this.startTimeNS = System.nanoTime();

Contributor:
I think we're ok with milliseconds so we can use ThreadPool#rawRelativeTimeInMillis which will let us simulate the passage of time in tests.

return this::stop;
}

void stop() {
recording = false;
long timeSpentMS = TimeValue.nsecToMSec(System.nanoTime() - this.startTimeNS);
recordings.add(new Tuple<>(currentAction, timeSpentMS));
}

List<Tuple<String, Long>> getRecordings() {
return recordings;
}
}
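
Picking up the ThreadPool#rawRelativeTimeInMillis suggestion above, a hedged sketch of a Recorder variant that takes an injectable millisecond clock so tests can control the passage of time. The LongSupplier constructor and the class name are assumptions for illustration, not what this commit does:

import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Tuple;

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Illustrative variant: the clock is injected (e.g. threadPool::rawRelativeTimeInMillis in
// production, a fake supplier in tests) instead of calling System.nanoTime() directly.
final class RecorderWithClock {

    private final LongSupplier currentTimeMillisSupplier;
    private final List<Tuple<String, Long>> recordings = new ArrayList<>();
    private String currentAction;
    private long startMillis;
    private boolean recording;

    RecorderWithClock(LongSupplier currentTimeMillisSupplier) {
        this.currentTimeMillisSupplier = currentTimeMillisSupplier;
    }

    Releasable record(String action) {
        if (recording) {
            throw new IllegalStateException("already recording");
        }
        recording = true;
        currentAction = action;
        startMillis = currentTimeMillisSupplier.getAsLong();
        return this::stop;
    }

    private void stop() {
        recording = false;
        recordings.add(new Tuple<>(currentAction, currentTimeMillisSupplier.getAsLong() - startMillis));
    }

    List<Tuple<String, Long>> getRecordings() {
        return recordings;
    }
}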

public static class Stats implements Writeable, ToXContentFragment {

private final Map<String, Recording> recordings;

public Stats(Map<String, Recording> recordings) {
this.recordings = recordings;
}

public Stats(StreamInput in) throws IOException {
this(in.readMap(StreamInput::readString, Recording::new));
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("cluster_applier_stats");
builder.startArray("recordings");
toXContentTimeSpent(builder, recordings);
builder.endObject();
return builder;
}

private static void toXContentTimeSpent(XContentBuilder builder, Map<String, Recording> timeSpentPerListener) throws IOException {
timeSpentPerListener.entrySet().stream()
.sorted((o1, o2) -> -Long.compare(o1.getValue().sum, o2.getValue().sum))

Contributor:
I can never remember which way round this'll sort, but then I couldn't get the explicit alternative to work with type inference for some reason so it looks like this. Yuck either way.

Suggested change
.sorted((o1, o2) -> -Long.compare(o1.getValue().sum, o2.getValue().sum))
.sorted(Comparator.<Map.Entry<String, Recording>>comparingLong(o -> o.getValue().sum).reversed())

.forEach(entry -> {
try {
builder.startObject();
builder.field("name", entry.getKey());
String name = "cumulative_execution";
builder.field(name + "_count", entry.getValue().count);
builder.humanReadableField(name + "_time_millis", name + "_time", TimeValue.timeValueMillis(entry.getValue().sum));
builder.endObject();
} catch (IOException e) {
throw new UncheckedIOException(e);

Contributor:
Don't think I like this, we don't generally check for UncheckedIOException. Could we just use a normal loop and let a regular IOException propagate?

}
});
builder.endArray();
}
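
Following the UncheckedIOException comment above, a sketch of the plain-loop alternative: sort once into a list, then iterate and let IOException propagate through the method's throws clause. It assumes the surrounding class's imports plus java.util.Comparator and java.util.List:

    private static void toXContentTimeSpent(XContentBuilder builder, Map<String, Recording> timeSpentPerListener) throws IOException {
        // sort descending by cumulative time, using the Comparator form from the suggestion above
        final List<Map.Entry<String, Recording>> sortedEntries = timeSpentPerListener.entrySet().stream()
            .sorted(Comparator.<Map.Entry<String, Recording>>comparingLong(e -> e.getValue().sum).reversed())
            .collect(Collectors.toList());
        for (Map.Entry<String, Recording> entry : sortedEntries) {
            builder.startObject();
            builder.field("name", entry.getKey());
            final String name = "cumulative_execution";
            builder.field(name + "_count", entry.getValue().count);
            builder.humanReadableField(name + "_time_millis", name + "_time", TimeValue.timeValueMillis(entry.getValue().sum));
            builder.endObject();
        }
        builder.endArray();   // closes the "recordings" array opened by the caller, as before
    }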

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(recordings, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
}

public static class Recording implements Writeable {

private final long count;
private final long sum;

public Recording(long count, long sum) {
this.count = count;
this.sum = sum;
}

public Recording(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVLong(sum);
}
}
}
}
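
For orientation, the section these Stats render into the discovery part of the node stats response would look roughly like this (illustrative recording names and made-up values; with ?human the cumulative_execution_time field appears as well):

"cluster_applier_stats": {
  "recordings": [
    {
      "name": "connecting to new nodes",
      "cumulative_execution_count": 42,
      "cumulative_execution_time_millis": 183
    },
    {
      "name": "applying settings",
      "cumulative_execution_count": 42,
      "cumulative_execution_time_millis": 29
    }
  ]
}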
@@ -21,8 +21,8 @@
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@@ -38,7 +38,6 @@
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
@@ -84,13 +83,16 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final String nodeName;

private final ClusterApplierRecordingService recordingService;

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.nodeName = nodeName;
this.recordingService = new ClusterApplierRecordingService();

this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -389,10 +391,10 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
final ClusterState previousClusterState = state.get();

final long startTimeMillis = threadPool.relativeTimeInMillis();
final StopWatch stopWatch = new StopWatch();
final Recorder stopWatch = new Recorder();
final ClusterState newClusterState;
try {
try (Releasable ignored = stopWatch.timing("running task [" + source + ']')) {
try (Releasable ignored = stopWatch.record("running task [" + source + ']')) {
newClusterState = updateFunction.apply(previousClusterState);
}
} catch (Exception e) {
@@ -448,7 +450,7 @@ private TimeValue getTimeSince(long startTimeMillis) {
return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis));
}

private void applyChanges(ClusterState previousClusterState, ClusterState newClusterState, String source, StopWatch stopWatch) {
private void applyChanges(ClusterState previousClusterState, ClusterState newClusterState, String source, Recorder stopWatch) {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
@@ -465,15 +467,15 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu
}

logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
try (Releasable ignored = stopWatch.timing("connecting to new nodes")) {
try (Releasable ignored = stopWatch.record("connecting to new nodes")) {
connectToNodesAndWait(newClusterState);
}

// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metadataChanged()) {
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
final Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
try (Releasable ignored = stopWatch.timing("applying settings")) {
try (Releasable ignored = stopWatch.record("applying settings")) {
clusterSettings.applySettings(incomingSettings);
}
}
@@ -505,33 +507,35 @@ protected final void connectToNodesAsync(ClusterState newClusterState, Runnable
nodeConnectionsService.connectToNodes(newClusterState.nodes(), onCompletion);
}

private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) {
callClusterStateAppliers(clusterChangedEvent, stopWatch, highPriorityStateAppliers);
callClusterStateAppliers(clusterChangedEvent, stopWatch, normalPriorityStateAppliers);
callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers);
}

private static void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch,
private static void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch,
Collection<ClusterStateApplier> clusterStateAppliers) {
for (ClusterStateApplier applier : clusterStateAppliers) {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
final String name = applier.toString();

Member:
I wonder if we should render nicer names here ... the output as rendered JSON is a little weird with all the $ in them, isn't it? (it's especially bothersome if the applier is a lambda and wouldn't even render to the same string on every machine)

Contributor:
Ah sorry this slipped right down my list, I need to get back to it soon.

We were planning to address the descriptions in a follow-up. I mean sure it's a bit weird to see the default toString() but it'd still tell us where to look at least.

Member Author (@martijnvg, Oct 1, 2021):
Yes, the plan is to add a unique name for each applier/listener in a follow-up PR.
I should have added that as a comment to this PR.

try (Releasable ignored = stopWatch.record(name)) {
applier.applyClusterState(clusterChangedEvent);
}
}
}
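
One hypothetical shape for the naming follow-up discussed in the comments above (explicitly not part of this PR): give each applier or listener a stable, human-readable name so the stats keys don't depend on default toString() output full of $ and lambda identifiers:

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier;

// Hypothetical wrapper: the recorder above keys its stats by toString(), so overriding it
// with an explicit name yields stable, readable entries even for lambdas and inner classes.
final class NamedClusterStateApplier implements ClusterStateApplier {

    private final String name;
    private final ClusterStateApplier delegate;

    NamedClusterStateApplier(String name, ClusterStateApplier delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public void applyClusterState(ClusterChangedEvent event) {
        delegate.applyClusterState(event);
    }

    @Override
    public String toString() {
        return name;
    }
}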

private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) {
callClusterStateListener(clusterChangedEvent, stopWatch, clusterStateListeners);
callClusterStateListener(clusterChangedEvent, stopWatch, timeoutClusterStateListeners.keySet());
}

private void callClusterStateListener(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch,
private void callClusterStateListener(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch,
Collection<? extends ClusterStateListener> listeners) {
for (ClusterStateListener listener : listeners) {
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
final String name = listener.toString();

Member:
Same here with the rendering, we have a couple of lambdas here I think.

try (Releasable ignored = stopWatch.record(name)) {
listener.clusterChanged(clusterChangedEvent);
}
} catch (Exception ex) {
@@ -579,12 +583,13 @@ public void onResponse(Void unused) {
}
}

private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, StopWatch stopWatch) {
private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, Recorder recorder) {
if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of [{}]: {}", source, executionTime,
slowTaskLoggingThreshold, Arrays.stream(stopWatch.taskInfo())
.map(ti -> '[' + ti.getTaskName() + "] took [" + ti.getTime().millis() + "ms]").collect(Collectors.joining(", ")));
slowTaskLoggingThreshold, recorder.getRecordings().stream()
.map(ti -> '[' + ti.v1() + "] took [" + ti.v2() + "ms]").collect(Collectors.joining(", ")));
}
recordingService.updateStats(recorder);
}

private class NotifyTimeout implements Runnable {
@@ -623,4 +628,9 @@ public void run() {
protected boolean applicationMayFail() {
return false;
}

@Override
public ClusterApplierRecordingService.Stats getStats() {
return recordingService.getStats();
}
}
@@ -9,6 +9,7 @@
package org.elasticsearch.discovery;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.service.ClusterApplierRecordingService;
import org.elasticsearch.cluster.service.ClusterStateUpdateStats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -25,15 +26,17 @@ public class DiscoveryStats implements Writeable, ToXContentFragment {
private final PendingClusterStateStats queueStats;
private final PublishClusterStateStats publishStats;
private final ClusterStateUpdateStats clusterStateUpdateStats;
private final ClusterApplierRecordingService.Stats timeTrackerStats;

public DiscoveryStats(
PendingClusterStateStats queueStats,
PublishClusterStateStats publishStats,
ClusterStateUpdateStats clusterStateUpdateStats
) {
ClusterStateUpdateStats clusterStateUpdateStats,
ClusterApplierRecordingService.Stats timeTrackerStats) {
this.queueStats = queueStats;
this.publishStats = publishStats;
this.clusterStateUpdateStats = clusterStateUpdateStats;
this.timeTrackerStats = timeTrackerStats;
}

public DiscoveryStats(StreamInput in) throws IOException {
@@ -44,6 +47,11 @@ public DiscoveryStats(StreamInput in) throws IOException {
} else {
clusterStateUpdateStats = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
timeTrackerStats = in.readOptionalWriteable(ClusterApplierRecordingService.Stats::new);
} else {
timeTrackerStats = null;
}
}

@Override
@@ -53,6 +61,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_16_0)) {
out.writeOptionalWriteable(clusterStateUpdateStats);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(timeTrackerStats);
}
}

@Override
@@ -67,6 +78,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (clusterStateUpdateStats != null) {
clusterStateUpdateStats.toXContent(builder, params);
}
if (timeTrackerStats != null) {
timeTrackerStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
@@ -86,4 +100,8 @@ public PendingClusterStateStats getQueueStats() {
public PublishClusterStateStats getPublishStats() {
return publishStats;
}

public ClusterApplierRecordingService.Stats getTimeTrackerStats() {
return timeTrackerStats;
}
}