Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send chain txs #187

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/protocols/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@ pub(crate) struct RelayProtocol {
}

// a simple struct to store the pending transactions in memory with size limit
pub(crate) struct PendingTxs {
pub struct PendingTxs {
txs: LinkedHashMap<packed::Byte32, (packed::Transaction, Cycle, HashSet<PeerId>)>,
updated_at: Instant,
limit: usize,
}

impl Default for PendingTxs {
fn default() -> Self {
Self::new(64)
}
}

impl PendingTxs {
pub fn new(limit: usize) -> Self {
Self {
Expand Down
10 changes: 5 additions & 5 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ pub struct BlockFilterRpcImpl {
}

pub struct TransactionRpcImpl {
pub(crate) pending_txs: Arc<RwLock<PendingTxs>>,
pub(crate) swc: StorageWithChainData,
pub(crate) consensus: Arc<Consensus>,
}
Expand Down Expand Up @@ -1173,7 +1172,8 @@ impl TransactionRpc for TransactionRpcImpl {
let tx = tx.into_view();
let cycles = verify_tx(tx.clone(), &self.swc, Arc::clone(&self.consensus))
.map_err(|e| Error::invalid_params(format!("invalid transaction: {:?}", e)))?;
self.pending_txs
self.swc
.pending_txs()
.write()
.expect("pending_txs lock is poisoned")
.push(tx.clone(), cycles);
Expand All @@ -1198,7 +1198,8 @@ impl TransactionRpc for TransactionRpcImpl {
}

if let Some((transaction, cycles, _)) = self
.pending_txs
.swc
.pending_txs()
.read()
.expect("pending_txs lock is poisoned")
.get(&tx_hash.pack())
Expand Down Expand Up @@ -1314,11 +1315,10 @@ impl Service {
consensus: Consensus,
) -> Server {
let mut io_handler = IoHandler::new();
let swc = StorageWithChainData::new(storage, Arc::clone(&peers));
let swc = StorageWithChainData::new(storage, Arc::clone(&peers), Arc::clone(&pending_txs));
let block_filter_rpc_impl = BlockFilterRpcImpl { swc: swc.clone() };
let chain_rpc_impl = ChainRpcImpl { swc: swc.clone() };
let transaction_rpc_impl = TransactionRpcImpl {
pending_txs,
swc,
consensus: Arc::new(consensus),
};
Expand Down
46 changes: 42 additions & 4 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ckb_types::{
use rocksdb::{prelude::*, Direction, IteratorMode, WriteBatch, DB};

use crate::error::Result;
use crate::protocols::Peers;
use crate::protocols::{Peers, PendingTxs};

pub const LAST_STATE_KEY: &str = "LAST_STATE";
const GENESIS_BLOCK_KEY: &str = "GENESIS_BLOCK";
Expand Down Expand Up @@ -1128,17 +1128,26 @@ impl HeaderProvider for Storage {
pub struct StorageWithChainData {
storage: Storage,
peers: Arc<Peers>,
pending_txs: Arc<RwLock<PendingTxs>>,
}

impl StorageWithChainData {
pub fn new(storage: Storage, peers: Arc<Peers>) -> Self {
Self { storage, peers }
pub fn new(storage: Storage, peers: Arc<Peers>, pending_txs: Arc<RwLock<PendingTxs>>) -> Self {
Self {
storage,
peers,
pending_txs,
}
}

pub fn storage(&self) -> &Storage {
&self.storage
}

pub fn pending_txs(&self) -> &RwLock<PendingTxs> {
&self.pending_txs
}

pub(crate) fn matched_blocks(&self) -> &RwLock<HashMap<H256, (bool, Option<packed::Block>)>> {
self.peers.matched_blocks()
}
Expand Down Expand Up @@ -1190,7 +1199,36 @@ impl CellDataProvider for StorageWithChainData {

impl CellProvider for StorageWithChainData {
fn cell(&self, out_point: &OutPoint, eager_load: bool) -> CellStatus {
self.storage.cell(out_point, eager_load)
match self.storage.cell(out_point, eager_load) {
CellStatus::Live(cell_meta) => CellStatus::Live(cell_meta),
_ => {
if let Some((tx, _, _)) = self
.pending_txs
.read()
.expect("poisoned")
.get(&out_point.tx_hash())
{
if let Some(cell_output) = tx.raw().outputs().get(out_point.index().unpack()) {
let output_data = tx
.raw()
.outputs_data()
.get(out_point.index().unpack())
.expect("output_data's index should be same as output")
.raw_data();
let output_data_data_hash = CellOutput::calc_data_hash(&output_data);
return CellStatus::Live(CellMeta {
out_point: out_point.clone(),
cell_output,
transaction_info: None,
data_bytes: output_data.len() as u64,
mem_cell_data: Some(output_data),
mem_cell_data_hash: Some(output_data_data_hash),
});
}
}
CellStatus::Unknown
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/subcmds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl RunConfig {
.expect("build consensus should be OK");
storage.init_genesis_block(consensus.genesis_block().data());

let pending_txs = Arc::new(RwLock::new(PendingTxs::new(64)));
let pending_txs = Arc::new(RwLock::new(PendingTxs::default()));
let max_outbound_peers = self.run_env.network.max_outbound_peers;
let network_state = NetworkState::from_config(self.run_env.network)
.map(|network_state| {
Expand Down
65 changes: 50 additions & 15 deletions src/tests/service.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions src/tests/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ fn verify_valid_transaction() {
// https://pudge.explorer.nervos.org/transaction/0xf34f4eaac4a662927fb52d4cb608e603150b9e0678a0f5ed941e3cfd5b68fb30
let transaction: packed::Transaction = serde_json::from_str::<Transaction>(r#"{"cell_deps":[{"dep_type":"dep_group","out_point":{"index":"0x0","tx_hash":"0xf8de3bb47d055cdf460d93a2a6e1b05f7432f9777c8c474abf4eec1d4aee5d37"}}],"header_deps":[],"inputs":[{"previous_output":{"index":"0x7","tx_hash":"0x8f8c79eb6671709633fe6a46de93c0fedc9c1b8a6527a18d3983879542635c9f"},"since":"0x0"}],"outputs":[{"capacity":"0x470de4df820000","lock":{"args":"0xff5094c2c5f476fc38510018609a3fd921dd28ad","code_hash":"0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8","hash_type":"type"},"type":null},{"capacity":"0xb61134e5a35e800","lock":{"args":"0x64257f00b6b63e987609fa9be2d0c86d351020fb","code_hash":"0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8","hash_type":"type"},"type":null}],"outputs_data":["0x","0x"],"version":"0x0","witnesses":["0x5500000010000000550000005500000041000000af34b54bebf8c5971da6a880f2df5a186c3f8d0b5c9a1fe1a90c95b8a4fb89ef3bab1ccec13797dcb3fee80400f953227dd7741227e08032e3598e16ccdaa49c00"]}"#).unwrap().into();

let swc = StorageWithChainData::new(storage.to_owned(), chain.create_peers());
let swc =
StorageWithChainData::new(storage.to_owned(), chain.create_peers(), Default::default());
let result = verify_tx(transaction.into_view(), &swc, consensus).unwrap();
// please note that the cycle (1682789) of this transaction displayed on the explorer is wrong
// it's fixed in https://github.com/nervosnetwork/ckb/pull/4218
Expand All @@ -43,7 +44,8 @@ fn non_contextual_transaction_verifier() {
let chain = MockChain::new_with_default_pow("non_contextual_transaction_verifier");
let storage = chain.client_storage();
let consensus = Arc::new(chain.consensus().clone());
let swc = StorageWithChainData::new(storage.to_owned(), chain.create_peers());
let swc =
StorageWithChainData::new(storage.to_owned(), chain.create_peers(), Default::default());

// duplicate cell deps base on a valid transaction
// https://pudge.explorer.nervos.org/transaction/0xf34f4eaac4a662927fb52d4cb608e603150b9e0678a0f5ed941e3cfd5b68fb30
Expand Down
Loading