Skip to content

Commit

Permalink
feat(rpc-server): get in queue requests
Browse files Browse the repository at this point in the history
Now on a full node the `gw_get_transaction`/`gw_get_withdrawal` RPC
calls will return status: pending instead of null for in queue
transactions/withdrawals.

Also added RPC method `gw_is_request_in_queue`, which returns whether a
transaction or withdrawal is in queue.
  • Loading branch information
blckngm committed Jun 14, 2022
1 parent 6905a78 commit 3cbbdbc
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 111 deletions.
5 changes: 0 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/benches/benches/benchmarks/fee_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn bench_add_full(b: &mut Bencher) {
sender: 2,
order: queue.len(),
};
queue.add(entry1);
queue.add(entry1, ());
}

assert_eq!(queue.len(), MAX_QUEUE_SIZE);
Expand All @@ -71,7 +71,7 @@ fn bench_add_full(b: &mut Bencher) {
sender: 2,
order: queue.len(),
};
queue.add(entry1);
queue.add(entry1, ());
});
}

Expand Down Expand Up @@ -107,7 +107,7 @@ fn bench_add_fetch_20(b: &mut Bencher) {
sender: 2,
order: queue.len(),
};
queue.add(entry1);
queue.add(entry1, ());
}

let mem_store = MemStore::new(snap);
Expand All @@ -130,7 +130,7 @@ fn bench_add_fetch_20(b: &mut Bencher) {
sender: 2,
order: queue.len(),
};
queue.add(entry1);
queue.add(entry1, ());
}
queue.fetch(&tree, 20)
});
Expand Down
87 changes: 43 additions & 44 deletions crates/mem-pool/src/fee/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use gw_common::state::State;
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeMap, HashMap};
use tracing::instrument;

/// Max queue size
Expand All @@ -11,16 +11,16 @@ const DROP_SIZE: usize = 100;
use super::types::FeeEntry;

