Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Profile code #22

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[build]
rustflags = [
"-C",
"force-frame-pointers=y",
]
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
[workspace]
members = ["store", "crypto", "network", "mempool", "consensus", "node"]
members = ["store", "crypto", "network", "mempool", "consensus", "node", "profile"]

[profile.release]
debug = true
179 changes: 179 additions & 0 deletions benchmark/async_profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import re
import sys

log_regexp = "^\[.*T(.*):(.*):(.*)\.(.*)Z DEBUG .*\] <APROF> (.*)$"
log_lines = re.findall(log_regexp, open(sys.argv[1]).read(), re.M)

print(f"Log lines {len(log_lines)}")

task_names = {}
task_parents = {}
task_wake = {}
task_states = {}


class TaskState:

def __init__(self, no, time):
self.no = no
self.tot_time = 0
self.blocked_time = 0
self.pending_time = 0
self.running_time = 0

# inner
self.prev_time = time
self.can_work = True
self.running = False
self.never_wake = True
self.calls = 0

def node_label(self):
name = self.name.split(":")[0]
if self.never_wake or self.calls == 0 or not self.to_print():
return name

per_call = (self.running_time + self.pending_time) / self.calls
return f"{name} | {self.running_time}ms / {self.calls} | ({int(per_call*1000):}us P:{self.pending_time / (self.running_time + self.pending_time):.2%})"

def to_print(self):
return (self.running_time + self.pending_time > 10) or self.never_wake

def summary(self):
return f"R:{self.running_time:6} P:{self.pending_time:6} B:{self.blocked_time:6} {self.name} "

def name(self, name):
self.name = name

def resume(self, time):
assert not self.running
self.calls += 1
period = time - self.prev_time
if self.can_work:
self.pending_time += period
else:
self.blocked_time += period
self.tot_time += period

# reset
self.can_work = False
self.running = True
self.prev_time = time

def pause(self, time):
assert self.running
period = time - self.prev_time
self.running_time += period
self.tot_time += period

# reset
self.running = False
self.prev_time = time

def signal(self, time):
self.never_wake = False
if self.can_work:
return
self.can_work = True
if not self.running:
period = time - self.prev_time
self.blocked_time += period
self.tot_time += period

# reset
self.prev_time = time


for (H,M,S,Mill,line) in log_lines:
# print((H,M,S,Mill,line))
time_ms = int(H)*60*60*1000 + int(M)*60*1000 + int(S)*1000 + int(Mill)

# Task creation
if line[:4] == "Task":
# Define a task
match_obj = re.match("Task (.*) from (.*) defined (.*)", line)
task_no = match_obj.group(1)
parent_no = match_obj.group(2)
if parent_no == "None":
parent_no = None
else:
# Strip the Some(*)
parent_no = parent_no[5:-1]

source = match_obj.group(3)

task_names[task_no] = f"{source}-{task_no}"
task_parents[task_no] = parent_no

if task_no not in task_states:
task_states[task_no] = TaskState(task_no, time_ms)
task_states[task_no].name(task_names[task_no])

# Wake relations
if line[:4] == "Wake":
match_obj = re.match("Wake: (.*) -> (.*)", line)
source = match_obj.group(1)
if source == "None":
source = None
else:
source = source[5:-1]
target = match_obj.group(2)

pair = (source, target)
task_states[target].signal(time_ms)
if pair not in task_wake:
task_wake[pair] = 1
else:
task_wake[pair] += 1

if line[:4] == "Paus":
task_no = line[len("Pause task: "):]
task_states[task_no].pause(time_ms)

if line[:4] == "Resu":
task_no = line[len("Resume task: "):]
if task_no not in task_states:
task_states[task_no] = TaskState(task_no, time_ms)
task_states[task_no].resume(time_ms)

wake_number = sum(task_wake.values())

show = {}

# Make a graph of task parent relations
parent_graph = open('parentgraph.dot', 'w')
print("digraph regexp {", file=parent_graph)
print('graph [ rankdir = "LR" ];', file=parent_graph)

