Skip to content

Commit

Permalink
add pagination for nodes and channels rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Sep 24, 2024
1 parent c342ab1 commit 048e60b
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 71 deletions.
20 changes: 17 additions & 3 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,20 @@ where
self.nodes.values()
}

pub fn query_nodes(
pub fn get_nodes_with_params(
&self,
limit: usize,
after: Option<JsonBytes>,
) -> (Vec<NodeInfo>, JsonBytes) {
self.store.get_nodes_with_query(limit, after, None)
self.store.get_nodes_with_params(limit, after, None)
}

pub fn get_channels_with_params(
&self,
limit: usize,
after: Option<JsonBytes>,
) -> (Vec<ChannelInfo>, JsonBytes) {
self.store.get_channels_with_params(limit, after, None)
}

pub fn get_node(&self, node_id: Pubkey) -> Option<&NodeInfo> {
Expand Down Expand Up @@ -745,12 +753,18 @@ where
pub trait NetworkGraphStateStore {
fn get_channels(&self, outpoint: Option<OutPoint>) -> Vec<ChannelInfo>;
fn get_nodes(&self, peer_id: Option<Pubkey>) -> Vec<NodeInfo>;
fn get_nodes_with_query(
fn get_nodes_with_params(
&self,
limit: usize,
after: Option<JsonBytes>,
node_id: Option<Pubkey>,
) -> (Vec<NodeInfo>, JsonBytes);
fn get_channels_with_params(
&self,
limit: usize,
after: Option<JsonBytes>,
outpoint: Option<OutPoint>,
) -> (Vec<ChannelInfo>, JsonBytes);
fn insert_channel(&self, channel: ChannelInfo);
fn insert_node(&self, node: NodeInfo);
fn insert_connected_peer(&self, peer_id: PeerId, multiaddr: Multiaddr);
Expand Down
22 changes: 16 additions & 6 deletions src/fiber/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::fiber::graph::{ChannelInfo, NetworkGraph, NodeInfo};
use crate::fiber::types::Pubkey;
use crate::invoice::{CkbInvoice, InvoiceError, InvoiceStore};
use ckb_jsonrpc_types::JsonBytes;
use ckb_types::packed::OutPoint;
use ckb_types::{core::TransactionView, packed::Byte32};
use ractor::{Actor, ActorRef};
Expand Down Expand Up @@ -340,13 +341,22 @@ impl NetworkGraphStateStore for MemoryStore {
}
}

fn get_nodes_with_query(
fn get_nodes_with_params(
&self,
_limit: usize,
_after: Option<ckb_jsonrpc_types::JsonBytes>,
_node_id: Option<Pubkey>,
) -> (Vec<NodeInfo>, ckb_jsonrpc_types::JsonBytes) {
unimplemented!("get_nodes_with_query currently not used in mock store");
limit: usize,
after: Option<JsonBytes>,
node_id: Option<Pubkey>,
) -> (Vec<NodeInfo>, JsonBytes) {
unimplemented!("currently not used in mock store");
}

fn get_channels_with_params(
&self,
limit: usize,
after: Option<JsonBytes>,
outpoint: Option<OutPoint>,
) -> (Vec<ChannelInfo>, JsonBytes) {
unimplemented!("currently not used in mock store");
}

fn insert_node(&self, node: NodeInfo) {
Expand Down
36 changes: 23 additions & 13 deletions src/rpc/graph.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::fiber::graph::{NetworkGraph, NetworkGraphStateStore};
use crate::fiber::serde_utils::EntityHex;
use crate::fiber::types::{Hash256, Pubkey};
use crate::fiber::{
config::AnnouncedNodeName,
graph::{NetworkGraph, NetworkGraphStateStore},
};
use ckb_jsonrpc_types::JsonBytes;
use ckb_types::packed::OutPoint;
use jsonrpsee::{core::async_trait, proc_macros::rpc, types::ErrorObjectOwned};
Expand All @@ -15,13 +12,13 @@ use tokio::sync::RwLock;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GraphNodesParams {
limit: usize,
limit: Option<usize>,
after: Option<JsonBytes>,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct NodeInfo {
pub alias: AnnouncedNodeName,
pub alias: String,
pub addresses: Vec<MultiAddr>,
pub node_id: Pubkey,
pub timestamp: u64,
Expand All @@ -35,7 +32,10 @@ pub struct GraphNodesResult {
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GraphChannelsParams {}
pub struct GraphChannelsParams {
limit: Option<usize>,
after: Option<JsonBytes>,
}

#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
Expand All @@ -57,6 +57,7 @@ pub struct ChannelInfo {
#[derive(Serialize, Deserialize, Clone)]
pub struct GraphChannelsResult {
pub channels: Vec<ChannelInfo>,
pub last_cursor: JsonBytes,
}

#[rpc(server)]
Expand Down Expand Up @@ -104,12 +105,14 @@ where
params: GraphNodesParams,
) -> Result<GraphNodesResult, ErrorObjectOwned> {
let network_graph = self.network_graph.read().await;
let (nodes, last_cursor) = network_graph.query_nodes(params.limit, params.after);
let default_max_limit = 500;
let (nodes, last_cursor) = network_graph
.get_nodes_with_params(params.limit.unwrap_or(default_max_limit), params.after);

let nodes = nodes
.iter()
.map(|node_info| NodeInfo {
alias: node_info.anouncement_msg.alias,
alias: node_info.anouncement_msg.alias.as_str().to_string(),
addresses: node_info.anouncement_msg.addresses.clone(),
node_id: node_info.node_id,
timestamp: node_info.timestamp,
Expand All @@ -121,12 +124,16 @@ where

async fn graph_channels(
&self,
_params: GraphChannelsParams,
params: GraphChannelsParams,
) -> Result<GraphChannelsResult, ErrorObjectOwned> {
let default_max_limit = 500;
let network_graph = self.network_graph.read().await;
let chain_hash = network_graph.chain_hash();
let channels = network_graph
.channels()
let (channels, last_cursor) = network_graph
.get_channels_with_params(params.limit.unwrap_or(default_max_limit), params.after);

let channels = channels
.iter()
.map(|channel_info| ChannelInfo {
channel_outpoint: channel_info.out_point(),
funding_tx_block_number: channel_info.funding_tx_block_number,
Expand All @@ -141,6 +148,9 @@ where
chain_hash,
})
.collect();
Ok(GraphChannelsResult { channels })
Ok(GraphChannelsResult {
channels,
last_cursor,
})
}
}
73 changes: 53 additions & 20 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
use ckb_jsonrpc_types::JsonBytes;
use ckb_types::packed::{OutPoint, Script};
use ckb_types::prelude::Entity;
use rocksdb::{prelude::*, DBIterator, IteratorMode, WriteBatch, DB};
use rocksdb::{prelude::*, DBIterator, Direction, IteratorMode, WriteBatch, DB};
use serde_json;
use std::{path::Path, sync::Arc};
use tentacle::{multiaddr::Multiaddr, secio::PeerId};
Expand Down Expand Up @@ -347,41 +347,73 @@ impl InvoiceStore for Store {

impl NetworkGraphStateStore for Store {
fn get_channels(&self, channel_id: Option<OutPoint>) -> Vec<ChannelInfo> {
let key = match channel_id.clone() {
Some(channel_id) => {
let (channels, _) = self.get_channels_with_params(usize::MAX, None, channel_id);
channels
}

fn get_channels_with_params(
&self,
limit: usize,
after: Option<JsonBytes>,
outpoint: Option<OutPoint>,
) -> (Vec<ChannelInfo>, JsonBytes) {
let channel_prefix = vec![CHANNEL_INFO_PREFIX];
let (prefix, skip) = after
.clone()
.map(|after| {
let mut key = Vec::with_capacity(37);
key.extend_from_slice(&[CHANNEL_INFO_PREFIX]);
key.extend_from_slice(channel_id.as_slice());
key
}
None => vec![CHANNEL_INFO_PREFIX],
};
key.extend_from_slice(after.as_bytes());
(key, 1)
})
.unwrap_or((vec![CHANNEL_INFO_PREFIX], 0));
let outpoint_key = outpoint.map(|outpoint| {
let mut key = Vec::with_capacity(37);
key.extend_from_slice(&[CHANNEL_INFO_PREFIX]);
key.extend_from_slice(outpoint.as_slice());
key
});

let mode = IteratorMode::From(prefix.as_ref(), Direction::Forward);
let iter = self
.db
.prefix_iterator(key.as_ref())
.take_while(|(col_key, _)| col_key.starts_with(&key));
iter.map(|(_key, value)| {
serde_json::from_slice(value.as_ref()).expect("deserialize ChannelInfo should be OK")
})
.collect()
.iterator(mode)
.take_while(|(key, _)| key.starts_with(&channel_prefix))
.filter_map(|(col_key, value)| {
if let Some(key) = &outpoint_key {
if !col_key.starts_with(&key) {
return None;
}
}
Some((col_key, value))
})
.skip(skip)
.take(limit);
let mut last_key = Vec::new();
let channels = iter
.map(|(col_key, value)| {
last_key = col_key.to_vec();
serde_json::from_slice(value.as_ref()).expect("deserialize NodeInfo should be OK")
})
.collect();
(channels, JsonBytes::from_bytes(last_key.into()))
}

fn get_nodes(&self, node_id: Option<Pubkey>) -> Vec<NodeInfo> {
let (nodes, _) = self.get_nodes_with_query(usize::MAX, None, node_id);
let (nodes, _) = self.get_nodes_with_params(usize::MAX, None, node_id);
nodes
}

fn get_nodes_with_query(
fn get_nodes_with_params(
&self,
limit: usize,
after: Option<JsonBytes>,
node_id: Option<Pubkey>,
) -> (Vec<NodeInfo>, JsonBytes) {
let node_prefix = vec![NODE_INFO_PREFIX];
let (prefix, skip) = after
.clone()
.map(|after| {
let mut key = Vec::with_capacity(34);
key.push(NODE_INFO_PREFIX);
key.extend_from_slice(after.as_bytes());
(key, 1)
})
Expand All @@ -392,10 +424,11 @@ impl NetworkGraphStateStore for Store {
key.extend_from_slice(node_id.serialize().as_ref());
key
});
let mode = IteratorMode::From(prefix.as_ref(), Direction::Forward);
let iter = self
.db
.prefix_iterator(prefix.as_ref())
.take_while(|(key, _)| key.starts_with(&prefix))
.iterator(mode)
.take_while(|(key, _)| key.starts_with(&node_prefix))
.filter_map(|(col_key, value)| {
if let Some(key) = &node_key {
if !col_key.starts_with(&key) {
Expand Down
5 changes: 1 addition & 4 deletions tests/bruno/e2e/router-pay/16-node1-get-nodes.bru
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ body:json {
"id": "42",
"jsonrpc": "2.0",
"method": "graph_nodes",
"params": [
{
}
]
"params": [{}]
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/bruno/e2e/router-pay/17-node1-get-channels.bru
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ script:post-response {
// Sleep for sometime to make sure current operation finishes before next request starts.
await new Promise(r => setTimeout(r, 100));
console.log("get result: ", res.body.result);
if (res.body.result.channels.length != 4) {
if (res.body.result.channels.length != 2) {
throw new Error("Assertion failed: channels number is not right");
}
}
46 changes: 46 additions & 0 deletions tests/bruno/e2e/router-pay/18-node1-get-nodes-page.bru
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
meta {
name: Node1 send get_nodes rpc request
type: http
seq: 18
}

post {
url: {{NODE1_RPC_URL}}
body: json
auth: none
}

headers {
Content-Type: application/json
Accept: application/json
}

body:json {
{
"id": "42",
"jsonrpc": "2.0",
"method": "graph_nodes",
"params": [{
"limit": 2
}]
}
}

assert {
res.body.error: isUndefined
}

script:pre-request {
// sleep for a while
await new Promise(r => setTimeout(r, 1000));
}

script:post-response {
// Sleep for sometime to make sure current operation finishes before next request starts.
await new Promise(r => setTimeout(r, 100));
console.log("get result: ", res.body.result);
if (res.body.result.nodes.length != 2) {
throw new Error("Assertion failed: nodes number is not right");
}
bru.setVar("last_cursor", res.body.result.last_cursor);
}
Loading

0 comments on commit 048e60b

Please sign in to comment.