/// Txs & withdrawals queue sorted by fee rate
pub struct FeeQueue {
pub struct FeeQueue<T> {
// priority queue to store tx and withdrawal
queue: BTreeSet<FeeEntry>,
queue: BTreeMap<FeeEntry, T>,
}

impl FeeQueue {
impl<T> FeeQueue<T> {
#[inline]
pub fn new() -> Self {
Self {
queue: BTreeSet::new(),
queue: BTreeMap::new(),
}
}

Expand All @@ -34,21 +34,20 @@ impl FeeQueue {
self.queue.is_empty()
}

/// Add item to queue
#[instrument(skip_all, fields(count = self.len()))]
pub fn add(&mut self, entry: FeeEntry) {
pub fn add(&mut self, entry: FeeEntry, handle: T) {
// push to queue
log::debug!(
"QueueLen: {} | add entry: {:?} {}",
self.len(),
entry.item.kind(),
hex::encode(entry.item.hash().as_slice())
);
self.queue.insert(entry);
self.queue.insert(entry, handle);

// drop items if full
if self.is_full() {
if let Some(first_to_keep) = self.queue.iter().nth(DROP_SIZE + 1).cloned() {
if let Some(first_to_keep) = self.queue.keys().nth(DROP_SIZE + 1).cloned() {
self.queue = self.queue.split_off(&first_to_keep);
}
log::debug!(
Expand All @@ -64,25 +63,25 @@ impl FeeQueue {
self.queue.len() > MAX_QUEUE_SIZE
}

fn pop_last(&mut self) -> Option<FeeEntry> {
if let Some(entry) = self.queue.iter().next_back().cloned() {
self.queue.take(&entry)
fn pop_last(&mut self) -> Option<(FeeEntry, T)> {
if let Some(entry) = self.queue.keys().next_back().cloned() {
self.queue.remove_entry(&entry)
} else {
None
}
}

/// Fetch items by fee sort
#[instrument(skip_all, fields(count = count))]
pub fn fetch(&mut self, state: &impl State, count: usize) -> Result<Vec<FeeEntry>> {
pub fn fetch(&mut self, state: &impl State, count: usize) -> Result<Vec<(FeeEntry, T)>> {
// sorted fee items
let mut fetched_items = Vec::with_capacity(count as usize);
let mut fetched_senders: HashMap<u32, u32> = Default::default();
// future items, we will push back this queue
let mut future_queue = Vec::default();

// Fetch item from PQ
while let Some(entry) = self.pop_last() {
while let Some((entry, t)) = self.pop_last() {
let nonce = match fetched_senders.get(&entry.sender) {
Some(&nonce) => nonce,
None => state.get_nonce(entry.sender)?,
Expand All @@ -92,11 +91,11 @@ impl FeeQueue {
// update nonce
fetched_senders.insert(entry.sender, nonce.saturating_add(1));
// fetch this item
fetched_items.push(entry);
fetched_items.push((entry, t));
}
std::cmp::Ordering::Greater => {
// push item back if it still has change to get fetched
future_queue.push(entry);
future_queue.push((entry, t));
}
_ => {
log::debug!(
Expand All @@ -116,10 +115,10 @@ impl FeeQueue {
}

// Add back future items
for entry in future_queue {
for (entry, t) in future_queue {
// Only add back if we fetched another item from the same sender
if fetched_senders.contains_key(&entry.sender) {
self.add(entry);
self.add(entry, t);
} else {
log::debug!(
"QueueLen: {} | drop future entry: {:?} {} entry_nonce {}",
Expand All @@ -142,7 +141,7 @@ impl FeeQueue {
}
}

impl Default for FeeQueue {
impl<T> Default for FeeQueue<T> {
#[inline]
fn default() -> Self {
Self::new()
Expand Down Expand Up @@ -226,10 +225,10 @@ mod tests {
order: queue.len(),
};

queue.add(entry1);
queue.add(entry2);
queue.add(entry3);
queue.add(entry4);
queue.add(entry1, ());
queue.add(entry2, ());
queue.add(entry3, ());
queue.add(entry4, ());

let mem_store = MemStore::new(snap);
let tree = mem_store.state().unwrap();
Expand All @@ -238,15 +237,15 @@ mod tests {
{
let items = queue.fetch(&tree, 3).expect("fetch");
assert_eq!(items.len(), 3);
assert_eq!(items[0].sender, 3);
assert_eq!(items[1].sender, 5);
assert_eq!(items[2].sender, 2);
assert_eq!(items[0].0.sender, 3);
assert_eq!(items[1].0.sender, 5);
assert_eq!(items[2].0.sender, 2);
}
// fetch 3
{
let items = queue.fetch(&tree, 3).expect("fetch");
assert_eq!(items.len(), 1);
assert_eq!(items[0].sender, 4);
assert_eq!(items[0].0.sender, 4);
}
// fetch 3
{
Expand Down Expand Up @@ -284,7 +283,7 @@ mod tests {
order: queue.len(),
};

queue.add(entry1);
queue.add(entry1, ());

let entry2 = FeeEntry {
item: FeeItem::Tx(Default::default()),
Expand All @@ -294,7 +293,7 @@ mod tests {
order: queue.len(),
};

queue.add(entry2);
queue.add(entry2, ());

let entry3 = FeeEntry {
item: FeeItem::Tx(Default::default()),
Expand All @@ -304,7 +303,7 @@ mod tests {
order: queue.len(),
};

queue.add(entry3);
queue.add(entry3, ());

let entry4 = FeeEntry {
item: FeeItem::Withdrawal(Default::default()),
Expand All @@ -314,7 +313,7 @@ mod tests {
order: queue.len(),
};

queue.add(entry4);
queue.add(entry4, ());

let mem_store = MemStore::new(snap);
let tree = mem_store.state().unwrap();
Expand All @@ -323,10 +322,10 @@ mod tests {
{
let items = queue.fetch(&tree, 5).expect("fetch");
assert_eq!(items.len(), 4);
assert_eq!(items[0].sender, 5);
assert_eq!(items[1].sender, 2);
assert_eq!(items[2].sender, 3);
assert_eq!(items[3].sender, 4);
assert_eq!(items[0].0.sender, 5);
assert_eq!(items[1].0.sender, 2);
assert_eq!(items[2].0.sender, 3);
assert_eq!(items[3].0.sender, 4);
}
}

Expand Down Expand Up @@ -374,8 +373,8 @@ mod tests {
order: queue.len(),
};

queue.add(entry1);
queue.add(entry2);
queue.add(entry1, ());
queue.add(entry2, ());

let snap = store.get_snapshot();
let mem_store = MemStore::new(snap);
Expand All @@ -385,8 +384,8 @@ mod tests {
{
let items = queue.fetch(&tree, 3).expect("fetch");
assert_eq!(items.len(), 2);
assert_eq!(items[0].item.nonce(), 0);
assert_eq!(items[1].item.nonce(), 1);
assert_eq!(items[0].0.item.nonce(), 0);
assert_eq!(items[1].0.item.nonce(), 1);
}
}
#[test]
Expand Down Expand Up @@ -433,8 +432,8 @@ mod tests {
order: queue.len(),
};

queue.add(entry1);
queue.add(entry2);
queue.add(entry1, ());
queue.add(entry2, ());

let snap = store.get_snapshot();
let mem_store = MemStore::new(snap);
Expand All @@ -444,7 +443,7 @@ mod tests {
{
let items = queue.fetch(&tree, 3).expect("fetch");
assert_eq!(items.len(), 1);
assert_eq!(items[0].fee, (101 * 1000u64).into());
assert_eq!(items[0].0.fee, (101 * 1000u64).into());
// try fetch remain items
let items = queue.fetch(&tree, 1).expect("fetch");
assert_eq!(items.len(), 0);
Expand Down Expand Up @@ -483,7 +482,7 @@ mod tests {
sender: 2,
order: queue.len(),
};
queue.add(entry1);
queue.add(entry1, ());
}

assert_eq!(queue.len(), MAX_QUEUE_SIZE);
Expand All @@ -505,7 +504,7 @@ mod tests {
sender: 2,
order: queue.len(),
};
queue.add(entry1);
queue.add(entry1, ());
}

// we should trigger the drop
Expand Down
5 changes: 0 additions & 5 deletions crates/rpc-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,8 @@ faster-hex = "0.4"
ckb-crypto = "0.100.0"
ckb-fixed-hash = "0.100.0"
ckb-types = "0.100.0"
toml = "0.5"
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
async-channel = "1.4"
async-jsonrpc-client = { version = "0.3.0", default-features = false, features = ["http-tokio"] }
clap = "2.33.3"
env_logger = "0.8.3"
futures = "0.3.13"
hyper = { version = "0.14", features = ["server"] }
jsonrpc-v2 = { version = "0.10.0", default-features = false, features = ["hyper-integration", "easy-errors"] }
Expand Down
Loading

0 comments on commit 3cbbdbc

Please sign in to comment.