Skip to content

Commit

Permalink
address some comments
Browse files Browse the repository at this point in the history
Change-Id: Ic27656f2fb8793d02baf40e720b609b4a6fb9a12
  • Loading branch information
Linary committed May 15, 2022
1 parent 435edfe commit c49999a
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,30 +108,16 @@ public GraphManager(HugeConfig conf, EventHub hub) {
// Raft will load snapshot firstly then launch election and replay log
this.startRpcServer();

initAllSystemSchema();
this.initAllSystemSchema();

ServerConfig serverConfig = Whitebox.getInternalState(this.rpcServer,
"serverConfig");
com.alipay.remoting.rpc.RpcServer remotingRpcServer;
remotingRpcServer = Whitebox.getInternalState(serverConfig.getServer(),
"remotingServer");
remotingRpcServer = this.remotingRpcServer();
this.waitGraphsReady(remotingRpcServer);

this.checkBackendVersionOrExit(conf);

this.serverStarted(conf);
this.addMetrics(conf);
}

public {
//
register(-1, ~task, xx, xx, xx,);






}

public void loadGraphs(HugeConfig serverConfig) {
Expand Down Expand Up @@ -302,28 +288,12 @@ private void loadGraph(HugeConfig serverConfig, String name, String path) {
}
}

// private com.alipay.sofa.jraft.rpc.RpcServer startRaftRpcServer(HugeConfig
// config) {
// Integer lowWaterMark = config.get(
// CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
// System.setProperty("bolt.channel_write_buf_low_water_mark",
// String.valueOf(lowWaterMark));
// Integer highWaterMark = config.get(
// CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
// System.setProperty("bolt.channel_write_buf_high_water_mark",
// String.valueOf(highWaterMark));
//
// PeerId endpoint = new PeerId();
// String endpointStr = config.get(ServerOptions.RAFT_ENDPOINT);
// if (!endpoint.parse(endpointStr)) {
// throw new HugeException("Failed to parse endpoint %s", endpointStr);
// }
// com.alipay.sofa.jraft.rpc.RpcServer rpcServer;
// rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer(
// endpoint.getEndpoint());
// LOG.info("Raft RPC server is started successfully");
// return rpcServer;
// }
private com.alipay.remoting.rpc.RpcServer remotingRpcServer() {
ServerConfig serverConfig = Whitebox.getInternalState(this.rpcServer,
"serverConfig");
return Whitebox.getInternalState(serverConfig.getServer(),
"remotingServer");
}

private void startRpcServer() {
if (!this.rpcServer.enabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.backend.BackendException;
Expand Down Expand Up @@ -75,6 +74,7 @@ public void initRaftContext(HugeGraphParams params, RpcServer rpcServer) {
System.setProperty("bolt.channel_write_buf_high_water_mark",
String.valueOf(highWaterMark));

// TODO: pass ServerOptions object to core context
// PeerId endpoint = new PeerId();
// String endpointStr = config.get(ServerOptions.RAFT_ENDPOINT);
// if (!endpoint.parse(endpointStr)) {
Expand All @@ -89,8 +89,6 @@ public void initRaftContext(HugeGraphParams params, RpcServer rpcServer) {

PeerId endpoint = new PeerId(rpcServer.ip(), rpcServer.port());
this.context = new RaftContext(params, raftRpcServer, endpoint);

//
this.context.addStore(StoreType.SYSTEM, this.systemStore);
}

Expand Down Expand Up @@ -236,9 +234,7 @@ public void initSystemInfo(HugeGraph graph) {
BackendStoreSystemInfo info = graph.backendStoreSystemInfo();
info.init();

// 创建系统schema,保存到pool里面
init();
// this.notifyAndWaitEvent(Events.STORE_INITED);
this.init();
LOG.debug("Graph '{}' system info has been initialized", this.graph());
/*
* Take the initiative to generate a snapshot, it can avoid this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,15 @@ public RaftContext(HugeGraphParams params, RpcServer rpcServer,
this.systemStoreName = config.get(CoreOptions.STORE_SYSTEM);
this.stores = new RaftBackendStore[StoreType.ALL.getNumber()];

// // TODO: 依赖了ServerOptions的配置项名,需要打通ServerConfig和CoreConfig
// this.endpoint = new PeerId();
// String endpointStr = config.getString("raft.endpoint");
// if (!this.endpoint.parse(endpointStr)) {
// throw new HugeException("Failed to parse endpoint %s", endpointStr);
// }
/*
* TODO Depending on the name of the config item for server options,
* need to get through ServerConfig and CoreConfig
*/
// this.endpoint = new PeerId();
// String endpointStr = config.getString("raft.endpoint");
// if (!this.endpoint.parse(endpointStr)) {
// throw new HugeException("Failed to parse endpoint %s", endpointStr);
// }
this.groupPeers = new Configuration();
String groupPeersStr = config.getString("raft.group_peers");
if (!this.groupPeers.parse(groupPeersStr)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void close() {
private void submitCommand(StoreCommand command, RaftStoreClosure closure) {
// Wait leader elected
LeaderInfo leaderInfo = this.waitLeaderElected(
RaftContext.WAIT_LEADER_TIMEOUT);
RaftContext.WAIT_LEADER_TIMEOUT);
if (!leaderInfo.selfIsLeader) {
this.context.rpcForwarder().forwardToLeader(leaderInfo.leaderId,
command, closure);
Expand Down

0 comments on commit c49999a

Please sign in to comment.