Skip to content

Commit

Permalink
Merge branch 'develop' into yukang-fix-script-test
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang authored Aug 14, 2024
2 parents 3d82f0f + bfcab0c commit 7af44ce
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 16 deletions.
2 changes: 1 addition & 1 deletion network/src/tests/peer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ fn test_accept_inbound_peer_eviction() {
peer.connected_time = now - Duration::from_secs(10);
};
}
// thoses peers will not be protect, we add them to evict_targets
// these peers will not be protect, we add them to evict_targets
for _ in 0..longest_connection_time_peers_count {
let peer_addr = peers_iter.next().unwrap();
let peer_id = extract_peer_id(peer_addr).unwrap();
Expand Down
2 changes: 2 additions & 0 deletions rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ Response
"block_hash": null,
"block_number": null,
"status": "pending",
"tx_index": null,
"reason": null
}
}
Expand All @@ -946,6 +947,7 @@ The response looks like below when `verbosity` is 0.
"block_hash": null,
"block_number": null,
"status": "pending",
"tx_index": null,
"reason": null
}
}
Expand Down
4 changes: 4 additions & 0 deletions rpc/src/module/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ pub trait ChainRpc {
/// "block_hash": null,
/// "block_number": null,
/// "status": "pending",
/// "tx_index": null,
/// "reason": null
/// }
/// }
Expand All @@ -645,6 +646,7 @@ pub trait ChainRpc {
/// "block_hash": null,
/// "block_number": null,
/// "status": "pending",
/// "tx_index": null,
/// "reason": null
/// }
/// }
Expand Down Expand Up @@ -2159,6 +2161,7 @@ impl ChainRpcImpl {
None,
tx_info.block_number,
tx_info.block_hash.unpack(),
tx_info.index as u32,
cycles,
None,
));
Expand Down Expand Up @@ -2207,6 +2210,7 @@ impl ChainRpcImpl {
Some(tx),
tx_info.block_number,
tx_info.block_hash.unpack(),
tx_info.index as u32,
cycles,
None,
));
Expand Down
8 changes: 7 additions & 1 deletion test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,19 @@ fn main() {
let mut test_results = Vec::new();
let mut worker_running = worker_count;
let mut done_specs = 0;
let mut started_sequential = false;
while worker_running > 0 {
if max_time > 0 && start_time.elapsed().as_secs() > max_time {
// shutdown, specs running to long
workers.shutdown();
break;
}

if worker_running == 1 && !started_sequential {
started_sequential = true;
workers.start_sequencial()
}

let msg = match notify_rx.recv_timeout(Duration::from_secs(5)) {
Ok(msg) => msg,
Err(err) => {
Expand Down Expand Up @@ -590,9 +596,9 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CheckVmVersion2),
Box::new(CheckVmBExtension),
Box::new(RandomlyKill),
Box::new(SyncChurn),
];
specs.shuffle(&mut thread_rng());
specs.insert(0, Box::new(SyncChurn) as Box<dyn Spec>);
specs
}

Expand Down
2 changes: 1 addition & 1 deletion test/src/specs/sync/sync_churn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Spec for SyncChurn {
let mining_node = select_random_node(&mut rng, &mut mining_nodes);
mining_node.mine(1);
// Because the test that waiting for nodes to sync has a implicit maximum waiting time
// (currently 60 seconds, we can sync about 200 blocks per second, so a maxium blocks of 10000 is reasonable)
// (currently 60 seconds, we can sync about 200 blocks per second, so a maximum blocks of 10000 is reasonable)
// and the implicit waiting time is not long enough when there are too many blocks to sync,
// so we stop mining when the tip block number is greater than 15000.
// Otherwise nodes may not be able to sync within the implicit waiting time.
Expand Down
85 changes: 76 additions & 9 deletions test/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::time::Instant;
#[derive(PartialEq, Eq)]
pub enum Command {
Shutdown,
StartSequencial,
}

/// Notify from worker
Expand Down Expand Up @@ -45,6 +46,9 @@ pub struct Worker {
inbox: Receiver<Command>,
outbox: Sender<Notify>,
start_port: Arc<AtomicU16>,

sequencial_tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
sequencial_worker: bool,
}

impl Clone for Worker {
Expand All @@ -54,13 +58,18 @@ impl Clone for Worker {
inbox: self.inbox.clone(),
outbox: self.outbox.clone(),
start_port: Arc::clone(&self.start_port),
sequencial_tasks: Arc::clone(&self.sequencial_tasks),
sequencial_worker: self.sequencial_worker,
}
}
}

const SEQUENCIAL_TASKS: &[&str] = &["RandomlyKill", "SyncChurn"];

impl Worker {
pub fn new(
tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
sequencial_tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
inbox: Receiver<Command>,
outbox: Sender<Notify>,
start_port: Arc<AtomicU16>,
Expand All @@ -70,12 +79,16 @@ impl Worker {
inbox,
outbox,
start_port,
sequencial_tasks,
sequencial_worker: false,
}
}

/// start handle tasks
pub fn start(self) -> JoinHandle<()> {
thread::spawn(move || {
let mut start_sequencial_task = false;

loop {
let msg = match self.inbox.try_recv() {
Ok(msg) => Some(msg),
Expand All @@ -88,20 +101,54 @@ impl Worker {
}
};
// check command
if Some(Command::Shutdown) == msg {
self.outbox.send(Notify::Stop).unwrap();
return;
match msg {
Some(Command::StartSequencial) => {
start_sequencial_task = true;
}
Some(Command::Shutdown) => {
self.outbox.send(Notify::Stop).unwrap();
return;
}
_ => {}
}

// pick a spec to run
let spec = match self.tasks.lock().pop() {
Some(spec) => spec,

let task = self.tasks.lock().pop();
match task {
Some(spec) => {
// if spec.name() is RandomlyKill or SyncChurn, then push it to sequencial_tasks
if SEQUENCIAL_TASKS.contains(&spec.name()) {
info!("push {} to sequencial_tasks", spec.name());
self.sequencial_tasks.lock().push(spec);
} else {
self.run_spec(spec.as_ref(), 0);
}
}
None => {
self.outbox.send(Notify::Stop).unwrap();
return;
if self.sequencial_worker {
info!("sequencial worker is waiting for command");
if start_sequencial_task {
match self.sequencial_tasks.lock().pop() {
Some(spec) => {
self.run_spec(spec.as_ref(), 0);
}
None => {
info!("sequencial worker has no task to run");
self.outbox.send(Notify::Stop).unwrap();
return;
}
};
} else {
info!("sequencial worker is waiting for parallel workers finish");
std::thread::sleep(std::time::Duration::from_secs(1));
}
} else {
self.outbox.send(Notify::Stop).unwrap();
return;
}
}
};

self.run_spec(spec.as_ref(), 0);
}
})
}
Expand Down Expand Up @@ -176,13 +223,17 @@ impl Workers {
start_port: u16,
) -> Self {
let start_port = Arc::new(AtomicU16::new(start_port));

let sequencial_tasks = Arc::new(Mutex::new(Vec::new()));
let workers: Vec<_> = (0..count)
.map({
let tasks = Arc::clone(&tasks);
let sequencial_tasks = Arc::clone(&sequencial_tasks);
move |_| {
let (command_tx, command_rx) = unbounded();
let worker = Worker::new(
Arc::clone(&tasks),
Arc::clone(&sequencial_tasks),
command_rx,
outbox.clone(),
Arc::clone(&start_port),
Expand All @@ -200,6 +251,8 @@ impl Workers {

/// start all workers
pub fn start(&mut self) {
self.workers.first_mut().unwrap().1.sequencial_worker = true;

let mut join_handles = Vec::new();
for w in self.workers.iter_mut() {
let h = w.1.clone().start();
Expand All @@ -208,6 +261,20 @@ impl Workers {
self.join_handles.replace(join_handles);
}

pub fn start_sequencial(&mut self) {
if let Err(err) = self
.workers
.first()
.unwrap()
.0
.send(Command::StartSequencial)
{
error!("start sequencial worker failed, error: {}", err);
} else {
info!("start sequencial worker success")
}
}

/// shutdown all workers, must call join_all after this.
pub fn shutdown(&mut self) {
if self.is_shutdown {
Expand Down
13 changes: 11 additions & 2 deletions util/jsonrpc-types/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,8 @@ pub struct TxStatus {
pub block_number: Option<BlockNumber>,
/// The block hash of the block which has committed this transaction in the canonical chain.
pub block_hash: Option<H256>,
/// The transaction index in the block.
pub tx_index: Option<Uint32>,
/// The reason why the transaction is rejected
pub reason: Option<String>,
}
Expand All @@ -602,7 +604,9 @@ impl From<tx_pool::TxStatus> for TxStatus {
match tx_pool_status {
tx_pool::TxStatus::Pending => TxStatus::pending(),
tx_pool::TxStatus::Proposed => TxStatus::proposed(),
tx_pool::TxStatus::Committed(number, hash) => TxStatus::committed(number.into(), hash),
tx_pool::TxStatus::Committed(number, hash, tx_index) => {
TxStatus::committed(number.into(), hash, tx_index.into())
}
tx_pool::TxStatus::Rejected(reason) => TxStatus::rejected(reason),
tx_pool::TxStatus::Unknown => TxStatus::unknown(),
}
Expand All @@ -616,6 +620,7 @@ impl TxStatus {
status: Status::Pending,
block_number: None,
block_hash: None,
tx_index: None,
reason: None,
}
}
Expand All @@ -626,6 +631,7 @@ impl TxStatus {
status: Status::Proposed,
block_number: None,
block_hash: None,
tx_index: None,
reason: None,
}
}
Expand All @@ -635,11 +641,12 @@ impl TxStatus {
/// ## Params
///
/// * `hash` - the block hash in which the transaction is committed.
pub fn committed(number: BlockNumber, hash: H256) -> Self {
pub fn committed(number: BlockNumber, hash: H256, tx_index: Uint32) -> Self {
Self {
status: Status::Committed,
block_number: Some(number),
block_hash: Some(hash),
tx_index: Some(tx_index),
reason: None,
}
}
Expand All @@ -654,6 +661,7 @@ impl TxStatus {
status: Status::Rejected,
block_number: None,
block_hash: None,
tx_index: None,
reason: Some(reason),
}
}
Expand All @@ -664,6 +672,7 @@ impl TxStatus {
status: Status::Unknown,
block_number: None,
block_hash: None,
tx_index: None,
reason: None,
}
}
Expand Down
5 changes: 3 additions & 2 deletions util/types/src/core/tx_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub enum TxStatus {
/// Status "proposed". The transaction is in the pool and has been proposed.
Proposed,
/// Status "committed". The transaction has been committed to the canonical chain.
Committed(BlockNumber, H256),
Committed(BlockNumber, H256, u32),
/// Status "unknown". The node has not seen the transaction,
/// or it should be rejected but was cleared due to storage limitations.
Unknown,
Expand Down Expand Up @@ -216,11 +216,12 @@ impl TransactionWithStatus {
tx: Option<core::TransactionView>,
number: BlockNumber,
hash: H256,
tx_index: u32,
cycles: Option<core::Cycle>,
fee: Option<Capacity>,
) -> Self {
Self {
tx_status: TxStatus::Committed(number, hash),
tx_status: TxStatus::Committed(number, hash, tx_index),
transaction: tx,
cycles,
fee,
Expand Down

0 comments on commit 7af44ce

Please sign in to comment.