Skip to content

Commit

Permalink
Merge pull request stratum-mining#558 from darricksee/scale-test
Browse files Browse the repository at this point in the history
Scale test
  • Loading branch information
Fi3 committed May 10, 2023
2 parents 67dcd63 + 47cb5ae commit 96bf196
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 0 deletions.
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ members = [
"roles/v2/pool",
"roles/v2/test-utils/mining-device",
"roles/translator",
"test/scale",
"utils/network-helpers",
"utils/buffer",
"utils/error-handling",
Expand Down
22 changes: 22 additions & 0 deletions test/scale/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "scale"
version = "0.1.0"
edition = "2021"

[profile.release]
lto = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = "2.33.3"
serde = { version = "1.0.89", features = ["derive", "alloc"], default-features = false, optional = true}
async-channel = "1.5.1"
async-std="1.8.0"
bytes = "1.0.1"
binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" }
codec_sv2 = { path = "../../protocols/v2/codec-sv2", features=["noise_sv2"] }
network_helpers = { version = "0.1", path = "../../utils/network-helpers", features = ["with_tokio"] }
roles_logic_sv2 = { path = "../../protocols/v2/roles-logic-sv2" }
tokio = { version = "1", features = ["full"] }

22 changes: 22 additions & 0 deletions test/scale/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Scale Test

This test simply outputs the time spent sending 1,000,000 SubmitSharesStandard
through the system. When you start the test you specify -h <num of hops> -e (for encryption).
The test spawns <num of hops> "proxies" (ports 19000->19000+<num of hops>) which simply decrypt/encrypt each
SubmitSharesStandard message coming in (if encryption is on). Then it sends
1,000,000 share messages to the first proxy and then times the whole system to see
how long it takes for the last proxy to receive all 1M messages. It uses the same
network_helpers that the pool, and proxies use so it should be a good approximation
of the work they do.

The test is run with the following command:
NOTE: running without `--release` dramatically slows down the test.

```cargo run --release -- -h 4 -e```
This runs the test with 4 hops and encryption on.

```cargo run --release -- -h 4```
This runs the test with 4 hops and encryption off.



235 changes: 235 additions & 0 deletions test/scale/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
use std::thread;
use tokio::{
net::{TcpListener, TcpStream},
task,
};

use async_channel::{bounded, Receiver, Sender};

use binary_sv2::{Deserialize, GetSize, Serialize};
use clap::{App, Arg};
use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardSv2Frame};
use std::time::Duration;

use network_helpers::{
noise_connection_tokio::Connection, plain_connection_tokio::PlainConnection,
};

use roles_logic_sv2::{
mining_sv2::*,
parsers::{Mining, MiningDeviceMessages},
};

pub type EitherFrame = StandardEitherFrame<Message>;

pub const AUTHORITY_PUBLIC_K: [u8; 32] = [
215, 11, 47, 78, 34, 232, 25, 192, 195, 168, 170, 209, 95, 181, 40, 114, 154, 226, 176, 190,
90, 169, 238, 89, 191, 183, 97, 63, 194, 119, 11, 31,
];

pub const AUTHORITY_PRIVATE_K: [u8; 32] = [
204, 93, 167, 220, 169, 204, 172, 35, 9, 84, 174, 208, 171, 89, 25, 53, 196, 209, 161, 148, 4,
5, 173, 0, 234, 59, 15, 127, 31, 160, 136, 131,
];

static HOST: &str = "127.0.0.1";

#[tokio::main]
async fn main() {
let matches = App::new("ScaleTest")
.arg(Arg::with_name("encrypt").short("e").help("Use encryption"))
.arg(
Arg::with_name("hops")
.short("h")
.takes_value(true)
.help("Number of hops"),
)
.get_matches();

let total_messages = 1_000_000;
let encrypt = matches.is_present("encrypt");
let hops: u16 = matches.value_of("hops").unwrap_or("0").parse().unwrap_or(0);
let mut orig_port: u16 = 19000;

// create channel to tell final server number of messages
let (tx, rx) = bounded(1);

if hops > 0 {
orig_port = spawn_proxies(encrypt, hops, tx, total_messages).await;
} else {
println!("Usage: ./program -h <hops> -e");
}
println!("Connecting to localhost:{}", orig_port);
setup_driver(orig_port, encrypt, rx, total_messages, hops).await;
}

async fn setup_driver(
server_port: u16,
encrypt: bool,
rx: Receiver<String>,
total_messages: i32,
hops: u16,
) {
let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port))
.await
.unwrap();
let (_server_receiver, server_sender): (Receiver<EitherFrame>, Sender<EitherFrame>);

