Skip to content

Commit

Permalink
feat: mem pool p2p sync
Browse files Browse the repository at this point in the history
  • Loading branch information
blckngm committed Apr 18, 2022
1 parent d4cc2a8 commit bce5d5a
Show file tree
Hide file tree
Showing 18 changed files with 2,773 additions and 52 deletions.
362 changes: 340 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"crates/tx-filter",
"crates/replay-chain",
"crates/dynamic-config",
"crates/p2p-network",
]

[profile.release]
Expand Down
2 changes: 2 additions & 0 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ opentelemetry = { version = "0.16", features = ["rt-tokio"] }
# unstable features
tokio-metrics = "0.1.0"
console-subscriber = "0.1.3"
tentacle = { version = "0.4.0-beta.4" }
gw-p2p-network = { path = "../p2p-network" }

[target.'cfg(all(not(target_env = "msvc"), not(target_os="macos")))'.dependencies]
tikv-jemallocator = { version = "0.4.0", features = ["unprefixed_malloc_on_supported_platforms"] }
Expand Down
56 changes: 52 additions & 4 deletions crates/block-producer/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use gw_mem_pool::{
default_provider::DefaultMemPoolProvider,
pool::{MemPool, MemPoolCreateArgs},
spawn_sub_mem_pool_task,
sync::p2p,
traits::MemPoolErrorTxHandler,
};
use gw_p2p_network::P2PNetwork;
use gw_rpc_client::{
ckb_client::CKBClient, contract::ContractsCellDepManager, indexer_client::CKBIndexerClient,
rpc_client::RPCClient,
Expand Down Expand Up @@ -61,6 +63,7 @@ use std::{
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};
use tentacle::service::ProtocolMeta;
use tokio::{
spawn,
sync::{broadcast, mpsc, Mutex},
Expand Down Expand Up @@ -816,6 +819,48 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
}
};

//Graceful shutdown event. If all the shutdown_sends get dropped, then we can shutdown gracefully.
let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
//Broadcase shutdown event.
let (shutdown_event, shutdown_event_recv) = broadcast::channel(1);

// P2P network.
let p2p_control_and_handle = if let Some(ref p2p_network_config) = config.p2p_network_config {
let mut protocols: Vec<ProtocolMeta> = Vec::new();
let mut sync_server_state: Option<Arc<Mutex<p2p::SyncServerState>>> = None;
match (&mem_pool, config.node_mode) {
(Some(_), NodeMode::FullNode | NodeMode::Test) => {
log::info!("will enable mem-pool p2p sync server");
let s = Arc::new(Mutex::new(Default::default()));
sync_server_state = Some(s.clone());
protocols.push(p2p::sync_server_protocol(s));
}
(Some(mem_pool), NodeMode::ReadOnly) => {
log::info!("will enable mem-pool p2p sync client");
protocols.push(p2p::sync_client_protocol(
mem_pool.clone(),
shutdown_event.clone(),
));
}
_ => {}
}
let mut network = P2PNetwork::init(p2p_network_config, protocols).await?;
let control = network.control().clone();
if let (Some(sync_server_state), Some(mem_pool)) = (sync_server_state, &mem_pool) {
let mut mem_pool = mem_pool.lock().await;
mem_pool
.enable_publishing(control.clone(), sync_server_state)
.await;
}
let handle = tokio::spawn(async move {
log::info!("running the p2p network");
network.run().await;
});
Some((control, handle))
} else {
None
};

// RPC registry
let args = RegistryArgs {
store,
Expand Down Expand Up @@ -859,10 +904,6 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
}

log::info!("{:?} mode", config.node_mode);
//Graceful shutdown event. If all the shutdown_sends get dropped, then we can shutdown gracefully.
let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
//Broadcase shutdown event.
let (shutdown_event, shutdown_event_recv) = broadcast::channel(1);