for task_no in task_names:
if task_states[task_no].to_print():
print(f'{task_no} [label="{task_states[task_no].node_label()}", shape = "record"];', file=parent_graph)
show[task_no] = True
else:
show[task_no] = False

for task_no in task_parents:
if task_parents[task_no] is None:
continue
if task_states[task_no].to_print():
if not show[task_parents[task_no]]:
print(f'{task_parents[task_no]} [label="{task_states[task_parents[task_no]].node_label()}", shape = "record"];', file=parent_graph)
show[task_parents[task_no]] = True

print(f'{task_parents[task_no]} -> {task_no};', file=parent_graph)

print(f'edge [weight=1000 style=dashed color=dimgrey]', file=parent_graph)

for (source_no, target_no) in task_wake:
pc = task_wake[(source_no, target_no)] / wake_number

if source_no is None:
source_no = "Env"

if (source_no == "Env" or task_states[source_no].to_print()) and task_states[target_no].to_print():
print(f'{source_no} -> {target_no} [label="{pc:.2%}"];', file=parent_graph)

print("}", file=parent_graph)

for task_no in task_states:
print(task_states[task_no].summary())
10 changes: 5 additions & 5 deletions benchmark/fabfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
def local(ct):
bench_params = {
'nodes': 4,
'txs': 250_000,
'txs': 1_000_000,
'size': 512,
'rate': 150_000,
'duration': 20,
'rate': 100_000,
'duration': 60,
}
node_params = {
'consensus': {
Expand All @@ -27,11 +27,11 @@ def local(ct):
},
'mempool': {
'queue_capacity': 10_000,
'max_payload_size': 100_000
'max_payload_size': 2_000_000
}
}
try:
ret = LocalBench(bench_params, node_params).run(debug=False).result()
ret = LocalBench(bench_params, node_params).run(debug=True).result()
print(ret)
except BenchError as e:
Print.error(e)
Expand Down
3 changes: 2 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ store = { path = "../store" }
crypto = { path = "../crypto" }
network = { path = "../network" }
mempool = { path = "../mempool" }
profile = { path = "../profile" }

[dev-dependencies]
rand = "0.7.3"
rand = "0.7.3"
10 changes: 7 additions & 3 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ use network::{NetReceiver, NetSender};
use store::Store;
use tokio::sync::mpsc::{channel, Sender};

use profile::pspawn;
use profile::*;


