Skip to content

Commit

Permalink
Merge pull request #42 from nervosnetwork/quake/tcp-subscription
Browse files Browse the repository at this point in the history
feat: index the pending txs in the ckb tx-pool
  • Loading branch information
quake authored Oct 24, 2021
2 parents 6902426 + 4d474a7 commit 72c9af0
Show file tree
Hide file tree
Showing 6 changed files with 342 additions and 16 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,29 @@ All the [indexing](https://github.com/nervosnetwork/ckb/tree/develop/rpc#indexer

## Usage

Build binary from source

```bash
cargo build --release
```

Connect to default ckb rpc service at `http://127.0.0.1:8114` and stores the indexer data at `/tmp/ckb-indexer-test` folder
```bash
RUST_LOG=info ./target/release/ckb-indexer -s /tmp/ckb-indexer-test
```

Or connect to ckb rpc service at `tcp://127.0.0.1:18114`
```bash
RUST_LOG=info ./target/release/ckb-indexer -s /tmp/ckb-indexer-test -c tcp://127.0.0.1:18114
```

Indexing the pending txs in the ckb tx-pool
```bash
RUST_LOG=info ./target/release/ckb-indexer -s /tmp/ckb-indexer-test -c tcp://127.0.0.1:18114 --index-tx-pool
```

Run `ckb-indexer --help` for more information

## RPC

### `get_tip`
Expand Down
28 changes: 24 additions & 4 deletions src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::store::{Batch, Error as StoreError, IteratorDirection, Store};
use crate::{
pool::Pool,
store::{Batch, Error as StoreError, IteratorDirection, Store},
};

use ckb_types::{
core::{BlockNumber, BlockView},
Expand All @@ -7,8 +10,11 @@ use ckb_types::{
};
use thiserror::Error;

use std::collections::HashMap;
use std::convert::TryInto;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

pub type TxIndex = u32;
pub type OutputIndex = u32;
Expand Down Expand Up @@ -215,14 +221,23 @@ pub struct Indexer<S> {
// keep_num: 100, current tip: 321, will prune ConsumedOutPoint / TxHash kv pair whiches block_number <= 221
keep_num: u64,
prune_interval: u64,
// an optional overlay to index the pending txs in the ckb tx pool
// currently only supports removals of dead cells from the pending txs
pool: Option<Arc<RwLock<Pool>>>,
}

impl<S> Indexer<S> {
pub fn new(store: S, keep_num: u64, prune_interval: u64) -> Self {
pub fn new(
store: S,
keep_num: u64,
prune_interval: u64,
pool: Option<Arc<RwLock<Pool>>>,
) -> Self {
Self {
store,
keep_num,
prune_interval,
pool,
}
}

Expand Down Expand Up @@ -251,6 +266,7 @@ where

let block_number = block.number();
let transactions = block.transactions();
let pool = self.pool.as_ref().map(|p| p.write().expect("acquire lock"));
for (tx_index, tx) in transactions.iter().enumerate() {
let tx_index = tx_index as u32;
let tx_hash = tx.hash();
Expand Down Expand Up @@ -397,6 +413,10 @@ where

batch.commit()?;

if let Some(mut pool) = pool {
pool.transactions_commited(&transactions);
}

if block_number % self.prune_interval == 0 {
self.prune()?;
}
Expand Down Expand Up @@ -774,7 +794,7 @@ mod tests {
fn new_indexer<S: Store>(prefix: &str) -> Indexer<S> {
let tmp_dir = tempfile::Builder::new().prefix(prefix).tempdir().unwrap();
let store = S::new(tmp_dir.path().to_str().unwrap());
Indexer::new(store, 10, 1)
Indexer::new(store, 10, 1, None)
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod indexer;
pub mod pool;
pub mod service;
pub mod store;
168 changes: 158 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
use ckb_indexer::service::Service;
use ckb_indexer::{pool::Pool, service::Service};
use ckb_jsonrpc_types::{PoolTransactionEntry, PoolTransactionReject};
use ckb_types::packed::Transaction;
use clap::{crate_version, App, Arg};
use jsonrpc_core_client::transports::http;
use futures::{SinkExt, StreamExt, TryStreamExt};
use jsonrpc_core_client::{
transports::{duplex::duplex, http},
TypedClient,
};
use jsonrpc_server_utils::{
codecs::StreamCodec,
tokio::{self, net::TcpStream},
tokio_util::codec::Decoder,
};
use log::{debug, error};
use std::sync::{Arc, RwLock};

#[tokio::main]
async fn main() {
Expand All @@ -12,7 +25,7 @@ async fn main() {
.arg(
Arg::with_name("ckb_uri")
.short("c")
.help("CKB rpc http service uri, default http://127.0.0.1:8114")
.help("CKB rpc service uri, supports http and tcp, for example: `http://127.0.0.1:8114` or `tcp://127.0.0.1:18114`")
.takes_value(true),
)
.arg(
Expand All @@ -28,10 +41,23 @@ async fn main() {
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("index_tx_pool")
.long("index-tx-pool")
.help("Whether to index the pending txs in the ckb tx-pool")
.required(false)
)
.get_matches();

let index_tx_pool = matches.is_present("index_tx_pool");
let pool = Arc::new(RwLock::new(Pool::new()));
let service = Service::new(
matches.value_of("store_path").expect("required arg"),
if index_tx_pool {
Some(pool.clone())
} else {
None
},
matches.value_of("listen_uri").unwrap_or("127.0.0.1:8116"),
std::time::Duration::from_secs(2),
crate_version!().to_string(),
Expand All @@ -42,15 +68,137 @@ async fn main() {
.value_of("ckb_uri")
.unwrap_or("http://127.0.0.1:8114")
.to_owned();
if !uri.starts_with("http") {
uri = format!("http://{}", uri);
}

let client = http::connect(&uri)
.await
.expect(&format!("Failed to connect to {:?}", uri));
if uri.starts_with("tcp://") {
let uri = uri.split_off(6);
let codec = StreamCodec::stream_incoming();
let tcp_stream = TcpStream::connect(&uri)
.await
.expect(&format!("Failed to connect to {:?}", uri));
let (sink, stream) = codec.framed(tcp_stream).split();
let sink = sink.sink_map_err(|e| error!("tcp sink error: {:?}", e));
let stream = stream.map_err(|e| error!("tcp stream error: {:?}", e));

let (duplex, rpc_channel) = duplex(
Box::pin(sink),
Box::pin(
stream
.take_while(|x| futures::future::ready(x.is_ok()))
.map(|x| x.expect("stream is closed on errors")),
),
);
tokio::spawn(duplex);

service.poll(client).await;
if index_tx_pool {
// subscribe `new_transaction` topic
{
let typed_client: TypedClient = rpc_channel.clone().into();
let pool = pool.clone();
let fut = async move {
match typed_client.subscribe::<_, String>(
"subscribe",
["new_transaction"],
"subscribe",
"unsubscribe",
"String",
) {
Ok(mut stream) => loop {
if let Some(subscription) = stream.next().await {
match subscription {
Ok(json_string) => {
debug!(
"Rpc subscription notified a new transaction: {}",
json_string
);
if let Ok(pool_tx_entry) =
serde_json::from_str::<PoolTransactionEntry>(
&json_string,
)
{
let tx: Transaction =
pool_tx_entry.transaction.inner.into();
pool.write()
.expect("acquire lock")
.new_transaction(&tx.into_view());
} else {
error!("Failed to parse json_string: {}", json_string);
}
}
Err(err) => {
error!("Rpc subscription error {:?}", err);
}
}
}
},
Err(err) => {
error!("subscribe error {:?}", err);
}
}
};
tokio::spawn(fut);
}

// subscribe `rejected_transaction` topic
{
let typed_client: TypedClient = rpc_channel.clone().into();
let pool = pool.clone();
let fut = async move {
match typed_client.subscribe::<_, String>(
"subscribe",
["rejected_transaction"],
"subscribe",
"unsubscribe",
"String",
) {
Ok(mut stream) => loop {
if let Some(subscription) = stream.next().await {
match subscription {
Ok(json_string) => {
debug!(
"Rpc subscription notified a rejected transaction: {}",
json_string
);
if let Ok((pool_tx_entry, _)) = serde_json::from_str::<(
PoolTransactionEntry,
PoolTransactionReject,
)>(
&json_string
) {
let tx: Transaction =
pool_tx_entry.transaction.inner.into();
pool.write()
.expect("acquire lock")
.transaction_rejected(&tx.into_view());
} else {
error!("Failed to parse json_string: {}", json_string);
}
}
Err(err) => {
error!("Rpc subscription error {:?}", err);
}
}
}
},
Err(err) => {
error!("subscribe error {:?}", err);
}
}
};
tokio::spawn(fut);
}
}

service.poll(rpc_channel.into()).await;
} else {
if index_tx_pool {
error!("indexing the pending txs in the ckb tx-pool is only supported when connecting to ckb rpc service with tcp protocol")
} else {
let client = http::connect(&uri)
.await
.expect(&format!("Failed to connect to {:?}", uri));
service.poll(client).await;
}
}

rpc_server.close();
}
47 changes: 47 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use ckb_types::{core::TransactionView, packed::OutPoint};
use std::collections::HashSet;

/// an overlay to index the pending txs in the ckb tx pool,
/// currently only supports removals of dead cells from the pending txs
pub struct Pool {
dead_cells: HashSet<OutPoint>,
}

impl Pool {
pub fn new() -> Self {
Self {
dead_cells: HashSet::new(),
}
}

// the tx has been comitted in a block, it should be removed from pending dead cells
pub fn transaction_commited(&mut self, tx: &TransactionView) {
for input in tx.inputs() {
self.dead_cells.remove(&input.previous_output());
}
}

// the tx has been rejected for some reason, it should be removed from pending dead cells
pub fn transaction_rejected(&mut self, tx: &TransactionView) {
for input in tx.inputs() {
self.dead_cells.remove(&input.previous_output());
}
}

// a new tx is submitted to the pool, mark its inputs as dead cells
pub fn new_transaction(&mut self, tx: &TransactionView) {
for input in tx.inputs() {
self.dead_cells.insert(input.previous_output());
}
}

pub fn is_consumed_by_pool_tx(&self, out_point: &OutPoint) -> bool {
self.dead_cells.contains(out_point)
}

pub fn transactions_commited(&mut self, txs: &[TransactionView]) {
for tx in txs {
self.transaction_commited(tx);
}
}
}
Loading

0 comments on commit 72c9af0

Please sign in to comment.