Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log entry checksum validation #123

Merged
merged 27 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
28a1b7c
(feat) Adds JRaftServiceFactory for custom services such as LogStorag…
Apr 18, 2019
3b70045
(feat) update toString in NodeOptions
Apr 18, 2019
469f76b
(feat) Supports custom log entry serialization, #106
Apr 18, 2019
03c325e
(fix) testSaveFail
Apr 18, 2019
f2bb7f8
(feat) Adds serviceFactory to BootstrapOptions and fix tests
Apr 19, 2019
8008c46
Merge branch 'feature/log-entry-enhances' into feature/log-codec-factory
killme2008 Apr 21, 2019
7937443
(fix) by code review comments
Apr 21, 2019
0557c8f
(feat) Impl log entry checksum
Apr 18, 2019
b681d43
(feat) remove RaftGroupService default constructor
Apr 19, 2019
4dbaa16
(feat) Checksum in iterator and readUserCommitLog
Apr 19, 2019
42a845f
(feat) Refactor old version 1 log entry codec
Apr 19, 2019
d7a25ea
(feat) Impl v2 codec factory supports checksum
Apr 21, 2019
638fac7
(feat) format code
Apr 21, 2019
42c9e8e
(feat) Adds reserved flag
Apr 21, 2019
54d4ab1
(feat) Adds test for v2 codec
Apr 21, 2019
d10524b
(feat) Adds log entry checksum test
Apr 21, 2019
9462ea0
(fix) header comments
Apr 21, 2019
c3440a5
(fix) resolve conflicts
Apr 21, 2019
d4f70d2
(fix) comment
Apr 21, 2019
e0b2a63
(fix) by code review comments
Apr 22, 2019
30a2f43
(feat) tweak v2 codec performance
Apr 22, 2019
9246ec5
(feat) Use AsciiStringUtil.unsafeEncode replacs Utils.getBytes in pee…
Apr 22, 2019
4d84be1
(feat) Use ThreadLocal to replace FastThreadLocal
Apr 22, 2019
bae170a
(feat) forgot CrcUtilTest
Apr 22, 2019
55c44a8
(feat) remove unnecessary method handler (#124)
fengjiachun Apr 23, 2019
30d1f00
Feat/optimize encode (#126)
fengjiachun Apr 23, 2019
1af9a30
feat/optimize v2 decode (#127)
fengjiachun Apr 24, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.StringUtils;

Expand All @@ -29,6 +28,7 @@
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.BootstrapOptions;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;

/**
Expand All @@ -49,7 +49,7 @@ private JRaftUtils() {
* @param opts options of bootstrap
* @return true if bootstrap success
*/
public static boolean bootstrap(BootstrapOptions opts) throws InterruptedException {
public static boolean bootstrap(final BootstrapOptions opts) throws InterruptedException {
final NodeImpl node = new NodeImpl();
final boolean ret = node.bootstrap(opts);
node.shutdown();
Expand All @@ -64,7 +64,7 @@ public static boolean bootstrap(BootstrapOptions opts) throws InterruptedExcepti
* @param number thread number
* @return a new {@link ThreadPoolExecutor} instance
*/
public static Executor createExecutor(final String prefix, int number) {
public static Executor createExecutor(final String prefix, final int number) {
if (number <= 0) {
return null;
}
Expand All @@ -81,24 +81,14 @@ public static Executor createExecutor(final String prefix, int number) {
* @since 0.0.3
*/
public static ThreadFactory createThreadFactory(final String prefixName) {
return new ThreadFactory() {
private final AtomicInteger c = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
final Thread t = new Thread(r);
t.setName(prefixName + c.getAndIncrement());
t.setDaemon(true);
return t;
}
};
return new NamedThreadFactory(prefixName);
}

/**
* Create a configuration from a string in the form of "host1:port1[:idx],host2:port2[:idx]......",
* returns a empty configuration when string is blank.
*/
public static Configuration getConfiguration(String s) {
public static Configuration getConfiguration(final String s) {
final Configuration conf = new Configuration();
if (StringUtils.isBlank(s)) {
return conf;
Expand All @@ -113,7 +103,7 @@ public static Configuration getConfiguration(String s) {
* Create a peer from a string in the form of "host:port[:idx]",
* returns a empty peer when string is blank.
*/
public static PeerId getPeerId(String s) {
public static PeerId getPeerId(final String s) {
final PeerId peer = new PeerId();
if (StringUtils.isBlank(s)) {
return peer;
Expand All @@ -128,11 +118,11 @@ public static PeerId getPeerId(String s) {
* Create a Endpoint instance from a string in the form of "host:port",
* returns null when string is blank.
*/
public static Endpoint getEndPoint(String s) {
public static Endpoint getEndPoint(final String s) {
if (StringUtils.isBlank(s)) {
return null;
}
final String[] tmps = s.split(":");
final String[] tmps = StringUtils.split(s, ':');
if (tmps.length != 2) {
throw new IllegalArgumentException("Invalid endpoint string: " + s);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class RaftGroupService {
/**
* If we want to share the rpcServer instance, then we can't stop it when shutdown.
*/
private boolean sharedRpcServer;
private final boolean sharedRpcServer;

/**
* The raft group id
Expand All @@ -75,22 +75,19 @@ public class RaftGroupService {
*/
private Node node;

public RaftGroupService() {
this(null, null, null);
}

public RaftGroupService(String groupId, PeerId serverId, NodeOptions nodeOptions) {
public RaftGroupService(final String groupId, final PeerId serverId, final NodeOptions nodeOptions) {
this(groupId, serverId, nodeOptions, RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint(),
JRaftUtils.createExecutor("RAFT-RPC-executor-", nodeOptions.getRaftRpcThreadPoolSize()),
JRaftUtils.createExecutor("CLI-RPC-executor-", nodeOptions.getCliRpcThreadPoolSize())));
}

public RaftGroupService(String groupId, PeerId serverId, NodeOptions nodeOptions, RpcServer rpcServer) {
public RaftGroupService(final String groupId, final PeerId serverId, final NodeOptions nodeOptions,
final RpcServer rpcServer) {
this(groupId, serverId, nodeOptions, rpcServer, false);
}

public RaftGroupService(String groupId, PeerId serverId, NodeOptions nodeOptions, RpcServer rpcServer,
boolean sharedRpcServer) {
public RaftGroupService(final String groupId, final PeerId serverId, final NodeOptions nodeOptions,
final RpcServer rpcServer, final boolean sharedRpcServer) {
super();
this.groupId = groupId;
this.serverId = serverId;
Expand All @@ -115,21 +112,21 @@ public synchronized Node start() {
*
* @param startRpcServer whether to start RPC server.
*/
public synchronized Node start(boolean startRpcServer) {
public synchronized Node start(final boolean startRpcServer) {
if (this.started) {
return this.node;
}
if (this.serverId == null || this.serverId.getEndpoint() == null
|| this.serverId.getEndpoint().equals(new Endpoint(Utils.IP_ANY, 0))) {
throw new IllegalArgumentException("Blank serverId:" + serverId);
throw new IllegalArgumentException("Blank serverId:" + this.serverId);
}
if (StringUtils.isBlank(this.groupId)) {
throw new IllegalArgumentException("Blank group id" + this.groupId);
}
//Adds RPC server to Server.
NodeManager.getInstance().addAddress(this.serverId.getEndpoint());

this.node = RaftServiceFactory.createAndInitRaftNode(groupId, serverId, nodeOptions);
this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
if (startRpcServer) {
this.rpcServer.start();
} else {
Expand Down Expand Up @@ -193,7 +190,7 @@ public String getGroupId() {
/**
* Set the raft group id
*/
public void setGroupId(String groupId) {
public void setGroupId(final String groupId) {
if (this.started) {
throw new IllegalStateException("Raft group service already started");
}
Expand All @@ -210,7 +207,7 @@ public PeerId getServerId() {
/**
* Set the node serverId
*/
public void setServerId(PeerId serverId) {
public void setServerId(final PeerId serverId) {
if (this.started) {
throw new IllegalStateException("Raft group service already started");
}
Expand All @@ -227,7 +224,7 @@ public RpcOptions getNodeOptions() {
/**
* Set node options.
*/
public void setNodeOptions(NodeOptions nodeOptions) {
public void setNodeOptions(final NodeOptions nodeOptions) {
if (this.started) {
throw new IllegalStateException("Raft group service already started");
}
Expand All @@ -248,7 +245,7 @@ public RpcServer getRpcServer() {
/**
* Set rpc server.
*/
public void setRpcServer(RpcServer rpcServer) {
public void setRpcServer(final RpcServer rpcServer) {
if (this.started) {
throw new IllegalStateException("Raft group service already started");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.Set;

import org.apache.commons.lang.StringUtils;

import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.util.Copiable;

Expand All @@ -40,7 +42,7 @@ public Configuration() {
super();
}

public Configuration(Iterable<PeerId> conf) {
public Configuration(final Iterable<PeerId> conf) {
for (final PeerId peer : conf) {
this.peers.add(peer.copy());
}
Expand All @@ -56,50 +58,50 @@ public void reset() {
}

public boolean isEmpty() {
return peers.isEmpty();
return this.peers.isEmpty();
}

public int size() {
return peers.size();
return this.peers.size();
}

@Override
public Iterator<PeerId> iterator() {
return peers.iterator();
return this.peers.iterator();
}

public Set<PeerId> getPeerSet() {
return new HashSet<>(peers);
return new HashSet<>(this.peers);
}

public List<PeerId> listPeers() {
return new ArrayList<>(this.peers);
}

public List<PeerId> getPeers() {
return peers;
return this.peers;
}

public void setPeers(List<PeerId> peers) {
public void setPeers(final List<PeerId> peers) {
this.peers.clear();
for (final PeerId peer : peers) {
this.peers.add(peer.copy());
}
}

public void appendPeers(Collection<PeerId> set) {
public void appendPeers(final Collection<PeerId> set) {
this.peers.addAll(set);
}

public boolean addPeer(PeerId peer) {
public boolean addPeer(final PeerId peer) {
return this.peers.add(peer);
}

public boolean removePeer(PeerId peer) {
public boolean removePeer(final PeerId peer) {
return this.peers.remove(peer);
}

public boolean contains(PeerId peer) {
public boolean contains(final PeerId peer) {
return this.peers.contains(peer);
}

Expand All @@ -112,7 +114,7 @@ public int hashCode() {
}

@Override
public boolean equals(Object obj) {
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
Expand All @@ -133,7 +135,7 @@ public boolean equals(Object obj) {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
final List<PeerId> peers = this.listPeers();
final List<PeerId> peers = listPeers();
int i = 0;
final int size = peers.size();
for (final PeerId peer : peers) {
Expand All @@ -146,12 +148,12 @@ public String toString() {
return sb.toString();
}

public boolean parse(String conf) {
public boolean parse(final String conf) {
if (conf == null) {
return false;
}
reset();
final String[] peerStrs = conf.split(",");
final String[] peerStrs = StringUtils.split(conf, ",");
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
for (final String peerStr : peerStrs) {
final PeerId peer = new PeerId();
if (peer.parse(peerStr)) {
Expand All @@ -166,7 +168,7 @@ public boolean parse(String conf) {
* |included| would be assigned to |*this| - |rhs|
* |excluded| would be assigned to |rhs| - |*this|
*/
public void diff(Configuration rhs, Configuration included, Configuration excluded) {
public void diff(final Configuration rhs, final Configuration included, final Configuration excluded) {
included.peers = new ArrayList<>(this.peers);
included.peers.removeAll(rhs.peers);
excluded.peers = new ArrayList<>(rhs.peers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.apache.commons.lang.StringUtils;

import com.alipay.sofa.jraft.JRaftServiceFactory;
import com.alipay.sofa.jraft.entity.codec.DefaultLogEntryCodecFactory;
import com.alipay.sofa.jraft.entity.codec.LogEntryCodecFactory;
import com.alipay.sofa.jraft.entity.codec.v2.LogEntryV2CodecFactory;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.storage.LogStorage;
import com.alipay.sofa.jraft.storage.RaftMetaStorage;
Expand Down Expand Up @@ -65,6 +65,6 @@ public RaftMetaStorage createRaftMetaStorage(final String uri, final RaftOptions

@Override
public LogEntryCodecFactory createLogEntryCodecFactory() {
return DefaultLogEntryCodecFactory.getInstance();
return LogEntryV2CodecFactory.getInstance();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public boolean init(final FSMCallerOptions opts) {
this.node = opts.getNode();
this.nodeMetrics = this.node.getNodeMetrics();
this.lastAppliedIndex.set(opts.getBootstrapId().getIndex());
this.notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
this.lastAppliedTerm = opts.getBootstrapId().getTerm();
this.disruptor = new Disruptor<>(new ApplyTaskFactory(), opts.getDisruptorBufferSize(), new NamedThreadFactory(
"JRaft-FSMCaller-disruptor-", true));
Expand Down Expand Up @@ -262,15 +262,15 @@ public boolean onSnapshotSave(final SaveSnapshotClosure done) {

@Override
public boolean onLeaderStop(final Status status) {
return this.enqueueTask((task, sequence) -> {
return enqueueTask((task, sequence) -> {
task.type = TaskType.LEADER_STOP;
task.status = new Status(status);
});
}

@Override
public boolean onLeaderStart(final long term) {
return this.enqueueTask((task, sequence) -> {
return enqueueTask((task, sequence) -> {
task.type = TaskType.LEADER_START;
task.term = term;
});
Expand Down Expand Up @@ -301,7 +301,7 @@ public boolean onStopFollowing(final LeaderChangeContext ctx) {
public class OnErrorClosure implements Closure {
private RaftException error;

public OnErrorClosure(RaftException error) {
public OnErrorClosure(final RaftException error) {
super();
this.error = error;
}
Expand All @@ -310,7 +310,7 @@ public RaftException getError() {
return this.error;
}

public void setError(RaftException error) {
public void setError(final RaftException error) {
this.error = error;
}

Expand All @@ -322,7 +322,7 @@ public void run(final Status st) {
@Override
public boolean onError(final RaftException error) {
final OnErrorClosure c = new OnErrorClosure(error);
return this.enqueueTask((task, sequence) -> {
return enqueueTask((task, sequence) -> {
task.type = TaskType.ERROR;
task.done = c;
});
Expand Down Expand Up @@ -406,7 +406,7 @@ private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final bo
break;
}
} finally {
nodeMetrics.recordLatency(task.type.metricName(), Utils.monotonicMs() - startMs);
this.nodeMetrics.recordLatency(task.type.metricName(), Utils.monotonicMs() - startMs);
}
}
try {
Expand Down Expand Up @@ -680,7 +680,7 @@ private void setError(final RaftException e) {

@OnlyForTest
RaftException getError() {
return error;
return this.error;
}

private boolean passByStatus(final Closure done) {
Expand Down
Loading