let chain_task = spawn({
let ctx = ChainTaskContext {
Expand Down Expand Up @@ -942,6 +983,13 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
if let Err(err) = shutdown_event.send(()) {
log::error!("Failed to brodcast error message: {:?}", err);
}
// Shutdown p2p network.
if let Some((control, handle)) = p2p_control_and_handle {
log::info!("closing p2p network");
let _ = control.close().await;
let _ = handle.await;
log::info!("p2p network closed");
}

// Make sure all the senders are dropped.
drop(shutdown_send);
Expand Down
12 changes: 12 additions & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct Config {
pub reload_config_github_url: Option<GithubConfigUrl>,
#[serde(default)]
pub dynamic_config: DynamicConfig,
#[serde(default)]
pub p2p_network_config: Option<P2PNetworkConfig>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
Expand Down Expand Up @@ -260,6 +262,16 @@ impl Default for OffChainValidatorConfig {
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct P2PNetworkConfig {
/// Multiaddr listen address, e.g. /ip4/1.2.3.4/tcp/443
#[serde(default)]
pub listen: Option<String>,
/// Multiaddr dial addresses, e.g. /ip4/1.2.3.4/tcp/443
#[serde(default)]
pub dial: Vec<String>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PublishMemPoolConfig {
pub hosts: Vec<String>,
Expand Down
2 changes: 2 additions & 0 deletions crates/mem-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ gw-config = { path = "../config" }
gw-utils = { path = "../utils" }
gw-rpc-ws-server = { path = "../rpc-ws-server" }
gw-dynamic-config = { path = "../dynamic-config" }
gw-p2p-network = { path = "../p2p-network" }
rdkafka = { version = "0.25", default-features = false }
futures = { version = "0.3"}
tokio = "1.15"
Expand All @@ -26,6 +27,7 @@ log = "0.4"
hex = "0.4"
async-trait = "0.1"
tracing = { version = "0.1", features = ["attributes"] }
tentacle = { version = "0.4.0-beta.4", features = ["unstable"] }

[dev-dependencies]
tempfile = "3.2"
Expand Down
2 changes: 1 addition & 1 deletion crates/mem-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod fee;
pub mod mem_block;
pub mod pool;
pub mod restore_manager;
mod sync;
pub mod sync;
pub mod traits;
mod types;
pub mod withdrawal;
Expand Down
58 changes: 52 additions & 6 deletions crates/mem-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,20 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tentacle::service::ServiceAsyncControl;
use tokio::sync::{broadcast, Mutex};
use tracing::instrument;

use crate::{
constants::{MAX_MEM_BLOCK_TXS, MAX_MEM_BLOCK_WITHDRAWALS, MAX_TX_SIZE, MAX_WITHDRAWAL_SIZE},
custodian::AvailableCustodians,
mem_block::MemBlock,
restore_manager::RestoreManager,
sync::{mq::tokio_kafka, publish::MemPoolPublishService},
sync::{
mq::tokio_kafka,
p2p::{self, SyncServerState},
publish::MemPoolPublishService,
},
traits::{MemPoolErrorTxHandler, MemPoolProvider},
types::EntryList,
withdrawal::Generator as WithdrawalGenerator,
Expand Down Expand Up @@ -95,6 +101,7 @@ pub struct MemPool {
node_mode: NodeMode,
mem_pool_state: Arc<MemPoolState>,
dynamic_config_manager: Arc<ArcSwap<DynamicConfigManager>>,
new_tip_publisher: broadcast::Sender<(H256, u64)>,
}

pub struct MemPoolCreateArgs {
Expand Down Expand Up @@ -156,21 +163,28 @@ impl MemPool {
}

mem_block.clear_txs();
let fan_out_mem_block_handler = config

let producer = config
.publish
.map(|config| -> Result<MemPoolPublishService> {
.map(|config| -> Result<tokio_kafka::Producer> {
log::info!("Setup fan out mem_block handler.");
let producer = tokio_kafka::Producer::connect(config.hosts, config.topic)?;
let handler = MemPoolPublishService::start(producer);
Ok(handler)
Ok(producer)
})
.transpose()?;
let mem_pool_publish_service = if producer.is_some() {
Some(MemPoolPublishService::start(producer, None))
} else {
None
};

let mem_pool_state = {
let mem_store = MemStore::new(store.get_snapshot());
Arc::new(MemPoolState::new(Arc::new(mem_store), false))
};

let (new_tip_publisher, _) = broadcast::channel(1);

let mut mem_pool = MemPool {
store,
current_tip: tip,
Expand All @@ -183,10 +197,11 @@ impl MemPool {
pending_deposits,
restore_manager: restore_manager.clone(),
pending_restored_tx_hashes,
mem_pool_publish_service: fan_out_mem_block_handler,
mem_pool_publish_service,
node_mode,
mem_pool_state,
dynamic_config_manager,
new_tip_publisher,
};

// update mem block info
Expand Down Expand Up @@ -615,6 +630,12 @@ impl MemPool {
// set tip
self.current_tip = (new_tip, new_tip_block.raw().number().unpack());

// Publish new tip.
let _ = self.new_tip_publisher.send(self.current_tip);
if let Some(ref publish) = self.mem_pool_publish_service {
publish.new_tip(self.current_tip).await;
}

// mem block withdrawals
let mem_block_withdrawals: Vec<_> = {
let mut withdrawals = Vec::with_capacity(mem_block_content.withdrawals.len());
Expand Down Expand Up @@ -1182,6 +1203,31 @@ impl MemPool {
self.push_transaction(tx).await?;
Ok(())
}

pub(crate) fn current_tip(&self) -> (H256, u64) {
self.current_tip
}

pub(crate) fn subscribe_new_tip(&self) -> broadcast::Receiver<(H256, u64)> {
self.new_tip_publisher.subscribe()
}

pub async fn enable_publishing(
&mut self,
control: ServiceAsyncControl,
shared: Arc<Mutex<SyncServerState>>,
) {
let p2p_publisher = p2p::sync_server_publisher(control, shared);
match self.mem_pool_publish_service {
Some(ref service) => {
service.set_p2p_publisher(p2p_publisher).await;
}
None => {
self.mem_pool_publish_service =
Some(MemPoolPublishService::start(None, Some(p2p_publisher)));
}
}
}
}

pub(crate) fn repackage_count(
Expand Down
2 changes: 2 additions & 0 deletions crates/mem-pool/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
pub(crate) mod subscribe;
// Fullnode will publish mem pool to readonly nodes.
pub(crate) mod mq;
/// P2P sync.
pub mod p2p;
pub(crate) mod publish;
Loading

0 comments on commit bce5d5a

Please sign in to comment.