if encrypt {
let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap();

(_server_receiver, server_sender) =
Connection::new(server_stream, HandshakeRole::Initiator(initiator)).await;
} else {
(_server_receiver, server_sender) = PlainConnection::new(server_stream).await;
}
// Create timer to see how long this method takes
let start = std::time::Instant::now();

send_messages(server_sender, total_messages).await;

//listen for message on rx
let msg = rx.recv().await.unwrap();

let end = std::time::Instant::now();

println!(
"client: {} - Took {}s hops: {} encryption: {}",
msg,
(end - start).as_secs(),
hops,
encrypt
);
}

pub type Message = MiningDeviceMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;

async fn send_messages(stream: Sender<EitherFrame>, total_messages: i32) {
let mut number: i32 = 0;
println!("Creating share");
let share = MiningDeviceMessages::Mining(Mining::SubmitSharesStandard(SubmitSharesStandard {
channel_id: 1,
sequence_number: number as u32,
job_id: 2,
nonce: 3,
ntime: 4,
version: 5,
}));

while number <= total_messages {
println!("client: sending msg-{}", number);
let frame: StdFrame = share.clone().try_into().unwrap();
let binary: EitherFrame = frame.into();

stream.send(binary).await.unwrap();
number += 1;
}
}

async fn handle_messages<Mining: Serialize + Deserialize<'static> + GetSize + Send + 'static>(
name: String,
client: Receiver<EitherFrame>,
server: Option<Sender<EitherFrame>>,
total_messages: i32,
tx: Sender<String>,
) {
let mut messages_received = 0;

while messages_received <= total_messages {
let frame: StdFrame = client.recv().await.unwrap().try_into().unwrap();

let binary: EitherFrame = frame.into();

if server.is_some() {
server.as_ref().unwrap().send(binary).await.unwrap();
} else {
messages_received += 1;
println!("last server: {} got msg {}", name, messages_received);
}
}
tx.send("got all messages".to_string()).await.unwrap();
}

async fn create_proxy(
name: String,
listen_port: u16,
server_port: u16,
encrypt: bool,
total_messages: i32,
tx: Sender<String>,
) {
println!(
"Creating proxy listener {}: {} connecting to: {}",
name, listen_port, server_port
);
let listener = TcpListener::bind(format!("0.0.0.0:{}", listen_port))
.await
.unwrap();
println!("Bound - now waiting for connection...");
let cli_stream = listener.accept().await.unwrap().0;
let (cli_receiver, _cli_sender): (Receiver<EitherFrame>, Sender<EitherFrame>);

if encrypt {
let responder = Responder::from_authority_kp(
&AUTHORITY_PUBLIC_K[..],
&AUTHORITY_PRIVATE_K[..],
Duration::from_secs(3600),
)
.unwrap();
(cli_receiver, _cli_sender) =
Connection::new(cli_stream, HandshakeRole::Responder(responder)).await;
} else {
(cli_receiver, _cli_sender) = PlainConnection::new(cli_stream).await;
}

let mut server = None;
if server_port > 0 {
println!("Proxy {} Connecting to server: {}", name, server_port);
let server_stream = TcpStream::connect(format!("{}:{}", HOST, server_port))
.await
.unwrap();
let (_server_receiver, server_sender): (Receiver<EitherFrame>, Sender<EitherFrame>);

if encrypt {
let initiator = Initiator::from_raw_k(AUTHORITY_PUBLIC_K).unwrap();
(_server_receiver, server_sender) =
Connection::new(server_stream, HandshakeRole::Initiator(initiator)).await;
} else {
(_server_receiver, server_sender) = PlainConnection::new(server_stream).await;
}
server = Some(server_sender);
}

println!("Proxy {} has a client", name);
handle_messages::<Mining>(name, cli_receiver, server, total_messages, tx).await;
}

async fn spawn_proxies(encrypt: bool, hops: u16, tx: Sender<String>, total_messages: i32) -> u16 {
let orig_port: u16 = 19000;
let final_server_port = orig_port + (hops - 1);
let mut listen_port = final_server_port;
let mut server_port: u16 = 0;

for name in (0..hops).rev() {
let tx_clone = tx.clone();
let name_clone = name.to_string();

task::spawn(async move {
create_proxy(
name_clone,
listen_port,
server_port,
encrypt,
total_messages,
tx_clone,
)
.await;
});

thread::sleep(std::time::Duration::from_secs(1));
server_port = listen_port;
listen_port -= 1;
}
orig_port
}

0 comments on commit 96bf196

Please sign in to comment.