diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java index 5f534c4f2..aa86cd0ee 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java @@ -21,6 +21,7 @@ import com.alipay.sofa.jraft.entity.LeaderChangeContext; import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.option.FSMCallerOptions; +import com.alipay.sofa.jraft.util.Describer; /** * Finite state machine caller. @@ -29,7 +30,7 @@ * * 2018-Apr-03 11:07:52 AM */ -public interface FSMCaller extends Lifecycle { +public interface FSMCaller extends Lifecycle, Describer { /** * Listen on lastAppliedLogIndex update events. diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java index 8eda5b601..b0f3d0f01 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java @@ -29,6 +29,7 @@ import com.alipay.sofa.jraft.error.LogNotFoundException; import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.util.Describer; /** * A raft replica node. @@ -37,7 +38,7 @@ * * 2018-Apr-03 4:06:55 PM */ -public interface Node extends Lifecycle { +public interface Node extends Lifecycle, Describer { /** * Get the leader peer id for redirect, null if absent. diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeDescribeSignalHandler.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeDescribeSignalHandler.java new file mode 100644 index 000000000..6185567df --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeDescribeSignalHandler.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.util.Describer; +import com.alipay.sofa.jraft.util.FileOutputSignalHandler; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; + +/** + * + * @author jiachun.fjc + */ +public class NodeDescribeSignalHandler extends FileOutputSignalHandler { + + private static Logger LOG = LoggerFactory.getLogger(NodeDescribeSignalHandler.class); + + private static final String DIR = SystemPropertyUtil.get("jraft.signal.node.describe.dir", ""); + private static final String BASE_NAME = "node_describe.log"; + + @Override + public void handle(final String signalName) { + final List nodes = NodeManager.getInstance().getAllNodes(); + if (nodes.isEmpty()) { + return; + } + + try { + final File file = getOutputFile(DIR, BASE_NAME); + + LOG.info("Describing raft nodes with signal: {} to file: {}.", signalName, file); + + try (final PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file, true), + StandardCharsets.UTF_8))) { + final Describer.Printer printer = new DefaultPrinter(out); + for (final Node node : nodes) { + node.describe(printer); + } + } + } catch (final IOException e) { + LOG.error("Fail to describe nodes: {}.", nodes, e); + } + } + + private static class DefaultPrinter implements Describer.Printer { + + private final PrintWriter out; + + private DefaultPrinter(PrintWriter out) { + this.out = out; + } + + @Override + public Describer.Printer print(final Object x) { + this.out.print(x); + return this; + } + + @Override + public Describer.Printer println(final Object x) { + this.out.println(x); + return this; + } + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeManager.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeManager.java index 5853d83f4..763eaa905 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeManager.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeManager.java @@ -53,7 +53,7 @@ public static NodeManager getInstance() { /** * Return true when RPC service is registered. */ - public boolean serverExists(Endpoint addr) { + public boolean serverExists(final Endpoint addr) { if (addr.getIp().equals(Utils.IP_ANY)) { return this.addrSet.contains(new Endpoint(Utils.IP_ANY, addr.getPort())); } @@ -63,28 +63,28 @@ public boolean serverExists(Endpoint addr) { /** * Remove a RPC service address. */ - public boolean removeAddress(Endpoint addr) { + public boolean removeAddress(final Endpoint addr) { return this.addrSet.remove(addr); } /** * Adds a RPC service address. */ - public void addAddress(Endpoint addr) { + public void addAddress(final Endpoint addr) { this.addrSet.add(addr); } /** * Adds a node. */ - public boolean add(Node node) { + public boolean add(final Node node) { // check address ok? - if (!this.serverExists(node.getNodeId().getPeerId().getEndpoint())) { + if (!serverExists(node.getNodeId().getPeerId().getEndpoint())) { return false; } - NodeId nodeId = node.getNodeId(); + final NodeId nodeId = node.getNodeId(); if (this.nodeMap.putIfAbsent(nodeId, node) == null) { - String groupId = node.getGroupId(); + final String groupId = node.getGroupId(); List nodes = this.groupMap.get(groupId); if (nodes == null) { nodes = Collections.synchronizedList(new ArrayList<>()); @@ -112,9 +112,9 @@ public void clear() { /** * Remove a node. */ - public boolean remove(Node node) { + public boolean remove(final Node node) { if (this.nodeMap.remove(node.getNodeId(), node)) { - List nodes = this.groupMap.get(node.getGroupId()); + final List nodes = this.groupMap.get(node.getGroupId()); if (nodes != null) { return nodes.remove(node); } @@ -125,14 +125,14 @@ public boolean remove(Node node) { /** * Get node by groupId and peer. */ - public Node get(String groupId, PeerId peerId) { + public Node get(final String groupId, final PeerId peerId) { return this.nodeMap.get(new NodeId(groupId, peerId)); } /** * Get all nodes in a raft group. */ - public List getNodesByGroupId(String groupId) { + public List getNodesByGroupId(final String groupId) { return this.groupMap.get(groupId); } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeMetricsSignalHandler.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeMetricsSignalHandler.java new file mode 100644 index 000000000..2d128cb19 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/NodeMetricsSignalHandler.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.core.NodeMetrics; +import com.alipay.sofa.jraft.util.FileOutputSignalHandler; +import com.alipay.sofa.jraft.util.MetricReporter; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; +import com.codahale.metrics.MetricRegistry; + +/** + * + * @author jiachun.fjc + */ +public class NodeMetricsSignalHandler extends FileOutputSignalHandler { + + private static Logger LOG = LoggerFactory.getLogger(NodeMetricsSignalHandler.class); + + private static final String DIR = SystemPropertyUtil.get("jraft.signal.node.metrics.dir", ""); + private static final String BASE_NAME = "node_metrics.log"; + + @Override + public void handle(final String signalName) { + final List nodes = NodeManager.getInstance().getAllNodes(); + if (nodes.isEmpty()) { + return; + } + + try { + final File file = getOutputFile(DIR, BASE_NAME); + + LOG.info("Printing raft nodes metrics with signal: {} to file: {}.", signalName, file); + + try (final PrintStream out = new PrintStream(new FileOutputStream(file, true))) { + for (final Node node : nodes) { + final NodeMetrics nodeMetrics = node.getNodeMetrics(); + final MetricRegistry registry = nodeMetrics.getMetricRegistry(); + if (registry == null) { + LOG.warn("Node: {} received a signal to print metric, but it does not have metric enabled.", + node); + continue; + } + final MetricReporter reporter = MetricReporter.forRegistry(registry) // + .outputTo(out) // + .prefixedWith("-- " + node.getNodeId()) // + .build(); + reporter.report(); + } + } + } catch (final IOException e) { + LOG.error("Fail to print nodes metrics: {}.", nodes, e); + } + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java index 1b60dbb0e..4f5c61a18 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java @@ -25,6 +25,7 @@ import com.alipay.sofa.jraft.option.ReplicatorGroupOptions; import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesResponse; import com.alipay.sofa.jraft.rpc.RpcResponseClosure; +import com.alipay.sofa.jraft.util.Describer; import com.alipay.sofa.jraft.util.ThreadId; /** @@ -34,7 +35,7 @@ * * 2018-Apr-08 5:35:26 PM */ -public interface ReplicatorGroup { +public interface ReplicatorGroup extends Describer { /** * Init the replicator group. * diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java index 9c56b9295..261b73c98 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java @@ -32,6 +32,7 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.option.BallotBoxOptions; import com.alipay.sofa.jraft.util.ArrayDeque; +import com.alipay.sofa.jraft.util.Describer; import com.alipay.sofa.jraft.util.OnlyForTest; import com.alipay.sofa.jraft.util.Requires; @@ -42,7 +43,7 @@ * 2018-Apr-04 2:32:10 PM */ @ThreadSafe -public class BallotBox implements Lifecycle { +public class BallotBox implements Lifecycle, Describer { private static final Logger LOG = LoggerFactory.getLogger(BallotBox.class); @@ -64,23 +65,23 @@ ArrayDeque getPendingMetaQueue() { } public long getLastCommittedIndex() { - long stamp = stampedLock.tryOptimisticRead(); - long optimisticVal = this.lastCommittedIndex; - if (stampedLock.validate(stamp)) { + long stamp = this.stampedLock.tryOptimisticRead(); + final long optimisticVal = this.lastCommittedIndex; + if (this.stampedLock.validate(stamp)) { return optimisticVal; } - stamp = stampedLock.readLock(); + stamp = this.stampedLock.readLock(); try { return this.lastCommittedIndex; } finally { - stampedLock.unlockRead(stamp); + this.stampedLock.unlockRead(stamp); } } @Override - public boolean init(BallotBoxOptions opts) { + public boolean init(final BallotBoxOptions opts) { if (opts.getWaiter() == null || opts.getClosureQueue() == null) { - LOG.error("waiter or closure queue is null"); + LOG.error("waiter or closure queue is null."); return false; } this.waiter = opts.getWaiter(); @@ -92,26 +93,26 @@ public boolean init(BallotBoxOptions opts) { * Called by leader, otherwise the behavior is undefined * Set logs in [first_log_index, last_log_index] are stable at |peer|. */ - public boolean commitAt(long firstLogIndex, long lastLogIndex, PeerId peer) { - //TODO use lock-free algorithm here? + public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) { + // TODO use lock-free algorithm here? final long stamp = stampedLock.writeLock(); long lastCommittedIndex = 0; try { - if (pendingIndex == 0) { + if (this.pendingIndex == 0) { return false; } - if (lastLogIndex < pendingIndex) { + if (lastLogIndex < this.pendingIndex) { return true; } - if (lastLogIndex >= pendingIndex + pendingMetaQueue.size()) { + if (lastLogIndex >= this.pendingIndex + this.pendingMetaQueue.size()) { throw new ArrayIndexOutOfBoundsException(); } - final long startAt = Math.max(pendingIndex, firstLogIndex); + final long startAt = Math.max(this.pendingIndex, firstLogIndex); Ballot.PosHint hint = new Ballot.PosHint(); for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) { - final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - pendingIndex)); + final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex)); hint = bl.grant(peer, hint); if (bl.isGranted()) { lastCommittedIndex = logIndex; @@ -126,12 +127,12 @@ public boolean commitAt(long firstLogIndex, long lastLogIndex, PeerId peer) { // logs, since we use the new configuration to deal the quorum of the // removal request, we think it's safe to commit all the uncommitted // previous logs, which is not well proved right now - pendingMetaQueue.removeRange(0, (int) (lastCommittedIndex - pendingIndex) + 1); - LOG.debug("Committed log fromIndex={}, toIndex={}.", pendingIndex, lastCommittedIndex); - pendingIndex = lastCommittedIndex + 1; + this.pendingMetaQueue.removeRange(0, (int) (lastCommittedIndex - this.pendingIndex) + 1); + LOG.debug("Committed log fromIndex={}, toIndex={}.", this.pendingIndex, lastCommittedIndex); + this.pendingIndex = lastCommittedIndex + 1; this.lastCommittedIndex = lastCommittedIndex; } finally { - stampedLock.unlockWrite(stamp); + this.stampedLock.unlockWrite(stamp); } this.waiter.onCommitted(lastCommittedIndex); return true; @@ -144,13 +145,13 @@ public boolean commitAt(long firstLogIndex, long lastLogIndex, PeerId peer) { * truncate. */ public void clearPendingTasks() { - final long stamp = stampedLock.writeLock(); + final long stamp = this.stampedLock.writeLock(); try { this.pendingMetaQueue.clear(); this.pendingIndex = 0; this.closureQueue.clear(); } finally { - stampedLock.unlockWrite(stamp); + this.stampedLock.unlockWrite(stamp); } } @@ -163,24 +164,24 @@ public void clearPendingTasks() { * @param newPendingIndex pending index of new leader * @return returns true if reset success */ - public boolean resetPendingIndex(long newPendingIndex) { - final long stamp = stampedLock.writeLock(); + public boolean resetPendingIndex(final long newPendingIndex) { + final long stamp = this.stampedLock.writeLock(); try { - if (!(pendingIndex == 0 && pendingMetaQueue.isEmpty())) { - LOG.error("resetPendingIndex fail, pendingIndex={}, pendingMetaQueueSize={}", pendingIndex, - pendingMetaQueue.size()); + if (!(this.pendingIndex == 0 && this.pendingMetaQueue.isEmpty())) { + LOG.error("resetPendingIndex fail, pendingIndex={}, pendingMetaQueueSize={}.", this.pendingIndex, + this.pendingMetaQueue.size()); return false; } if (newPendingIndex <= this.lastCommittedIndex) { - LOG.error("resetPendingIndex fail, newPendingIndex={}, lastCommittedIndex={}", newPendingIndex, - lastCommittedIndex); + LOG.error("resetPendingIndex fail, newPendingIndex={}, lastCommittedIndex={}.", newPendingIndex, + this.lastCommittedIndex); return false; } this.pendingIndex = newPendingIndex; this.closureQueue.resetFirstIndex(newPendingIndex); return true; } finally { - stampedLock.unlockWrite(stamp); + this.stampedLock.unlockWrite(stamp); } } @@ -193,23 +194,23 @@ public boolean resetPendingIndex(long newPendingIndex) { * @param done callback * @return returns true on success */ - public boolean appendPendingTask(Configuration conf, Configuration oldConf, Closure done) { + public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) { final Ballot bl = new Ballot(); if (!bl.init(conf, oldConf)) { - LOG.error("Fail to init ballot"); + LOG.error("Fail to init ballot."); return false; } - final long stamp = stampedLock.writeLock(); + final long stamp = this.stampedLock.writeLock(); try { - if (pendingIndex <= 0) { - LOG.error("Fail to appendingTask, pendingIndex={}", pendingIndex); + if (this.pendingIndex <= 0) { + LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex); return false; } this.pendingMetaQueue.add(bl); this.closureQueue.appendPendingClosure(done); return true; } finally { - stampedLock.unlockWrite(stamp); + this.stampedLock.unlockWrite(stamp); } } @@ -219,13 +220,13 @@ public boolean appendPendingTask(Configuration conf, Configuration oldConf, Clos * @param lastCommittedIndex last committed index * @return returns true if set success */ - public boolean setLastCommittedIndex(long lastCommittedIndex) { + public boolean setLastCommittedIndex(final long lastCommittedIndex) { boolean doUnlock = true; - final long stamp = stampedLock.writeLock(); + final long stamp = this.stampedLock.writeLock(); try { - if (pendingIndex != 0 || !pendingMetaQueue.isEmpty()) { + if (this.pendingIndex != 0 || !this.pendingMetaQueue.isEmpty()) { Requires.requireTrue(lastCommittedIndex < this.pendingIndex, - "Node changes to leader, pendingIndex=%d, param lastCommittedIndex=%d", pendingIndex, + "Node changes to leader, pendingIndex=%d, param lastCommittedIndex=%d", this.pendingIndex, lastCommittedIndex); return false; } @@ -234,13 +235,13 @@ public boolean setLastCommittedIndex(long lastCommittedIndex) { } if (lastCommittedIndex > this.lastCommittedIndex) { this.lastCommittedIndex = lastCommittedIndex; - stampedLock.unlockWrite(stamp); + this.stampedLock.unlockWrite(stamp); doUnlock = false; this.waiter.onCommitted(lastCommittedIndex); } } finally { if (doUnlock) { - stampedLock.unlockWrite(stamp); + this.stampedLock.unlockWrite(stamp); } } return true; @@ -248,6 +249,34 @@ public boolean setLastCommittedIndex(long lastCommittedIndex) { @Override public void shutdown() { - this.clearPendingTasks(); + clearPendingTasks(); + } + + @Override + public void describe(final Printer out) { + long _lastCommittedIndex; + long _pendingIndex; + long _pendingMetaQueueSize; + long stamp = this.stampedLock.tryOptimisticRead(); + if (this.stampedLock.validate(stamp)) { + _lastCommittedIndex = this.lastCommittedIndex; + _pendingIndex = this.pendingIndex; + _pendingMetaQueueSize = this.pendingMetaQueue.size(); + } else { + stamp = this.stampedLock.readLock(); + try { + _lastCommittedIndex = this.lastCommittedIndex; + _pendingIndex = this.pendingIndex; + _pendingMetaQueueSize = this.pendingMetaQueue.size(); + } finally { + this.stampedLock.unlockRead(stamp); + } + } + out.print(" lastCommittedIndex: ") // + .println(_lastCommittedIndex); + out.print(" pendingIndex: ") // + .println(_pendingIndex); + out.print(" pendingMetaQueueSize: ") // + .println(_pendingMetaQueueSize); } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java index 19e679ac2..3e683289a 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java @@ -705,4 +705,10 @@ private boolean passByStatus(final Closure done) { } return true; } + + @Override + public void describe(final Printer out) { + out.print(" ") // + .println(toString()); + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index c5bdc9108..1e338be31 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -97,11 +97,15 @@ import com.alipay.sofa.jraft.storage.impl.LogManagerImpl; import com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl; import com.alipay.sofa.jraft.util.DisruptorBuilder; +import com.alipay.sofa.jraft.util.JRaftServiceLoader; +import com.alipay.sofa.jraft.util.JRaftSignalHandler; import com.alipay.sofa.jraft.util.LogExceptionHandler; import com.alipay.sofa.jraft.util.NamedThreadFactory; import com.alipay.sofa.jraft.util.OnlyForTest; +import com.alipay.sofa.jraft.util.Platform; import com.alipay.sofa.jraft.util.RepeatedTimer; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.SignalHelper; import com.alipay.sofa.jraft.util.ThreadHelper; import com.alipay.sofa.jraft.util.ThreadId; import com.alipay.sofa.jraft.util.Utils; @@ -123,11 +127,26 @@ */ public class NodeImpl implements Node, RaftServerService { + private static final Logger LOG = LoggerFactory.getLogger(NodeImpl.class); + + static { + try { + if (SignalHelper.supportSignal()) { + // TODO support windows signal + if (!Platform.isWindows()) { + final List handlers = JRaftServiceLoader.load(JRaftSignalHandler.class) // + .sort(); + SignalHelper.addSignal(SignalHelper.SIG_USR2, handlers); + } + } + } catch (final Throwable t) { + LOG.error("Fail to add signal.", t); + } + } + // Max retry times when applying tasks. private static final int MAX_APPLY_RETRY_TIMES = 3; - private static final Logger LOG = LoggerFactory.getLogger(NodeImpl.class); - public static final AtomicInteger GLOBAL_NUM_NODES = new AtomicInteger(0); /** Internal states */ @@ -2838,6 +2857,65 @@ public UserLog readCommittedUserLog(final long index) { throw new LogNotFoundException("User log is deleted at index: " + curIndex); } + @Override + public void describe(final Printer out) { + // node + final String _nodeId; + final String _state; + final long _currTerm; + final String _conf; + this.readLock.lock(); + try { + _nodeId = String.valueOf(getNodeId()); + _state = String.valueOf(this.state); + _currTerm = this.currTerm; + _conf = String.valueOf(this.conf); + } finally { + this.readLock.unlock(); + } + out.print("nodeId: ") // + .println(_nodeId); + out.print("state: ") // + .println(_state); + out.print("term: ") // + .println(_currTerm); + out.print("conf: ") // + .println(_conf); + + // timers + out.println("electionTimer: "); + this.electionTimer.describe(out); + + out.println("voteTimer: "); + this.voteTimer.describe(out); + + out.println("stepDownTimer: "); + this.stepDownTimer.describe(out); + + out.println("snapshotTimer: "); + this.snapshotTimer.describe(out); + + // logManager + out.println("logManager: "); + this.logManager.describe(out); + + // fsmCaller + out.println("fsmCaller: "); + this.fsmCaller.describe(out); + + // ballotBox + out.println("ballotBox: "); + this.ballotBox.describe(out); + + // snapshotExecutor + out.println("snapshotExecutor: "); + this.snapshotExecutor.describe(out); + + // replicators + out.println("replicatorGroup: "); + this.replicatorGroup.describe(out); + } + @Override public String toString() { return "JRaftNode [nodeId=" + getNodeId() + "]"; diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java index 34810ae9f..54acea015 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java @@ -275,4 +275,12 @@ public PeerId findTheNextCandidate(final ConfigurationEntry conf) { public List listReplicators() { return new ArrayList<>(this.replicatorMap.values()); } + + @Override + public void describe(final Printer out) { + out.print(" replicators: ") // + .println(this.replicatorMap.values()); + out.print(" failureReplicators: ") // + .println(this.failureReplicators); + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/LogManager.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/LogManager.java index 0c1e30a1a..3cbb4fed1 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/LogManager.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/LogManager.java @@ -26,6 +26,7 @@ import com.alipay.sofa.jraft.entity.LogId; import com.alipay.sofa.jraft.entity.RaftOutter.SnapshotMeta; import com.alipay.sofa.jraft.option.LogManagerOptions; +import com.alipay.sofa.jraft.util.Describer; /** * Log manager. @@ -34,7 +35,7 @@ * * 2018-Apr-04 3:02:42 PM */ -public interface LogManager extends Lifecycle { +public interface LogManager extends Lifecycle, Describer { /** * Closure to to run in stable state. diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/SnapshotExecutor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/SnapshotExecutor.java index 008f5c6c0..0cb502635 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/SnapshotExecutor.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/SnapshotExecutor.java @@ -23,6 +23,7 @@ import com.alipay.sofa.jraft.rpc.RpcRequestClosure; import com.alipay.sofa.jraft.rpc.RpcRequests.InstallSnapshotRequest; import com.alipay.sofa.jraft.rpc.RpcRequests.InstallSnapshotResponse; +import com.alipay.sofa.jraft.util.Describer; /** * Executing Snapshot related stuff. @@ -31,7 +32,7 @@ * * 2018-Mar-22 2:27:02 PM */ -public interface SnapshotExecutor extends Lifecycle { +public interface SnapshotExecutor extends Lifecycle, Describer { /** * Return the owner NodeImpl @@ -44,7 +45,7 @@ public interface SnapshotExecutor extends Lifecycle { * * @param done snapshot callback */ - void doSnapshot(Closure done); + void doSnapshot(final Closure done); /** * Install snapshot according to the very RPC from leader @@ -58,8 +59,8 @@ public interface SnapshotExecutor extends Lifecycle { * a new RPC with the same or newer snapshot arrives * - Busy: the state machine is saving or loading snapshot */ - void installSnapshot(InstallSnapshotRequest request, InstallSnapshotResponse.Builder response, - RpcRequestClosure done); + void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response, + final RpcRequestClosure done); /** * Interrupt the downloading if possible. @@ -77,7 +78,7 @@ void installSnapshot(InstallSnapshotRequest request, InstallSnapshotResponse.Bui * * @param newTerm new term num */ - void interruptDownloadingSnapshots(long newTerm); + void interruptDownloadingSnapshots(final long newTerm); /** * Returns true if this is currently installing a snapshot, either diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java index 552019ba5..c879f90e9 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java @@ -1133,4 +1133,34 @@ public Status checkConsistency() { this.readLock.unlock(); } } + + @Override + public void describe(final Printer out) { + final long _firstLogIndex; + final long _lastLogIndex; + final String _diskId; + final String _appliedId; + final String _lastSnapshotId; + this.readLock.lock(); + try { + _firstLogIndex = this.firstLogIndex; + _lastLogIndex = this.lastLogIndex; + _diskId = String.valueOf(this.diskId); + _appliedId = String.valueOf(this.appliedId); + _lastSnapshotId = String.valueOf(this.lastSnapshotId); + } finally { + this.readLock.unlock(); + } + out.print(" storage: [") // + .print(_firstLogIndex) // + .print(", ") // + .print(_lastLogIndex) // + .println(']'); + out.print(" diskId: ") // + .println(_diskId); + out.print(" appliedId: ") // + .println(_appliedId); + out.print(" lastSnapshotId: ") // + .println(_lastSnapshotId); + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java index 9eea919df..74f9eb785 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java @@ -677,4 +677,36 @@ public void join() throws InterruptedException { this.runningJobs.await(); } + @Override + public void describe(final Printer out) { + final long _lastSnapshotTerm; + final long _lastSnapshotIndex; + final long _term; + final boolean _savingSnapshot; + final boolean _loadingSnapshot; + final boolean _stopped; + this.lock.lock(); + try { + _lastSnapshotTerm = this.lastSnapshotTerm; + _lastSnapshotIndex = this.lastSnapshotIndex; + _term = this.term; + _savingSnapshot = this.savingSnapshot; + _loadingSnapshot = this.loadingSnapshot; + _stopped = this.stopped; + } finally { + this.lock.unlock(); + } + out.print(" lastSnapshotTerm: ") // + .println(_lastSnapshotTerm); + out.print(" lastSnapshotIndex: ") // + .println(_lastSnapshotIndex); + out.print(" term: ") // + .println(_term); + out.print(" savingSnapshot: ") // + .println(_savingSnapshot); + out.print(" loadingSnapshot: ") // + .println(_loadingSnapshot); + out.print(" stopped: ") // + .println(_stopped); + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Describer.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Describer.java new file mode 100644 index 000000000..5d2d028a1 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Describer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +/** + * Components that implement this interface need to be able to describe + * their own state and output state information via the {@code describe} method. + * + * @author jiachun.fjc + */ +public interface Describer { + + void describe(final Printer out); + + interface Printer { + + /** + * Prints an object. + * @param x The Object to be printed + * @return this printer + */ + Printer print(final Object x); + + /** + * Prints an Object and then terminates the line. + * + * @param x The Object to be printed. + * @return this printer + */ + Printer println(final Object x); + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/FileOutputSignalHandler.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/FileOutputSignalHandler.java new file mode 100644 index 000000000..98180a08e --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/FileOutputSignalHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.commons.io.FileUtils; + +/** + * + * @author jiachun.fjc + */ +public abstract class FileOutputSignalHandler implements JRaftSignalHandler { + + protected File getOutputFile(final String path, final String baseFileName) throws IOException { + makeDir(path); + final String now = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss").format(new Date()); + final String fileName = baseFileName + "." + now; + final File file = Paths.get(path, fileName).toFile(); + if (!file.exists() && !file.createNewFile()) { + throw new IOException("Fail to create file: " + file); + } + return file; + } + + private static void makeDir(final String path) throws IOException { + final File dir = Paths.get(path).toFile().getAbsoluteFile(); + if (dir.exists()) { + Requires.requireTrue(dir.isDirectory(), String.format("[%s] is not directory.", path)); + } else { + FileUtils.forceMkdir(dir); + } + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/JRaftSignalHandler.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/JRaftSignalHandler.java new file mode 100644 index 000000000..fa46cef08 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/JRaftSignalHandler.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +/** + * + * @author jiachun.fjc + */ +public interface JRaftSignalHandler { + + void handle(final String signalName); +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricReporter.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricReporter.java new file mode 100644 index 000000000..39a24adf1 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricReporter.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.io.PrintStream; +import java.text.DateFormat; +import java.util.Collections; +import java.util.Date; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricAttribute; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; + +/** + * A reporter which outputs measurements to a {@link PrintStream}, like {@code System.out}. + * + * Fork form {@link com.codahale.metrics.ConsoleReporter} + */ +public class MetricReporter { + + /** + * Returns a new {@link Builder} for {@link MetricReporter}. + * + * @param registry the registry to report + * @return a {@link Builder} instance for a {@link MetricReporter} + */ + public static Builder forRegistry(final MetricRegistry registry) { + return new Builder(registry); + } + + /** + * Report the current values of all metrics in the registry. + */ + public void report() { + synchronized (this) { + report(this.registry.getGauges(this.filter), // + this.registry.getCounters(this.filter), // + this.registry.getHistograms(this.filter), // + this.registry.getMeters(this.filter), // + this.registry.getTimers(this.filter)); + } + } + + /** + * A builder for {@link MetricReporter} instances. Defaults to using the default locale and + * time zone, writing to {@code System.out}, converting rates to events/second, converting + * durations to milliseconds, and not filtering metrics. + */ + public static class Builder { + + private final MetricRegistry registry; + + private String prefix; + private PrintStream output; + private Locale locale; + private Clock clock; + private TimeZone timeZone; + private TimeUnit rateUnit; + private TimeUnit durationUnit; + private MetricFilter filter; + private Set disabledMetricAttributes; + + private Builder(MetricRegistry registry) { + this.registry = registry; + this.prefix = ""; + this.output = System.out; + this.locale = Locale.getDefault(); + this.clock = Clock.defaultClock(); + this.timeZone = TimeZone.getDefault(); + this.rateUnit = TimeUnit.SECONDS; + this.durationUnit = TimeUnit.MILLISECONDS; + this.filter = MetricFilter.ALL; + this.disabledMetricAttributes = Collections.emptySet(); + } + + /** + * Prefix all metric names with the given string. + * + * @param prefix the prefix for all banner names + * @return {@code this} + */ + public Builder prefixedWith(final String prefix) { + this.prefix = prefix; + return this; + } + + /** + * Write to the given {@link PrintStream}. + * + * @param output a {@link PrintStream} instance. + * @return {@code this} + */ + public Builder outputTo(final PrintStream output) { + this.output = output; + return this; + } + + /** + * Format numbers for the given {@link Locale}. + * + * @param locale a {@link Locale} + * @return {@code this} + */ + public Builder formattedFor(final Locale locale) { + this.locale = locale; + return this; + } + + /** + * Use the given {@link Clock} instance for the time. + * + * @param clock a {@link Clock} instance + * @return {@code this} + */ + public Builder withClock(final Clock clock) { + this.clock = clock; + return this; + } + + /** + * Use the given {@link TimeZone} for the time. + * + * @param timeZone a {@link TimeZone} + * @return {@code this} + */ + public Builder formattedFor(final TimeZone timeZone) { + this.timeZone = timeZone; + return this; + } + + /** + * Convert rates to the given time unit. + * + * @param rateUnit a unit of time + * @return {@code this} + */ + public Builder convertRatesTo(final TimeUnit rateUnit) { + this.rateUnit = rateUnit; + return this; + } + + /** + * Convert durations to the given time unit. + * + * @param durationUnit a unit of time + * @return {@code this} + */ + public Builder convertDurationsTo(final TimeUnit durationUnit) { + this.durationUnit = durationUnit; + return this; + } + + /** + * Only report metrics which match the given filter. + * + * @param filter a {@link MetricFilter} + * @return {@code this} + */ + public Builder filter(final MetricFilter filter) { + this.filter = filter; + return this; + } + + /** + * Don't report the passed metric attributes for all metrics (e.g. "p999", "stddev" or "m15"). + * See {@link MetricAttribute}. + * + * @param disabledMetricAttributes a {@link MetricFilter} + * @return {@code this} + */ + public Builder disabledMetricAttributes(final Set disabledMetricAttributes) { + this.disabledMetricAttributes = disabledMetricAttributes; + return this; + } + + /** + * Builds a {@link MetricReporter} with the given properties. + * + * @return a {@link MetricReporter} + */ + public MetricReporter build() { + return new MetricReporter(this.registry, // + this.output, // + this.prefix, // + this.locale, // + this.clock, // + this.timeZone, // + this.rateUnit, // + this.durationUnit, // + this.filter, // + this.disabledMetricAttributes); + } + } + + private static final int CONSOLE_WIDTH = 80; + + private final MetricRegistry registry; + private final Set disabledMetricAttributes; + private final MetricFilter filter; + private final long durationFactor; + private final String durationUnit; + private final long rateFactor; + private final String rateUnit; + private final String prefix; + private final PrintStream output; + private final Locale locale; + private final Clock clock; + private final DateFormat dateFormat; + + private MetricReporter(MetricRegistry registry, // + PrintStream output, // + String prefix, // + Locale locale, // + Clock clock, // + TimeZone timeZone, // + TimeUnit rateUnit, // + TimeUnit durationUnit, // + MetricFilter filter, // + Set disabledMetricAttributes) { + this.registry = registry; + this.output = output; + this.prefix = prefix; + this.locale = locale; + this.clock = clock; + this.dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM, locale); + this.dateFormat.setTimeZone(timeZone); + this.rateFactor = rateUnit.toSeconds(1); + this.rateUnit = calculateRateUnit(rateUnit); + this.durationFactor = durationUnit.toNanos(1); + this.durationUnit = durationUnit.toString().toLowerCase(Locale.US); + this.filter = filter; + this.disabledMetricAttributes = disabledMetricAttributes != null ? disabledMetricAttributes : Collections + .emptySet(); + } + + public void report(final SortedMap gauges, final SortedMap counters, + final SortedMap histograms, final SortedMap meters, + final SortedMap timers) { + final String dateTime = this.dateFormat.format(new Date(this.clock.getTime())); + printWithBanner(dateTime, '='); + this.output.println(); + + if (!gauges.isEmpty()) { + printWithBanner("-- Gauges", '-'); + for (final Map.Entry entry : gauges.entrySet()) { + this.output.println(entry.getKey()); + printGauge(entry.getValue()); + } + this.output.println(); + } + + if (!counters.isEmpty()) { + printWithBanner("-- Counters", '-'); + for (final Map.Entry entry : counters.entrySet()) { + this.output.println(entry.getKey()); + printCounter(entry); + } + this.output.println(); + } + + if (!histograms.isEmpty()) { + printWithBanner("-- Histograms", '-'); + for (final Map.Entry entry : histograms.entrySet()) { + this.output.println(entry.getKey()); + printHistogram(entry.getValue()); + } + this.output.println(); + } + + if (!meters.isEmpty()) { + printWithBanner("-- Meters", '-'); + for (final Map.Entry entry : meters.entrySet()) { + this.output.println(entry.getKey()); + printMeter(entry.getValue()); + } + this.output.println(); + } + + if (!timers.isEmpty()) { + printWithBanner("-- Timers", '-'); + for (Map.Entry entry : timers.entrySet()) { + this.output.println(entry.getKey()); + printTimer(entry.getValue()); + } + this.output.println(); + } + + this.output.println(); + this.output.flush(); + } + + private void printMeter(final Meter meter) { + printIfEnabled(MetricAttribute.COUNT, String.format(this.locale, " count = %d", meter.getCount())); + printIfEnabled(MetricAttribute.MEAN_RATE, String.format(this.locale, " mean rate = %2.2f events/%s", + convertRate(meter.getMeanRate()), this.rateUnit)); + printIfEnabled(MetricAttribute.M1_RATE, String.format(this.locale, " 1-minute rate = %2.2f events/%s", + convertRate(meter.getOneMinuteRate()), this.rateUnit)); + printIfEnabled(MetricAttribute.M5_RATE, String.format(this.locale, " 5-minute rate = %2.2f events/%s", + convertRate(meter.getFiveMinuteRate()), this.rateUnit)); + printIfEnabled(MetricAttribute.M15_RATE, String.format(this.locale, " 15-minute rate = %2.2f events/%s", + convertRate(meter.getFifteenMinuteRate()), this.rateUnit)); + } + + private void printCounter(final Map.Entry entry) { + this.output.printf(this.locale, " count = %d%n", entry.getValue().getCount()); + } + + private void printGauge(final Gauge gauge) { + this.output.printf(this.locale, " value = %s%n", gauge.getValue()); + } + + private void printHistogram(final Histogram histogram) { + printIfEnabled(MetricAttribute.COUNT, + String.format(this.locale, " count = %d", histogram.getCount())); + final Snapshot snapshot = histogram.getSnapshot(); + printIfEnabled(MetricAttribute.MIN, String.format(this.locale, " min = %d", snapshot.getMin())); + printIfEnabled(MetricAttribute.MAX, String.format(this.locale, " max = %d", snapshot.getMax())); + printIfEnabled(MetricAttribute.MEAN, + String.format(this.locale, " mean = %2.2f", snapshot.getMean())); + printIfEnabled(MetricAttribute.STDDEV, + String.format(this.locale, " stddev = %2.2f", snapshot.getStdDev())); + printIfEnabled(MetricAttribute.P50, + String.format(this.locale, " median = %2.2f", snapshot.getMedian())); + printIfEnabled(MetricAttribute.P75, + String.format(this.locale, " 75%% <= %2.2f", snapshot.get75thPercentile())); + printIfEnabled(MetricAttribute.P95, + String.format(this.locale, " 95%% <= %2.2f", snapshot.get95thPercentile())); + printIfEnabled(MetricAttribute.P98, + String.format(this.locale, " 98%% <= %2.2f", snapshot.get98thPercentile())); + printIfEnabled(MetricAttribute.P99, + String.format(this.locale, " 99%% <= %2.2f", snapshot.get99thPercentile())); + printIfEnabled(MetricAttribute.P999, + String.format(this.locale, " 99.9%% <= %2.2f", snapshot.get999thPercentile())); + } + + private void printTimer(final Timer timer) { + final Snapshot snapshot = timer.getSnapshot(); + printIfEnabled(MetricAttribute.COUNT, String.format(this.locale, " count = %d", timer.getCount())); + printIfEnabled(MetricAttribute.MEAN_RATE, String.format(this.locale, " mean rate = %2.2f calls/%s", + convertRate(timer.getMeanRate()), this.rateUnit)); + printIfEnabled(MetricAttribute.M1_RATE, String.format(this.locale, " 1-minute rate = %2.2f calls/%s", + convertRate(timer.getOneMinuteRate()), this.rateUnit)); + printIfEnabled(MetricAttribute.M5_RATE, String.format(this.locale, " 5-minute rate = %2.2f calls/%s", + convertRate(timer.getFiveMinuteRate()), this.rateUnit)); + printIfEnabled(MetricAttribute.M15_RATE, String.format(this.locale, " 15-minute rate = %2.2f calls/%s", + convertRate(timer.getFifteenMinuteRate()), this.rateUnit)); + + printIfEnabled(MetricAttribute.MIN, String.format(this.locale, " min = %2.2f %s", + convertDuration(snapshot.getMin()), this.durationUnit)); + printIfEnabled(MetricAttribute.MAX, String.format(this.locale, " max = %2.2f %s", + convertDuration(snapshot.getMax()), this.durationUnit)); + printIfEnabled(MetricAttribute.MEAN, String.format(this.locale, " mean = %2.2f %s", + convertDuration(snapshot.getMean()), this.durationUnit)); + printIfEnabled(MetricAttribute.STDDEV, String.format(this.locale, " stddev = %2.2f %s", + convertDuration(snapshot.getStdDev()), this.durationUnit)); + printIfEnabled(MetricAttribute.P50, String.format(this.locale, " median = %2.2f %s", + convertDuration(snapshot.getMedian()), this.durationUnit)); + printIfEnabled(MetricAttribute.P75, String.format(this.locale, " 75%% <= %2.2f %s", + convertDuration(snapshot.get75thPercentile()), this.durationUnit)); + printIfEnabled(MetricAttribute.P95, String.format(this.locale, " 95%% <= %2.2f %s", + convertDuration(snapshot.get95thPercentile()), this.durationUnit)); + printIfEnabled(MetricAttribute.P98, String.format(this.locale, " 98%% <= %2.2f %s", + convertDuration(snapshot.get98thPercentile()), this.durationUnit)); + printIfEnabled(MetricAttribute.P99, String.format(this.locale, " 99%% <= %2.2f %s", + convertDuration(snapshot.get99thPercentile()), this.durationUnit)); + printIfEnabled(MetricAttribute.P999, String.format(this.locale, " 99.9%% <= %2.2f %s", + convertDuration(snapshot.get999thPercentile()), this.durationUnit)); + } + + private void printWithBanner(final String s, final char c) { + if (!this.prefix.isEmpty()) { + this.output.print(this.prefix); + this.output.print(' '); + } + this.output.print(s); + this.output.print(' '); + for (int i = 0; i < (CONSOLE_WIDTH - s.length() - 1); i++) { + this.output.print(c); + } + this.output.println(); + } + + /** + * Print only if the attribute is enabled + * + * @param type Metric attribute + * @param status Status to be logged + */ + private void printIfEnabled(final MetricAttribute type, final String status) { + if (this.disabledMetricAttributes.contains(type)) { + return; + } + + this.output.println(status); + } + + private String calculateRateUnit(final TimeUnit unit) { + final String s = unit.toString().toLowerCase(Locale.US); + return s.substring(0, s.length() - 1); + } + + private double convertRate(final double rate) { + return rate * this.rateFactor; + } + + private double convertDuration(final double duration) { + return duration / this.durationFactor; + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/RecyclableByteBufferList.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/RecyclableByteBufferList.java index ec82aa926..928dffd8c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/RecyclableByteBufferList.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/RecyclableByteBufferList.java @@ -118,7 +118,7 @@ private RecyclableByteBufferList(final Recyclers.Handle handle) { this(handle, DEFAULT_INITIAL_CAPACITY); } - private RecyclableByteBufferList(final Recyclers.Handle handle, int initialCapacity) { + private RecyclableByteBufferList(final Recyclers.Handle handle, final int initialCapacity) { super(initialCapacity); this.handle = handle; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/RepeatedTimer.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/RepeatedTimer.java index aa1bb0126..2458e8d31 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/RepeatedTimer.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/RepeatedTimer.java @@ -31,7 +31,7 @@ * * 2018-Mar-30 3:45:37 PM */ -public abstract class RepeatedTimer { +public abstract class RepeatedTimer implements Describer { public static final Logger LOG = LoggerFactory.getLogger(RepeatedTimer.class); @@ -68,16 +68,16 @@ public RepeatedTimer(String name, int timeoutMs) { * @param timeoutMs timeout millis * @return timeout millis */ - protected int adjustTimeout(int timeoutMs) { + protected int adjustTimeout(final int timeoutMs) { return timeoutMs; } public void run() { - lock.lock(); + this.lock.lock(); try { this.invoking = true; } finally { - lock.unlock(); + this.lock.unlock(); } try { onTrigger(); @@ -85,21 +85,21 @@ public void run() { LOG.error("run timer failed", t); } boolean invokeDestroyed = false; - lock.lock(); + this.lock.lock(); try { this.invoking = false; if (this.stopped) { running = false; - invokeDestroyed = destroyed; + invokeDestroyed = this.destroyed; } else { this.timerTask = null; this.schedule(); } } finally { - lock.unlock(); + this.lock.unlock(); } if (invokeDestroyed) { - this.onDestroy(); + onDestroy(); } } @@ -107,30 +107,30 @@ public void run() { * Run the timer at once, it will cancel the timer and re-schedule it. */ public void runOnceNow() { - lock.lock(); + this.lock.lock(); try { if (this.timerTask != null && this.timerTask.cancel()) { this.timerTask = null; - this.run(); - this.schedule(); + run(); + schedule(); } } finally { - lock.unlock(); + this.lock.unlock(); } } /** - * called after destroy timer. + * Called after destroy timer. */ protected void onDestroy() { - + // NO-OP } /** * Start the timer. */ public void start() { - lock.lock(); + this.lock.lock(); try { if (this.destroyed) { return; @@ -142,10 +142,10 @@ public void start() { if (this.running) { return; } - running = true; + this.running = true; schedule(); } finally { - lock.unlock(); + this.lock.unlock(); } } @@ -159,8 +159,8 @@ private void schedule() { public void run() { try { RepeatedTimer.this.run(); - } catch (Throwable t) { - LOG.error("Run timer task failed taskName={}", name, t); + } catch (final Throwable t) { + LOG.error("Run timer task failed taskName={}.", RepeatedTimer.this.name, t); } } }; @@ -169,10 +169,11 @@ public void run() { /** * Reset timer with new timeoutMs. + * * @param timeoutMs timeout millis */ public void reset(int timeoutMs) { - lock.lock(); + this.lock.lock(); this.timeoutMs = timeoutMs; try { if (this.stopped) { @@ -182,7 +183,7 @@ public void reset(int timeoutMs) { schedule(); } } finally { - lock.unlock(); + this.lock.unlock(); } } @@ -190,11 +191,11 @@ public void reset(int timeoutMs) { * reset timer with current timeoutMs */ public void reset() { - lock.lock(); + this.lock.lock(); try { - this.reset(this.timeoutMs); + reset(this.timeoutMs); } finally { - lock.unlock(); + this.lock.unlock(); } } @@ -203,7 +204,7 @@ public void reset() { */ public void destroy() { boolean invokeDestroyed = false; - lock.lock(); + this.lock.lock(); try { if (this.destroyed) { return; @@ -225,25 +226,18 @@ public void destroy() { } this.timer.cancel(); } finally { - lock.unlock(); + this.lock.unlock(); if (invokeDestroyed) { onDestroy(); } } } - @Override - public String toString() { - return "RepeatedTimer [lock=" + this.lock + ", timerTask=" + this.timerTask + ", stopped=" + this.stopped - + ", running=" + this.running + ", destroyed=" + this.destroyed + ", invoking=" + this.invoking - + ", timeoutMs=" + this.timeoutMs + "]"; - } - /** * Stop timer */ public void stop() { - lock.lock(); + this.lock.lock(); try { if (this.stopped) { return; @@ -255,8 +249,27 @@ public void stop() { this.timerTask = null; } } finally { - lock.unlock(); + this.lock.unlock(); } } + @Override + public void describe(final Printer out) { + final String _describeString; + this.lock.lock(); + try { + _describeString = toString(); + } finally { + this.lock.unlock(); + } + out.print(" ") // + .println(_describeString); + } + + @Override + public String toString() { + return "RepeatedTimer [timerTask=" + this.timerTask + ", stopped=" + this.stopped + ", running=" + this.running + + ", destroyed=" + this.destroyed + ", invoking=" + this.invoking + ", timeoutMs=" + this.timeoutMs + + "]"; + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/SignalHelper.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/SignalHelper.java new file mode 100644 index 000000000..e6984851b --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/SignalHelper.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author jiachun.fjc + */ +public final class SignalHelper { + + private static final Logger LOG = LoggerFactory.getLogger(SignalHelper.class); + + private static final SignalAccessor SIGNAL_ACCESSOR = getSignalAccessor0(); + + public static final String SIG_USR2 = "USR2"; + + public static boolean supportSignal() { + return SIGNAL_ACCESSOR != null; + } + + /** + * Registers user signal handlers. + * + * @param signalName a signal name + * @param handlers user signal handlers + * @return true if support on current platform + */ + public static boolean addSignal(final String signalName, final List handlers) { + if (SIGNAL_ACCESSOR != null) { + SIGNAL_ACCESSOR.addSignal(signalName, handlers); + return true; + } + return false; + } + + private static SignalAccessor getSignalAccessor0() { + return hasSignal0() ? new SignalAccessor() : null; + } + + private static boolean hasSignal0() { + try { + Class.forName("sun.misc.Signal"); + return true; + } catch (final Throwable t) { + if (LOG.isWarnEnabled()) { + LOG.warn("sun.misc.Signal: unavailable, {}.", t); + } + } + return false; + } + + private SignalHelper() { + } + + static class SignalAccessor { + + public void addSignal(final String signalName, final List handlers) { + final sun.misc.Signal signal = new sun.misc.Signal(signalName); + final SignalHandlerAdapter adapter = new SignalHandlerAdapter(signal, handlers); + sun.misc.Signal.handle(signal, adapter); + } + } + + static class SignalHandlerAdapter implements sun.misc.SignalHandler { + + private final sun.misc.Signal target; + private final List handlers; + + public static void addSignal(final SignalHandlerAdapter adapter) { + sun.misc.Signal.handle(adapter.target, adapter); + } + + public SignalHandlerAdapter(sun.misc.Signal target, List handlers) { + this.target = target; + this.handlers = handlers; + } + + @Override + public void handle(final sun.misc.Signal signal) { + try { + if (!this.target.equals(signal)) { + return; + } + + LOG.info("Handling signal {}.", signal); + + for (final JRaftSignalHandler h : this.handlers) { + h.handle(signal.getName()); + } + } catch (final Throwable t) { + LOG.error("Fail to handle signal: {}.", signal, t); + } + } + } +} diff --git a/jraft-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.JRaftSignalHandler b/jraft-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.JRaftSignalHandler new file mode 100644 index 000000000..481820f15 --- /dev/null +++ b/jraft-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.JRaftSignalHandler @@ -0,0 +1,2 @@ +com.alipay.sofa.jraft.NodeDescribeSignalHandler +com.alipay.sofa.jraft.NodeMetricsSignalHandler \ No newline at end of file diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/FileOutputSignalHandlerTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/FileOutputSignalHandlerTest.java new file mode 100644 index 000000000..34479fac3 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/FileOutputSignalHandlerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; + +import org.apache.commons.io.FileUtils; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * @author jiachun.fjc + */ +public class FileOutputSignalHandlerTest { + + @Test + public void testGetOutputFileWithEmptyPath() throws IOException { + final File f = getOutputFile("", "test1.log"); + assertTrue(f.exists()); + FileUtils.forceDelete(f); + } + + @Test + public void testGetOutputFileWithPath() throws IOException { + final String path = "abc"; + final File f = getOutputFile(path, "test2.log"); + assertTrue(f.exists()); + FileUtils.forceDelete(new File(path)); + } + + @Test + public void testGetOutputFileWithAbsolutePath() throws IOException { + final String path = Paths.get("cde").toAbsolutePath().toString(); + final File f = getOutputFile(path, "test3.log"); + assertTrue(f.exists()); + FileUtils.forceDelete(new File(path)); + } + + private File getOutputFile(final String path, final String baseName) throws IOException { + return new FileOutputSignalHandler() { + @Override + public void handle(String signalName) { + } + }.getOutputFile(path, baseName); + } +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/SignalHelperTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/SignalHelperTest.java new file mode 100644 index 000000000..079952cf9 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/SignalHelperTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.util.ArrayList; +import java.util.List; + +/** + * + * @author jiachun.fjc + */ +public class SignalHelperTest { + + public static void main(String[] args) throws InterruptedException { + // test with: + // + // kill -s USR2 pid + + final List handlers = new ArrayList<>(); + handlers.add((signalName) -> System.out.println("signal test: " + signalName)); + + if (SignalHelper.supportSignal()) { + SignalHelper.addSignal(SignalHelper.SIG_USR2, handlers); + } + + Thread.sleep(300000); + } +} diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/KVCommandProcessor.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/KVCommandProcessor.java index 5101bff5d..6fed58a63 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/KVCommandProcessor.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/KVCommandProcessor.java @@ -137,7 +137,7 @@ public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx, @Override public String interest() { - return reqClazz.getName(); + return this.reqClazz.getName(); } @Override diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/RheaKVMetricsSignalHandler.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/RheaKVMetricsSignalHandler.java new file mode 100644 index 000000000..d48d48ef8 --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/RheaKVMetricsSignalHandler.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.rhea.metrics.KVMetrics; +import com.alipay.sofa.jraft.util.FileOutputSignalHandler; +import com.alipay.sofa.jraft.util.MetricReporter; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; + +/** + * + * @author jiachun.fjc + */ +public class RheaKVMetricsSignalHandler extends FileOutputSignalHandler { + + private static Logger LOG = LoggerFactory.getLogger(RheaKVMetricsSignalHandler.class); + + private static final String DIR = SystemPropertyUtil.get("rheakv.signal.metrics.dir", ""); + private static final String BASE_NAME = "rheakv_metrics.log"; + + @Override + public void handle(final String signalName) { + try { + final File file = getOutputFile(DIR, BASE_NAME); + + LOG.info("Printing rheakv metrics with signal: {} to file: {}.", signalName, file); + + try (final PrintStream out = new PrintStream(new FileOutputStream(file, true))) { + final MetricReporter reporter = MetricReporter.forRegistry(KVMetrics.metricRegistry()) // + .outputTo(out) // + .prefixedWith("-- rheakv") // + .build(); + reporter.report(); + } + } catch (final IOException e) { + LOG.error("Fail to print rheakv metrics.", e); + } + } +} diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java index 76a9fea08..ef8bb55f5 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java @@ -68,6 +68,7 @@ import com.alipay.sofa.jraft.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.util.MetricThreadPoolExecutor; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.Utils; import com.codahale.metrics.ScheduledReporter; import com.codahale.metrics.Slf4jReporter; @@ -130,7 +131,7 @@ public synchronized boolean init(final StoreEngineOptions opts) { Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress"); final int port = serverAddress.getPort(); final String ip = serverAddress.getIp(); - if (ip == null || Constants.IP_ANY.equals(ip)) { + if (ip == null || Utils.IP_ANY.equals(ip)) { serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port); opts.setServerAddress(serverAddress); } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Constants.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Constants.java index 4b9bf7b89..2ef624f7d 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Constants.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Constants.java @@ -27,23 +27,18 @@ */ public final class Constants { - /** 换行符 */ public static final String NEWLINE; static { String newLine; try { newLine = new Formatter().format("%n").toString(); - } catch (Exception e) { + } catch (final Exception e) { newLine = "\n"; } NEWLINE = newLine; } - /** ANY IP address 0.0.0.0 */ - // TODO support ipv6 - public static final String IP_ANY = "0.0.0.0"; - public static final boolean THREAD_AFFINITY_ENABLED = SystemPropertyUtil.getBoolean("rhea.thread.affinity.enabled", false); diff --git a/jraft-rheakv/rheakv-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.JRaftSignalHandler b/jraft-rheakv/rheakv-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.JRaftSignalHandler new file mode 100644 index 000000000..65ed47fe6 --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.JRaftSignalHandler @@ -0,0 +1 @@ +com.alipay.sofa.jraft.rhea.RheaKVMetricsSignalHandler \ No newline at end of file