#[cfg(test)]
#[path = "tests/consensus_tests.rs"]
pub mod consensus_tests;
Expand All @@ -36,12 +40,12 @@ impl Consensus {
x
})?;
let network_receiver = NetReceiver::new(address, tx_core.clone());
tokio::spawn(async move {
pspawn!("Net-Receiver", {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pspawn!("Net-Receiver", {
pspawn!("Consensus-Receiver", {

network_receiver.run().await;
});

let mut network_sender = NetSender::new(rx_network);
tokio::spawn(async move {
pspawn!("Net-Sender", {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pspawn!("Net-Sender", {
pspawn!("Consensus-Sender", {

network_sender.run().await;
});

Expand Down Expand Up @@ -76,7 +80,7 @@ impl Consensus {
/* network_channel */ tx_network,
commit_channel,
);
tokio::spawn(async move {
pspawn!("Consensus-Core", {
core.run().await;
});

Expand Down
5 changes: 4 additions & 1 deletion consensus/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use std::collections::HashMap;
use store::Store;
use tokio::sync::mpsc::{channel, Receiver, Sender};

use profile::pspawn;
use profile::*;

type DriverMessage = (Vec<u8>, Block, Receiver<()>);

pub struct MempoolDriver<Mempool> {
Expand All @@ -22,7 +25,7 @@ impl<Mempool: 'static + NodeMempool> MempoolDriver<Mempool> {
pub fn new(mempool: Mempool, core_channel: Sender<CoreMessage>, store: Store) -> Self {
let (tx_inner, mut rx_inner): (_, Receiver<DriverMessage>) = channel(1000);
let mut waiting = FuturesUnordered::new();
tokio::spawn(async move {
pspawn!("Mempool", {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pspawn!("Mempool", {
pspawn!("Mempool-Driver", {

loop {
tokio::select! {
Some((wait_on, block, handler)) = rx_inner.recv().fuse() => {
Expand Down
5 changes: 4 additions & 1 deletion consensus/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use std::collections::HashSet;
use store::Store;
use tokio::sync::mpsc::{channel, Receiver, Sender};

use profile::pspawn;
use profile::*;

#[cfg(test)]
#[path = "tests/synchronizer_tests.rs"]
pub mod synchronizer_tests;
Expand All @@ -37,7 +40,7 @@ impl Synchronizer {
timer.schedule(sync_retry_delay, true).await;

let store_copy = store.clone();
tokio::spawn(async move {
pspawn!("Synchronizer", {
let mut waiting = FuturesUnordered::new();
let mut pending = HashSet::new();
let mut requests = HashSet::new();
Expand Down
5 changes: 4 additions & 1 deletion consensus/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::sleep;

use profile::pspawn;
use profile::*;

#[cfg(test)]
#[path = "tests/timer_tests.rs"]
pub mod timer_tests;
Expand All @@ -26,7 +29,7 @@ impl<Id: 'static + Hash + Eq + Clone + Send + Sync> Timer<Id> {
pub fn new() -> Self {
let (tx_notifier, rx_notifier) = channel(100);
let (tx_inner, mut rx_inner): (Sender<Command<Id>>, _) = channel(100);
tokio::spawn(async move {
pspawn!("Timer", {
let mut waiting = FuturesUnordered::new();
let mut pending = HashMap::new();
loop {
Expand Down
4 changes: 3 additions & 1 deletion crypto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ tokio = { version = "1.1.0", features = ["sync", "rt", "macros"] }
ed25519-dalek = { version = "1.0.1", features = ["batch"] }
serde = { version = "1.0", features = ["derive"] }
rand = "0.7.3"
base64 = "0.13.0"
base64 = "0.13.0"

profile = { path = "../profile" }
5 changes: 4 additions & 1 deletion crypto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use std::fmt;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::oneshot;

use profile::pspawn;
use profile::*;

#[cfg(test)]
#[path = "tests/crypto_tests.rs"]
pub mod crypto_tests;
Expand Down Expand Up @@ -211,7 +214,7 @@ pub struct SignatureService {
impl SignatureService {
pub fn new(secret: SecretKey) -> Self {
let (tx, mut rx): (Sender<(_, oneshot::Sender<_>)>, _) = channel(100);
tokio::spawn(async move {
pspawn!("Signature-Service", {
while let Some((digest, sender)) = rx.recv().await {
let signature = Signature::new(&digest, &secret);
let _ = sender.send(signature);
Expand Down
3 changes: 2 additions & 1 deletion mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures = "0.3.8"
crypto = { path = "../crypto" }
store = { path = "../store" }
network = { path = "../network" }
profile = { path = "../profile" }

[dev-dependencies]
rand = "0.7.3"
rand = "0.7.3"
5 changes: 4 additions & 1 deletion mempool/src/front.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::Sender;
use tokio_util::codec::{Framed, LengthDelimitedCodec};

use profile::pspawn;
use profile::*;

pub struct Front {
address: SocketAddr,
deliver: Sender<Transaction>,
Expand Down Expand Up @@ -38,7 +41,7 @@ impl Front {
}

async fn spawn_worker(socket: TcpStream, peer: SocketAddr, deliver: Sender<Transaction>) {
tokio::spawn(async move {
pspawn!("Front-Worker", {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pspawn!("Front-Worker", {
pspawn!("Txs-Receiver-Worker", {

let mut transport = Framed::new(socket, LengthDelimitedCodec::new());
while let Some(frame) = transport.next().await {
match frame {
Expand Down
Loading