Skip to content

Commit

Permalink
Add JSON-RPC method: peersInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
leeyr committed Apr 4, 2019
1 parent 5d4f580 commit e76f5c4
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 36 deletions.
64 changes: 33 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions cita-chain/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,16 +352,17 @@ impl Forward {
error!("Get messages which should not handle by this function!");
}

Request::peercount(_) | Request::un_tx(_) | Request::software_version(_) => {
error!("Get messages which should sent to other micro services!");
}
Request::storage_key(skey) => {
trace!("storage key info is {:?}", skey);
self.ctx_pub
.send((routing_key!(Chain >> Request).into(), imsg))
.unwrap();
return;
}

_ => {
error!("Get messages which should sent to other micro services!");
}
};
let msg: Message = response.into();
self.ctx_pub
Expand Down
1 change: 1 addition & 0 deletions cita-jsonrpc/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub type ReqSender = Mutex<Sender<(String, ProtoRequest)>>;
pub fn select_topic(method: &str) -> String {
match method {
"peerCount" => routing_key!(Jsonrpc >> RequestNet).into(),
"peersInfo" => routing_key!(Jsonrpc >> RequestPeersInfo).into(),
"sendRawTransaction" | "sendTransaction" => routing_key!(Jsonrpc >> RequestNewTx).into(),
"getVersion" => routing_key!(Jsonrpc >> RequestRpc).into(),
_ => routing_key!(Jsonrpc >> Request).into(),
Expand Down
1 change: 1 addition & 0 deletions cita-jsonrpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
//! | jsonrpc | Jsonrpc | Auth | RequestNewTxBatch |
//! | jsonrpc | Jsonrpc | Chain | Request |
//! | jsonrpc | Jsonrpc | Net | RequestNet |
//! | jsonrpc | jsonrpc | Net | RequestPeersInfo |
//!
//! ### Key behavior
//!
Expand Down
2 changes: 2 additions & 0 deletions cita-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ util = { git = "https://github.com/cryptape/cita-common.git", branch = "develop"
libproto = { git = "https://github.com/cryptape/cita-common.git", branch = "develop" }
pubsub = { git = "https://github.com/cryptape/cita-common.git", branch = "develop" }
logger = { git = "https://github.com/cryptape/cita-common.git", branch = "develop" }
jsonrpc-types = { git = "https://github.com/cryptape/cita-common.git", branch = "develop" }
serde = "1.0.84"
serde_json = "1.0"
serde_derive = "1.0.84"
clap = "2.32"
bytes = "0.4"
Expand Down
3 changes: 2 additions & 1 deletion cita-network/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
//! | network_consensus | Consensus | RawBytes |
//! | network | Chain | Status |
//! | network | Chain | SyncResponse |
//! | network | Jonsonrpc | RequestNet |
//! | network | Jsonrpc | RequestNet |
//! | network | Jsonrpc | RequestPeersInfo |
//! | network | Auth | GetBlockTxn |
//! | network | Auth | BlockTxn |
//!
Expand Down
1 change: 1 addition & 0 deletions cita-network/src/mq_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl MqAgent {
Chain >> Status,
Chain >> SyncResponse,
Jsonrpc >> RequestNet,
Jsonrpc >> RequestPeersInfo,
Snapshot >> SnapshotReq
]),
ctx_sub_other_modules,
Expand Down
55 changes: 54 additions & 1 deletion cita-network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use crate::mq_agent::{MqAgentClient, PubMessage};
use crate::node_manager::{BroadcastReq, GetPeerCountReq, NodesManagerClient, SingleTxReq};
use crate::node_manager::{
BroadcastReq, GetPeerCountReq, GetPeersInfoReq, NodesManagerClient, SingleTxReq,
};
use crate::synchronizer::{SynchronizerClient, SynchronizerMessage};
use jsonrpc_types::rpc_types::PeersInfo;
use jsonrpc_types::ErrorCode;
use libproto::router::{MsgType, RoutingKey, SubModules};
use libproto::routing_key;
use libproto::snapshot::{Cmd, Resp, SnapshotResp};
use libproto::{Message as ProtoMessage, OperateType, Response};
use libproto::{TryFrom, TryInto};
use logger::{error, info, trace, warn};
use pubsub::channel::{unbounded, Receiver, Sender};
use serde_json;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -143,6 +148,9 @@ impl LocalMessage {
routing_key!(Jsonrpc >> RequestNet) => {
self.reply_rpc(&self.data, service);
}
routing_key!(Jsonrpc >> RequestPeersInfo) => {
self.reply_peers_info(&self.data, service);
}
routing_key!(Snapshot >> SnapshotReq) => {
info!("[Network] Set disconnect and response");
self.snapshot_req(&self.data, service);
Expand Down Expand Up @@ -185,6 +193,51 @@ impl LocalMessage {
}
}

fn reply_peers_info(&self, data: &[u8], service: &mut Network) {
let mut msg = ProtoMessage::try_from(data).unwrap();

let req_opt = msg.take_request();
{
if let Some(mut req) = req_opt {
// Get peer count and send back to JsonRpc from MQ
if req.has_peers_info() {
let mut response = Response::new();
response.set_request_id(req.take_request_id());

let (tx, rx) = unbounded();
service
.nodes_mgr_client
.get_peers_info(GetPeersInfoReq::new(tx));

// Get peers from rx channel
// FIXME: This is a block receive, double check about this
let peers = rx.recv().unwrap();

let peers_info = PeersInfo {
amount: peers.len() as u32,
peers: Some(peers),
error_message: None,
};

if let Ok(json_peers_info) = serde_json::to_value(peers_info) {
response.set_peers_info(json_peers_info.to_string());
} else {
response.set_code(ErrorCode::InternalError.code());
response.set_error_msg(ErrorCode::InternalError.description());
}

let msg: ProtoMessage = response.into();
service.mq_client.send_peer_count(PubMessage::new(
routing_key!(Net >> Response).into(),
msg.try_into().unwrap(),
));
}
} else {
warn!("[Network] Receive unexpected get peers info data");
}
}
}

fn snapshot_req(&self, data: &[u8], service: &mut Network) {
let mut msg = ProtoMessage::try_from(data).unwrap();
let req = msg.take_snapshot_req().unwrap();
Expand Down
37 changes: 37 additions & 0 deletions cita-network/src/node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ impl NodesManagerClient {
self.send_req(NodesManagerMessage::GetPeerCount(req));
}

pub fn get_peers_info(&self, req: GetPeersInfoReq) {
self.send_req(NodesManagerMessage::GetPeersInfo(req));
}

pub fn network_init(&self, req: NetworkInitReq) {
self.send_req(NodesManagerMessage::NetworkInit(req));
}
Expand Down Expand Up @@ -416,6 +420,7 @@ pub enum NodesManagerMessage {
AddConnectedNode(AddConnectedNodeReq),
AddRepeatedNode(AddRepeatedNodeReq),
ConnectedSelf(ConnectedSelfReq),
GetPeersInfo(GetPeersInfoReq),
}

impl NodesManagerMessage {
Expand All @@ -433,6 +438,7 @@ impl NodesManagerMessage {
NodesManagerMessage::AddConnectedNode(req) => req.handle(service),
NodesManagerMessage::AddRepeatedNode(req) => req.handle(service),
NodesManagerMessage::ConnectedSelf(req) => req.handle(service),
NodesManagerMessage::GetPeersInfo(req) => req.handle(service),
}
}
}
Expand Down Expand Up @@ -885,6 +891,37 @@ impl GetPeerCountReq {
}
}

pub struct GetPeersInfoReq {
return_channel: Sender<HashMap<Address, String>>,
}

impl GetPeersInfoReq {
pub fn new(return_channel: Sender<HashMap<Address, String>>) -> Self {
GetPeersInfoReq { return_channel }
}

pub fn handle(self, service: &mut NodesManager) {
let mut peers = HashMap::default();

for (key, value) in service.connected_peer_keys.iter() {
if let Some(socket_addr) = service.connected_addrs.get(&value) {
peers.insert(key.clone(), socket_addr.ip().to_string());
} else {
warn!(
"[NodeManager] Can not get socket address for session {} from connected_addr. It must be something wrong!",
value
);
}
}

debug!("[NodeManager] get peers info : {:?}", peers);

if let Err(e) = self.return_channel.try_send(peers) {
warn!("[NodeManager] Send peers info failed : {:?}", e);
}
}
}

pub struct ConnectedSelfReq {
addr: SocketAddr,
}
Expand Down

0 comments on commit e76f5c4

Please sign in to comment.