
Commit

small fix
Change-Id: Ie16b77460133a9b7a5c27922efc82de171b775c7
javeme committed May 15, 2022
1 parent c49999a commit 9df37f0
Showing 8 changed files with 97 additions and 112 deletions.
GraphManager.java
@@ -37,10 +37,7 @@
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.slf4j.Logger;

- import com.alipay.sofa.jraft.entity.PeerId;
- import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.rpc.config.ServerConfig;
- import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeFactory;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.auth.AuthManager;
@@ -55,7 +52,6 @@
import com.baidu.hugegraph.backend.store.BackendStoreSystemInfo;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
- import com.baidu.hugegraph.config.RpcOptions;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.config.TypedOption;
import com.baidu.hugegraph.event.EventHub;
@@ -110,9 +106,7 @@ public GraphManager(HugeConfig conf, EventHub hub) {

this.initAllSystemSchema();

- com.alipay.remoting.rpc.RpcServer remotingRpcServer;
- remotingRpcServer = this.remotingRpcServer();
- this.waitGraphsReady(remotingRpcServer);
+ this.waitGraphsReady();

this.checkBackendVersionOrExit(conf);

@@ -135,13 +129,6 @@ public void loadGraphs(HugeConfig serverConfig) {
}
}

- public void waitGraphsReady(com.alipay.remoting.rpc.RpcServer rpcServer) {
- this.graphs.keySet().forEach(name -> {
- HugeGraph graph = this.graph(name);
- graph.waitReady(rpcServer);
- });
- }

public HugeGraph cloneGraph(String name, String newName,
String configText) {
/*
@@ -272,22 +259,6 @@ public void close() {
this.unlistenChanges();
}

- private void loadGraph(HugeConfig serverConfig, String name, String path) {
- HugeConfig config = new HugeConfig(path);
- String raftGroupPeers = serverConfig.get(ServerOptions.RAFT_GROUP_PEERS);
- config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(), raftGroupPeers);
-
- final Graph graph = GraphFactory.open(config);
- this.graphs.put(name, graph);
- LOG.info("Graph '{}' was successfully configured via '{}'", name, path);
-
- if (this.requireAuthentication() &&
- !(graph instanceof HugeGraphAuthProxy)) {
- LOG.warn("You may need to support access control for '{}' with {}",
- path, HugeFactoryAuthProxy.GRAPH_FACTORY);
- }
- }

private com.alipay.remoting.rpc.RpcServer remotingRpcServer() {
ServerConfig serverConfig = Whitebox.getInternalState(this.rpcServer,
"serverConfig");
@@ -369,6 +340,31 @@ private void closeTx(final Set<String> graphSourceNamesToCloseTxOn,
});
}

+ private void loadGraph(HugeConfig serverConfig, String name, String path) {
+ HugeConfig config = new HugeConfig(path);
+ String raftGroupPeers = serverConfig.get(ServerOptions.RAFT_GROUP_PEERS);
+ config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(), raftGroupPeers);
+
+ final Graph graph = GraphFactory.open(config);
+ this.graphs.put(name, graph);
+ LOG.info("Graph '{}' was successfully configured via '{}'", name, path);
+
+ if (this.requireAuthentication() &&
+ !(graph instanceof HugeGraphAuthProxy)) {
+ LOG.warn("You may need to support access control for '{}' with {}",
+ path, HugeFactoryAuthProxy.GRAPH_FACTORY);
+ }
+ }

+ private void waitGraphsReady() {
+ com.alipay.remoting.rpc.RpcServer remotingRpcServer =
+ this.remotingRpcServer();
+ this.graphs.keySet().forEach(name -> {
+ HugeGraph graph = this.graph(name);
+ graph.waitReady(remotingRpcServer);
+ });
+ }

private void checkBackendVersionOrExit(HugeConfig config) {
LOG.info("Check backend version");
for (String graph : this.graphs()) {
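In GraphManager the effect is that waitGraphsReady() becomes a private, zero-argument method that resolves the shared bolt RpcServer itself instead of having the constructor thread it through. A minimal sketch of the new shape, with the HugeGraph and sofa-bolt types replaced by illustrative stubs (the stubs are not the real APIs):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WaitGraphsReadySketch {
    // Illustrative stand-ins for com.alipay.remoting.rpc.RpcServer and HugeGraph.
    interface RpcServer { }
    interface Graph { void waitReady(RpcServer rpcServer); }

    private final Map<String, Graph> graphs = new ConcurrentHashMap<>();

    // After this commit: callers invoke waitGraphsReady() with no arguments,
    // and the shared rpc server is looked up internally.
    private void waitGraphsReady() {
        RpcServer remotingRpcServer = this.remotingRpcServer();
        this.graphs.values().forEach(graph -> graph.waitReady(remotingRpcServer));
    }

    private RpcServer remotingRpcServer() {
        // In the real code this is extracted from the sofa-rpc server internals.
        return new RpcServer() { };
    }
}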
AbstractBackendStoreProvider.java
@@ -23,7 +23,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

- import com.baidu.hugegraph.config.HugeConfig;
import org.slf4j.Logger;

import com.baidu.hugegraph.HugeGraph;
@@ -134,15 +133,10 @@ public void clear() throws BackendException {
}

@Override
- public void truncate(HugeGraph graph) {
+ public void truncate() {
this.checkOpened();
- HugeConfig config = (HugeConfig) graph.configuration();
- String systemStoreName = config.get(CoreOptions.STORE_SYSTEM);
for (BackendStore store : this.stores.values()) {
- // Don't truncate system store
- if (!store.store().equals(systemStoreName)) {
- store.truncate();
- }
+ store.truncate();
}
this.notifyAndWaitEvent(Events.STORE_TRUNCATE);

BackendStoreProvider.java
@@ -52,7 +52,7 @@ public interface BackendStoreProvider {

public void clear();

- public void truncate(HugeGraph graph);
+ public void truncate();

public void initSystemInfo(HugeGraph graph);

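Dropping the graph parameter means truncate() can no longer resolve the configured system store name, so the system store is no longer skipped: both the provider above and the raft provider below now truncate every store. A minimal sketch of the old and new behavior, with BackendStore stubbed out (the stub is illustrative, not the real com.baidu.hugegraph interface):

import java.util.HashMap;
import java.util.Map;

public class TruncateSketch {
    // Illustrative stand-in for com.baidu.hugegraph.backend.store.BackendStore.
    interface BackendStore {
        String store();     // the store's name
        void truncate();    // drop all data in this store
    }

    private final Map<String, BackendStore> stores = new HashMap<>();

    // Old behavior: skip the store whose name matches the configured system store.
    public void truncateExceptSystem(String systemStoreName) {
        for (BackendStore store : this.stores.values()) {
            if (!store.store().equals(systemStoreName)) {
                store.truncate();
            }
        }
    }

    // New behavior: truncate every registered store, system store included.
    public void truncate() {
        for (BackendStore store : this.stores.values()) {
            store.truncate();
        }
    }
}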
RaftBackendStoreProvider.java
@@ -64,6 +64,7 @@ public RaftBackendStoreProvider(BackendStoreProvider provider) {
}

public void initRaftContext(HugeGraphParams params, RpcServer rpcServer) {
+ // TODO: pass ServerOptions instead of CoreOptions, to share by graphs
HugeConfig config = params.configuration();
Integer lowWaterMark = config.get(
CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
@@ -74,13 +75,6 @@ public void initRaftContext(HugeGraphParams params, RpcServer rpcServer) {
System.setProperty("bolt.channel_write_buf_high_water_mark",
String.valueOf(highWaterMark));

- // TODO: pass ServerOptions object to core context
- // PeerId endpoint = new PeerId();
- // String endpointStr = config.get(ServerOptions.RAFT_ENDPOINT);
- // if (!endpoint.parse(endpointStr)) {
- //     throw new HugeException("Failed to parse endpoint %s", endpointStr);
- // }

// Reference from RaftRpcServerFactory.createAndStartRaftRpcServer
com.alipay.sofa.jraft.rpc.RpcServer raftRpcServer =
new BoltRpcServer(rpcServer);
@@ -213,15 +207,10 @@ public void clear() {
}

@Override
- public void truncate(HugeGraph graph) {
+ public void truncate() {
this.checkOpened();
- HugeConfig config = (HugeConfig) graph.configuration();
- String systemStoreName = config.get(CoreOptions.STORE_SYSTEM);
for (RaftBackendStore store : this.stores()) {
- // Don't truncate system store
- if (!store.store().equals(systemStoreName)) {
- store.truncate();
- }
+ store.truncate();
}
this.notifyAndWaitEvent(Events.STORE_TRUNCATE);

@@ -258,13 +247,12 @@ public void createSnapshot() {
RaftStoreClosure closure = new RaftStoreClosure(command);
RaftClosure<?> future = this.context.node().submitAndWait(command,
closure);
- if (future != null) {
- try {
- future.waitFinished();
- LOG.debug("Graph '{}' has writed snapshot", this.graph());
- } catch (Throwable e) {
- throw new BackendException("Failed to create snapshot", e);
- }
+ E.checkState(future != null, "The snapshot future can't be null");
+ try {
+ future.waitFinished();
+ LOG.debug("Graph '{}' has writed snapshot", this.graph());
+ } catch (Throwable e) {
+ throw new BackendException("Failed to create snapshot", e);
+ }
}

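createSnapshot() above now fails fast when submitAndWait() yields no future, where it previously skipped the wait in silence. E.checkState is HugeGraph's common precondition helper; a minimal sketch of the equivalent check (the helper below is a stand-in, not the real com.baidu.hugegraph.util.E):

public class SnapshotCheckSketch {
    // Stand-in for E.checkState(expression, message): fail fast instead of
    // silently skipping the wait when the future is unexpectedly null.
    static void checkState(boolean expression, String message) {
        if (!expression) {
            throw new IllegalStateException(message);
        }
    }

    public static void main(String[] args) {
        Object future = null;  // simulate a submit that returned no future
        try {
            checkState(future != null, "The snapshot future can't be null");
        } catch (IllegalStateException expected) {
            System.out.println("fail fast: " + expected.getMessage());
        }
    }
}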
RaftContext.java
@@ -97,7 +97,6 @@ public final class RaftContext {
private final String systemStoreName;
private final RaftBackendStore[] stores;
private final Configuration groupPeers;
- @SuppressWarnings("unused")
private final ExecutorService readIndexExecutor;
private final ExecutorService snapshotExecutor;
private final ExecutorService backendExecutor;
@@ -119,19 +118,14 @@ public RaftContext(HugeGraphParams params, RpcServer rpcServer,
this.stores = new RaftBackendStore[StoreType.ALL.getNumber()];

/*
- * TODO Depending on the name of the config item for server options,
- * need to get through ServerConfig and CoreConfig
+ * NOTE: `raft.group_peers` option is transfered from ServerConfig
+ * to CoreConfig, since it's shared by all graphs.
*/
- // this.endpoint = new PeerId();
- // String endpointStr = config.getString("raft.endpoint");
- // if (!this.endpoint.parse(endpointStr)) {
- // throw new HugeException("Failed to parse endpoint %s", endpointStr);
- // }
this.groupPeers = new Configuration();
- String groupPeersStr = config.getString("raft.group_peers");
- if (!this.groupPeers.parse(groupPeersStr)) {
- throw new HugeException("Failed to parse group peers %s",
- groupPeersStr);
+ String groupPeersString = config.getString("raft.group_peers");
+ if (!this.groupPeers.parse(groupPeersString)) {
+ throw new HugeException("Failed to parse raft.group_peers: '%s'",
+ groupPeersString);
}

if (config.get(CoreOptions.RAFT_SAFE_READ)) {
@@ -140,9 +134,12 @@
} else {
this.readIndexExecutor = null;
}
- this.snapshotExecutor = this.createSnapshotExecutor(4);
- int backendThreads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
- this.backendExecutor = this.createBackendExecutor(backendThreads);
+
+ int threads = config.get(CoreOptions.RAFT_SNAPSHOT_THREADS);
+ this.snapshotExecutor = this.createSnapshotExecutor(threads);
+
+ threads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
+ this.backendExecutor = this.createBackendExecutor(threads);

this.raftNode = null;
this.raftGroupManager = null;
@@ -231,9 +228,9 @@ public NodeOptions nodeOptions() throws IOException {
nodeOptions.setRpcConnectTimeoutMs(
config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT));
nodeOptions.setRpcDefaultTimeout(
- config.get(CoreOptions.RAFT_RPC_TIMEOUT));
+ 1000 * config.get(CoreOptions.RAFT_RPC_TIMEOUT));
nodeOptions.setRpcInstallSnapshotTimeout(
- config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT));
+ 1000 * config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT));

int electionTimeout = config.get(CoreOptions.RAFT_ELECTION_TIMEOUT);
nodeOptions.setElectionTimeoutMs(electionTimeout);
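The two new 1000x factors above read as a seconds-to-milliseconds conversion: jraft's RPC timeout options are expressed in milliseconds, so the config values are presumably given in seconds (an inference from the diff, not stated in the commit). A minimal sketch of the arithmetic with made-up values:

public class TimeoutUnitSketch {
    public static void main(String[] args) {
        // Hypothetical values: suppose the config options are given in seconds.
        int rpcTimeoutSecs = 60;
        int installSnapshotTimeoutSecs = 300;

        // jraft expects milliseconds, hence the 1000x factor applied in the diff.
        int rpcDefaultTimeoutMs = 1000 * rpcTimeoutSecs;
        int rpcInstallSnapshotTimeoutMs = 1000 * installSnapshotTimeoutSecs;

        System.out.println(rpcDefaultTimeoutMs + " ms, " +
                           rpcInstallSnapshotTimeoutMs + " ms");
    }
}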
@@ -363,6 +360,8 @@ public GraphMode graphMode() {
private HugeConfig config() {
return this.params.configuration();
}

+ @SuppressWarnings("unused")
private RpcServer initAndStartRpcServer() {
Integer lowWaterMark = this.config().get(
CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
@@ -377,7 +376,7 @@ private RpcServer initAndStartRpcServer() {
NodeManager.getInstance().addAddress(endpoint.getEndpoint());
RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer(
endpoint.getEndpoint());
LOG.info("RPC server is started successfully");
LOG.info("Raft-RPC server is started successfully");
return rpcServer;
}

@@ -392,6 +391,7 @@ private void registerRpcRequestProcessors() {
this.rpcServer.registerProcessor(new SetLeaderProcessor(this));
this.rpcServer.registerProcessor(new ListPeersProcessor(this));
}

private ExecutorService createReadIndexExecutor(int coreThreads) {
int maxThreads = coreThreads << 2;
String name = "store-read-index-callback";
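For reference, the group-peers parsing in the constructor above relies on jraft's Configuration.parse(), which returns false on malformed input rather than throwing, so the caller must check the result itself. A standalone sketch with made-up peer addresses:

import com.alipay.sofa.jraft.conf.Configuration;

public class GroupPeersParseSketch {
    public static void main(String[] args) {
        // Hypothetical value of the `raft.group_peers` option.
        String groupPeersString = "127.0.0.1:8091,127.0.0.1:8092,127.0.0.1:8093";

        Configuration groupPeers = new Configuration();
        // parse() returns false instead of throwing on malformed input,
        // so the caller must check the result explicitly.
        if (!groupPeers.parse(groupPeersString)) {
            throw new IllegalArgumentException(
                      "Failed to parse raft.group_peers: '" + groupPeersString + "'");
        }
        System.out.println("parsed peers: " + groupPeers);
    }
}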
RaftNode.java
@@ -30,7 +30,6 @@

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
- import com.alipay.sofa.jraft.RaftServiceFactory;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.core.Replicator.ReplicatorStateListener;
@@ -49,7 +48,7 @@ public final class RaftNode {
private static final Logger LOG = Log.logger(RaftNode.class);

private final RaftContext context;
- private final RaftGroupService raftGroupService;
+ private RaftGroupService raftGroupService;
private final Node node;
private final StoreStateMachine stateMachine;
private final AtomicReference<LeaderInfo> leaderInfo;
@@ -60,9 +59,8 @@ public RaftNode(RaftContext context) {
this.context = context;
this.stateMachine = new StoreStateMachine(context);
try {
- this.raftGroupService = this.initRaftNode();
- // Start node
- this.node = this.raftGroupService.start(false);
+ // Start raft node
+ this.node = this.initRaftNode();
LOG.info("Start raft node: {}", this);
} catch (IOException e) {
throw new BackendException("Failed to init raft node", e);
@@ -102,6 +100,16 @@ public void onLeaderInfoChange(PeerId leaderId, boolean selfIsLeader) {
public void shutdown() {
LOG.info("Shutdown raft node: {}", this);
this.node.shutdown();

+ if (this.raftGroupService != null) {
+ this.raftGroupService.shutdown();
+ try {
+ this.raftGroupService.join();
+ } catch (final InterruptedException e) {
+ throw new RaftException(
+ "Interrupted while shutdown raftGroupService");
+ }
+ }
}

public RaftClosure<?> snapshot() {
Expand All @@ -114,36 +122,27 @@ public RaftClosure<?> snapshot() {
}
}

- private RaftGroupService initRaftNode() throws IOException {
+ private Node initRaftNode() throws IOException {
NodeOptions nodeOptions = this.context.nodeOptions();
nodeOptions.setFsm(this.stateMachine);
/*
* TODO: the groupId is same as graph name now, when support sharding,
- * groupId needs to be bound to shard Id
+ * groupId needs to be bound to shard Id
*/
String groupId = this.context.group();
PeerId endpoint = this.context.endpoint();
/*
- * Start raft node with shared rpc server:
+ * Create RaftGroupService with shared rpc-server, then start raft node
+ * TODO: don't create + hold RaftGroupService and just share rpc-server
+ * and create Node by RaftServiceFactory.createAndInitRaftNode()
*/
RpcServer rpcServer = this.context.rpcServer();
LOG.info("The raft endpoint '{}', initial group peers [{}]",
endpoint, nodeOptions.getInitialConf());
// Shared rpc server
return new RaftGroupService(groupId, endpoint, nodeOptions,
rpcServer, true);
}

- public void close() {
- if (this.raftGroupService != null) {
- this.raftGroupService.shutdown();
- try {
- this.raftGroupService.join();
- } catch (final InterruptedException e) {
- throw new RaftException("Interrupted while shutdown " +
- "raftGroupService");
- }
- }
LOG.debug("Start raft node with endpoint '{}', initial conf [{}]",
endpoint, nodeOptions.getInitialConf());
this.raftGroupService = new RaftGroupService(groupId, endpoint,
nodeOptions,
rpcServer, true);
return this.raftGroupService.start(false);
}

private void submitCommand(StoreCommand command, RaftStoreClosure closure) {
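The shutdown() addition above follows the usual jraft RaftGroupService lifecycle: shutdown() starts the stop and join() blocks until it completes. A minimal sketch of the pattern (the exception handling here restores the interrupt flag and uses a generic exception; the real code throws HugeGraph's RaftException):

import com.alipay.sofa.jraft.RaftGroupService;

public class RaftShutdownSketch {
    private RaftGroupService raftGroupService;  // assigned when the node starts

    public void shutdown() {
        if (this.raftGroupService != null) {
            this.raftGroupService.shutdown();   // begin stopping the group service
            try {
                this.raftGroupService.join();   // wait for shutdown to finish
            } catch (InterruptedException e) {
                // Restore the interrupt flag before surfacing the failure.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                          "Interrupted while shutting down raftGroupService", e);
            }
        }
    }
}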
(The remaining 2 changed files in this commit were not loaded in this view.)
