Skip to content

Commit

Permalink
Add more detail to Legislator trace logs (elastic#39)
Browse files Browse the repository at this point in the history
It is sometimes challenging to work out the sequence of events that led to a
test failure, and a contributing factor is that scheduled tasks are identified
only by their scheduled time and not by any more useful identifier. This change
adds simple descriptions to these tasks to help untangle things a bit.
  • Loading branch information
DaveCTurner authored May 12, 2018
1 parent 429beeb commit 448697d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public LeaderCheckResponse handleLeaderCheckRequest(DiscoveryNode sender) {

/**
 * Starts publishing the given request.
 *
 * Creates a Publication for the request, schedules that publication's onTimeout callback on the
 * futureExecutor after publishTimeout, passes the request's accepted state to the active leader
 * failure detector via updateNodesAndPing, and then starts the publication.
 *
 * @param publishRequest the request to publish
 */
private void publish(PublishRequest publishRequest) {
final Publication publication = new Publication(publishRequest);
futureExecutor.schedule(publishTimeout, publication::onTimeout);
futureExecutor.schedule(publishTimeout, "Publication#onTimeout", publication::onTimeout);
activeLeaderFailureDetector.get().updateNodesAndPing(publishRequest.getAcceptedState());
publication.start();
}
Expand Down Expand Up @@ -588,7 +588,6 @@ public void handleException(TransportException exp) {
public String executor() {
return ThreadPool.Names.SAME;
}

}

private class ApplyCommitResponseHandler implements TransportResponseHandler<TransportResponse.Empty> {
Expand Down Expand Up @@ -879,8 +878,8 @@ public OfferJoin handleSeekJoins(DiscoveryNode sender, SeekJoins seekJoins) {
masterService.submitTask("join of " + sender,
clusterState -> joinNodes(clusterState, Collections.singletonList(sender)).resultingState);
}
logger.debug("handleSeekJoins: not offering join: lastAcceptedVersion={}, term={}, mode={}",
consensusState.getLastAcceptedVersion(), consensusState.getCurrentTerm(), mode);
logger.debug("handleSeekJoins: not offering join: lastAcceptedVersion={}, term={}, mode={}, lastKnownLeader={}",
consensusState.getLastAcceptedVersion(), consensusState.getCurrentTerm(), mode, lastKnownLeader);
throw new ConsensusMessageRejectedException("not offering join");
}
}
Expand Down Expand Up @@ -933,7 +932,7 @@ public void handleOfferJoin(DiscoveryNode sender, OfferJoinCollector offerJoinCo

if (offerJoin.getLastAcceptedTerm() > consensusState.getLastAcceptedTerm()
|| (offerJoin.getLastAcceptedTerm() == consensusState.getLastAcceptedTerm()
&& offerJoin.getLastAcceptedVersion() > consensusState.getLastAcceptedVersion())) {
&& offerJoin.getLastAcceptedVersion() > consensusState.getLastAcceptedVersion())) {
logger.debug("handleOfferJoin: handing over pre-voting to [{}] because of {}", sender, offerJoin);
currentOfferJoinCollector = Optional.empty();
transport.sendPreJoinHandover(sender);
Expand Down Expand Up @@ -972,30 +971,30 @@ public void handleAbdication(DiscoveryNode sender, long currentTerm) {
private void sendStartJoin(StartJoinRequest startStartJoinRequest) {
nodeSupplier.get().forEach(n -> transport.sendStartJoin(n, startStartJoinRequest,
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty read(StreamInput in) throws IOException {
return TransportResponse.Empty.INSTANCE;
}
@Override
public TransportResponse.Empty read(StreamInput in) throws IOException {
return TransportResponse.Empty.INSTANCE;
}

@Override
public void handleResponse(TransportResponse.Empty response) {
// ignore
}
@Override
public void handleResponse(TransportResponse.Empty response) {
// ignore
}

@Override
public void handleException(TransportException exp) {
if (exp.getRootCause() instanceof ConsensusMessageRejectedException) {
logger.debug("handleStartJoinResponse: [{}] failed: {}", n, exp.getRootCause().getMessage());
} else {
logger.debug(() -> new ParameterizedMessage("handleStartJoinResponse: failed to get join from [{}]", n), exp);
@Override
public void handleException(TransportException exp) {
if (exp.getRootCause() instanceof ConsensusMessageRejectedException) {
logger.debug("handleStartJoinResponse: [{}] failed: {}", n, exp.getRootCause().getMessage());
} else {
logger.debug(() -> new ParameterizedMessage("handleStartJoinResponse: failed to get join from [{}]", n), exp);
}
}
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}));
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}));
}

public void handleDisconnectedNode(DiscoveryNode sender) {
Expand Down Expand Up @@ -1054,7 +1053,7 @@ void sendStartJoin(DiscoveryNode destination, StartJoinRequest startJoinRequest,
}

/**
 * Abstraction through which the Legislator schedules tasks to be run after a delay.
 */
public interface FutureExecutor {
void schedule(TimeValue delay, Runnable task);
/**
 * Schedules a task to run after the given delay.
 *
 * @param delay       how long to wait before running the task
 * @param description short label identifying the task, e.g. "Publication#onTimeout"
 * @param task        the work to run
 */
void schedule(TimeValue delay, String description, Runnable task);
}

private class SeekJoinsScheduler {
Expand Down Expand Up @@ -1087,7 +1086,7 @@ private void scheduleNextWakeUp() {
assert mode == Mode.CANDIDATE;
currentDelayMillis = Math.min(maxDelay.getMillis(), currentDelayMillis + minDelay.getMillis());
final long delay = randomLongBetween(minDelay.getMillis(), currentDelayMillis + 1);
futureExecutor.schedule(TimeValue.timeValueMillis(delay), this::handleWakeUp);
futureExecutor.schedule(TimeValue.timeValueMillis(delay), "SeekJoinsScheduler#scheduleNextWakeUp", this::handleWakeUp);
}

private void handleWakeUp() {
Expand Down Expand Up @@ -1128,7 +1127,7 @@ public void start() {
}

/**
 * Schedules this object's handleWakeUp callback on the futureExecutor after heartbeatDelay.
 */
private void scheduleNextWakeUp() {
futureExecutor.schedule(heartbeatDelay, this::handleWakeUp);
futureExecutor.schedule(heartbeatDelay, "ActiveLeaderFailureDetector#handleWakeUp", this::handleWakeUp);
}

private void handleWakeUp() {
Expand Down Expand Up @@ -1173,7 +1172,7 @@ void start() {
logger.trace("FollowerCheck: sending follower check to [{}]", followerNode);
assert inFlight == false;
inFlight = true;
futureExecutor.schedule(heartbeatTimeout, this::onTimeout);
futureExecutor.schedule(heartbeatTimeout, "FollowerCheck#onTimeout", this::onTimeout);

HeartbeatRequest heartbeatRequest = new HeartbeatRequest(
consensusState.getCurrentTerm(), consensusState.getLastPublishedVersion());
Expand Down Expand Up @@ -1251,7 +1250,7 @@ public void start() {
}

/**
 * Schedules this object's handleWakeUp callback on the futureExecutor after heartbeatDelay.
 */
private void scheduleNextWakeUp() {
futureExecutor.schedule(heartbeatDelay, this::handleWakeUp);
futureExecutor.schedule(heartbeatDelay, "ActiveFollowerFailureDetector#handleWakeUp", this::handleWakeUp);
}

void stop() {
Expand Down Expand Up @@ -1307,7 +1306,7 @@ void start() {
logger.trace("LeaderCheck: sending leader check to [{}]", leader);
assert inFlight == false;
inFlight = true;
futureExecutor.schedule(heartbeatTimeout, this::onTimeout);
futureExecutor.schedule(heartbeatTimeout, "LeaderCheck#onTimeout", this::onTimeout);

transport.sendLeaderCheckRequest(leader, new TransportResponseHandler<LeaderCheckResponse>() {
@Override
Expand All @@ -1321,7 +1320,7 @@ public void handleResponse(LeaderCheckResponse leaderCheckResponse) {
if (leaderVersion > localVersion) {
logger.trace("LeaderCheck.handleResponse: heartbeat for version {} > local version {}, starting lag detector",
leaderVersion, localVersion);
futureExecutor.schedule(publishTimeout, () -> {
futureExecutor.schedule(publishTimeout, "LeaderCheck#lagDetection", () -> {
long localVersion2 = getLastCommittedState().map(ClusterState::getVersion).orElse(-1L);
if (leaderVersion > localVersion2) {
logger.debug("LeaderCheck.handleResponse: lag detected: local version {} < leader version {} after {}",
Expand All @@ -1338,7 +1337,7 @@ public void handleException(TransportException exp) {
logger.debug("LeaderCheck.handleException: {}", exp.getRootCause().getMessage());
} else {
logger.debug(() -> new ParameterizedMessage(
"LeaderCheck.handleException: received exception from [{}]", leader), exp);
"LeaderCheck.handleException: received exception from [{}]", leader), exp);
}
inFlight = false;
onCheckFailure(exp.getRootCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, Consumer<Legisl
} else {
if (clusterNode.localNode.getId().equals(destination.getId()) &&
clusterNode.localNode.equals(destination) == false) {
logger.debug("found node with same id but different instance: {} instead of {}", clusterNode.localNode,
logger.debug("found node with same id but different instance: {} instead of {}", clusterNode.localNode,
destination);
}
}
Expand All @@ -558,7 +558,6 @@ void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, Consumer<Legisl
}
}


void sendPublishRequestFrom(DiscoveryNode sender, DiscoveryNode destination, PublishRequest publishRequest,
TransportResponseHandler<LegislatorPublishResponse> responseHandler) {
sendFromTo(sender, destination, e -> {
Expand Down Expand Up @@ -772,7 +771,7 @@ private DiscoveryNode createDiscoveryNode() {
}

private void sendMasterServiceTask(String reason, Function<ClusterState, ClusterState> runnable) {
futureExecutor.schedule(TimeValue.timeValueMillis(0L), () -> {
futureExecutor.schedule(TimeValue.timeValueMillis(0L), "sendMasterServiceTask:" + reason, () -> {
try {
if (legislator.getMode() == Legislator.Mode.LEADER) {
ClusterState newState = runnable.apply(legislator.getLastAcceptedState());
Expand Down Expand Up @@ -800,9 +799,11 @@ ClusterNode initialise(VotingConfiguration initialConfiguration) {

// TODO: have some tests that use the on-disk data for persistence across reboots.
void reboot() {
logger.trace("reboot: taking down [{}]", localNode);
tasks.removeIf(task -> task.scheduledFor(localNode));
inFlightMessages.removeIf(action -> action.hasDestination(localNode));
localNode = createDiscoveryNode();
logger.trace("reboot: starting up [{}]", localNode);
try {
BytesStreamOutput outStream = new BytesStreamOutput();
outStream.setVersion(Version.CURRENT);
Expand Down Expand Up @@ -837,14 +838,14 @@ public DiscoveryNode getLocalNode() {
/**
 * Legislator.FutureExecutor implementation that does not run tasks itself: each scheduled task is
 * wrapped in a TaskWithExecutionTime and added to the enclosing object's tasks collection.
 */
private class FutureExecutor implements Legislator.FutureExecutor {

// Computes an execution time for the task, logs the scheduling decision at debug level and
// records the task, tagged with the current localNode, in tasks.
@Override
public void schedule(TimeValue delay, Runnable task) {
public void schedule(TimeValue delay, String description, Runnable task) {
assert delay.getMillis() >= 0;
// The requested delay is extended by a random amount drawn from randomLongBetween(0L, delayVariability).
final long actualDelay = delay.getMillis() + randomLongBetween(0L, delayVariability);
// Execution time is expressed relative to currentTimeMillis.
final long executionTimeMillis = currentTimeMillis + actualDelay;
logger.debug("[{}] schedule: requested delay [{}ms] after [{}ms], " +
logger.debug("[{}] scheduling [{}]: requested delay [{}ms] after [{}ms], " +
"scheduling with delay [{}ms] at [{}ms]",
localNode.getId(), delay.getMillis(), currentTimeMillis, actualDelay, executionTimeMillis);
tasks.add(new TaskWithExecutionTime(executionTimeMillis, task, localNode));
localNode.getId(), description, delay.getMillis(), currentTimeMillis, actualDelay, executionTimeMillis);
tasks.add(new TaskWithExecutionTime(executionTimeMillis, task, localNode, description));
}
}

Expand Down Expand Up @@ -925,11 +926,13 @@ private class TaskWithExecutionTime implements Runnable {
final long executionTimeMillis;
final Runnable task;
private final DiscoveryNode taskNode;
private final String description;

TaskWithExecutionTime(long executionTimeMillis, Runnable task, DiscoveryNode taskNode) {
TaskWithExecutionTime(long executionTimeMillis, Runnable task, DiscoveryNode taskNode, String description) {
this.executionTimeMillis = executionTimeMillis;
this.task = task;
this.taskNode = taskNode;
this.description = description;
}

public long getExecutionTimeMillis() {
Expand All @@ -938,7 +941,7 @@ public long getExecutionTimeMillis() {

/**
 * Returns a human-readable summary of this task: the node it belongs to (taskNode) and the time
 * it is scheduled for (executionTimeMillis), prefixed by the task's description.
 */
@Override
public String toString() {
return "task on [" + taskNode + "] scheduled at time [" + executionTimeMillis + "ms]";
return "[" + description + "] on [" + taskNode + "] scheduled at time [" + executionTimeMillis + "ms]";
}

public boolean scheduledFor(DiscoveryNode discoveryNode) {
Expand Down

0 comments on commit 448697d

Please sign in to comment.