Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport action for ClusterFormationFailureHelper warnings #85782

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/85782.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85782
summary: Transport action for `ClusterFormationFailureHelper` warnings
area: Health
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
import org.elasticsearch.action.admin.cluster.coordination.TransportClusterFormationInfoAction;
import org.elasticsearch.action.admin.cluster.desirednodes.DeleteDesiredNodesAction;
import org.elasticsearch.action.admin.cluster.desirednodes.GetDesiredNodesAction;
import org.elasticsearch.action.admin.cluster.desirednodes.TransportDeleteDesiredNodesAction;
Expand Down Expand Up @@ -548,6 +550,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
actions.register(ClusterFormationInfoAction.INSTANCE, TransportClusterFormationInfoAction.class);
actions.register(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
actions.register(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.action.ActionType;

public class ClusterFormationInfoAction extends ActionType<ClusterFormationInfoResponse> {

public static final ClusterFormationInfoAction INSTANCE = new ClusterFormationInfoAction();
public static final String NAME = "cluster:formation/info";

private ClusterFormationInfoAction() {
super(NAME, ClusterFormationInfoResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Represents the information a node can offer with regards to cluster formation.
* It currently holds an optional warning message that the {@link org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper}
* is sometimes emitting.
*/
public class ClusterFormationInfoNodeResponse extends BaseNodeResponse implements ToXContentFragment {

private static final String WARNING_MESSAGE_FIELD_NAME = "warning";

@Nullable
private final String warningMessage;

public ClusterFormationInfoNodeResponse(StreamInput in) throws IOException {
super(in);
warningMessage = in.readOptionalString();
}

public ClusterFormationInfoNodeResponse(DiscoveryNode node, @Nullable String warningMessage) {
super(node);
this.warningMessage = warningMessage;
}

@Nullable
public String warningMessage() {
return warningMessage;
}

public static ClusterFormationInfoNodeResponse readNodeResponse(StreamInput in) throws IOException {
return new ClusterFormationInfoNodeResponse(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(warningMessage);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (warningMessage != null) {
builder.startObject(getNode().getId());
builder.field(WARNING_MESSAGE_FIELD_NAME, warningMessage);
builder.endObject();
}
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;

/**
* A request to get information about cluster formation from a list of nodes (it's usually the master-eligible nodes that have this
* information).
*/
public class ClusterFormationInfoRequest extends BaseNodesRequest<ClusterFormationInfoRequest> {

public ClusterFormationInfoRequest(StreamInput in) throws IOException {
super(in);
}

/**
* Get cluster formation info from the specified nodes.
*/
public ClusterFormationInfoRequest(String... nodeIds) {
super(nodeIds);
if (nodeIds == null || nodeIds.length == 0) {
throw new IllegalArgumentException("Target nodes must be specified");
}
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;

public class ClusterFormationInfoResponse extends BaseNodesResponse<ClusterFormationInfoNodeResponse> implements ToXContentFragment {

public ClusterFormationInfoResponse(StreamInput in) throws IOException {
super(in);
}

public ClusterFormationInfoResponse(
ClusterName clusterName,
List<ClusterFormationInfoNodeResponse> nodes,
List<FailedNodeException> failures
) {
super(clusterName, nodes, failures);
}

@Override
protected List<ClusterFormationInfoNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(ClusterFormationInfoNodeResponse::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<ClusterFormationInfoNodeResponse> nodes) throws IOException {
out.writeList(nodes);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("nodes");
for (ClusterFormationInfoNodeResponse nodeResponse : getNodes()) {
nodeResponse.toXContent(builder, params);
}
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Transport action for retrieving information regarding the cluster formation.
* This will fetch the information from the requested nodes, and it'll likely be sourced from
* {@link Coordinator}.
*/
public class TransportClusterFormationInfoAction extends TransportNodesAction<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realised that this probably can't be a TransportNodesAction: we're using this action when not a proper cluster so there's nothing holding the connections to other nodes open, so we'll have to explicitly acquire/release the connection yourself. See e.g. how Coordinator#handleJoinRequest and HotThreadsLoggingLagListener#onLagDetected wrap their transport activities in a call to TransportService#connect and release the connection afterwards. TransportNodesAction won't do that, we probably need to call TransportService#sendRequest directly.

Also it won't work to use the current cluster state to resolve node IDs since there is no cluster when using this. We'll need to use the exact DiscoveryNode instances that discovery tells us about.

ClusterFormationInfoRequest,
ClusterFormationInfoResponse,
TransportClusterFormationInfoAction.ClusterFormationInfoTransportRequest,
ClusterFormationInfoNodeResponse> {

private final Coordinator coordinator;

@Inject
public TransportClusterFormationInfoAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
Coordinator coordinator
) {
super(
ClusterFormationInfoAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
ClusterFormationInfoRequest::new,
ClusterFormationInfoTransportRequest::new,
ThreadPool.Names.MANAGEMENT,
ThreadPool.Names.MANAGEMENT,
ClusterFormationInfoNodeResponse.class
);
this.coordinator = coordinator;
}

@Override
protected ClusterFormationInfoResponse newResponse(
ClusterFormationInfoRequest request,
List<ClusterFormationInfoNodeResponse> responses,
List<FailedNodeException> failures
) {
return new ClusterFormationInfoResponse(clusterService.getClusterName(), responses, failures);
}

@Override
protected ClusterFormationInfoTransportRequest newNodeRequest(ClusterFormationInfoRequest request) {
return new ClusterFormationInfoTransportRequest(request);
}

@Override
protected ClusterFormationInfoNodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new ClusterFormationInfoNodeResponse(in);
}

@Override
protected ClusterFormationInfoNodeResponse nodeOperation(ClusterFormationInfoTransportRequest nodeRequest, Task task) {
return new ClusterFormationInfoNodeResponse(clusterService.localNode(), coordinator.getClusterFormationWarning().orElse(null));
}

public static class ClusterFormationInfoTransportRequest extends TransportRequest {

ClusterFormationInfoRequest request;

public ClusterFormationInfoTransportRequest(StreamInput in) throws IOException {
super(in);
request = new ClusterFormationInfoRequest(in);
}

ClusterFormationInfoTransportRequest(ClusterFormationInfoRequest request) {
this.request = request;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

Expand All @@ -51,6 +52,8 @@ public class ClusterFormationFailureHelper {
private final Runnable logLastFailedJoinAttempt;
@Nullable // if no warning is scheduled
private volatile WarningScheduler warningScheduler;
@Nullable // if no warning was ever scheduled or if we stopped emitting warnings
private volatile String emittedWarning;

public ClusterFormationFailureHelper(
Settings settings,
Expand All @@ -76,6 +79,7 @@ public void start() {

public void stop() {
warningScheduler = null;
emittedWarning = null;
}

private class WarningScheduler {
Expand All @@ -95,7 +99,8 @@ public void onFailure(Exception e) {
protected void doRun() {
if (isActive()) {
logLastFailedJoinAttempt.run();
logger.warn(clusterFormationStateSupplier.get().getDescription());
emittedWarning = clusterFormationStateSupplier.get().getDescription();
logger.warn(emittedWarning);
}
}

Expand All @@ -114,6 +119,10 @@ public String toString() {
}
}

public Optional<String> emittedWarning() {
return Optional.ofNullable(emittedWarning);
}

record ClusterFormationState(
Settings settings,
ClusterState clusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ private ClusterFormationState getClusterFormationState() {
);
}

public Optional<String> getClusterFormationWarning() {
return clusterFormationFailureHelper.emittedWarning();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're going to need to need a structured response, e.g. to detect partial network partitions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started out making tweaks to this PR for this and wound up just putting up this draft instead -- #87306. Before I put more effort into that, do you think it's appropriate to expose all of ClusterFormationState? We seem to need most of the information in it (if not all of it), but it does seem a little odd to publicly expose a private record like that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only a private record because we've never done anything with this info except log it before. Definitely makes sense to me to make it public and use it more widely.

}

private void onLeaderFailure(MessageSupplier message, Exception e) {
synchronized (mutex) {
if (mode != Mode.CANDIDATE) {
Expand Down