forked from e-ivkov/iroha2-longevity-load-rs
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathdaemon.rs
268 lines (259 loc) · 8.73 KB
/
daemon.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
use super::make_instruction_by_operation;
use crate::{args::RunArgs, number::PositiveFloat, operation::Operation, status::Status};
use async_trait::async_trait;
use color_eyre::eyre::Result;
use futures_util::StreamExt;
use hyper::{
header,
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use iroha_client::client::Client;
use iroha_config::client::Configuration;
use iroha_data_model::{isi::InstructionExpr, prelude::*};
use std::{
collections::HashMap,
fs::File,
io::Write,
net::SocketAddr,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread,
time::{Duration, Instant},
};
use structopt::StructOpt;
use tokio::{join, select, signal, sync::Notify, task};
use tracing::{debug, info, warn, Level};
use tracing_subscriber::FmtSubscriber;
#[derive(Debug, StructOpt)]
pub struct Args {
#[structopt(short = "a", long, default_value = "127.0.0.1:8084")]
address: SocketAddr,
#[structopt(short = "t", long, default_value = "2.0")]
tps: PositiveFloat,
#[structopt(short = "c", long, default_value = "100")]
count: usize,
#[structopt(short = "o", long, required = true)]
operation: Vec<Operation>,
}
#[async_trait]
impl RunArgs for Args {
async fn run<T: Write + Send>(self, _writer: &mut std::io::BufWriter<T>) -> Result<()> {
run_daemon(self.address, self.tps, self.count, self.operation).await
}
}
async fn run_daemon(
address: SocketAddr,
tps: PositiveFloat,
count: usize,
operations: Vec<Operation>,
) -> Result<()> {
info!("Welcome to the Iroha 2 longevity load script");
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::DEBUG)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Failed to init logging");
info!("Staring load script");
info!("Reading config file");
let config_file = File::open("config.json").expect("`config.json` not found.");
let cfg: Configuration =
serde_json::from_reader(config_file).expect("Failed to deserialize configuration.");
warn!("No status updates are given in the logs. To access that information please use `curl -X GET {} -i", address);
info!("Reading configuration finished");
debug!("Configuration: {:#?}", cfg);
let operations = operations.into_iter().fold(HashMap::new(), |mut m, op| {
m.insert(op, count);
m
});
let shared_client = Client::new(&cfg)?;
let client = shared_client.clone();
let notify_close = Arc::new(Notify::new());
let shared_status = Arc::new(RwLock::new(Status::default()));
let status = Arc::clone(&shared_status);
info!("Spawning clients");
let update_status_fut = task::spawn(update_status_according_to_events(
client,
status,
Arc::clone(¬ify_close),
));
info!("First client thread spawned");
let client = shared_client;
let status = Arc::clone(&shared_status);
let notify_close_clone = Arc::clone(¬ify_close);
let perform_operations_fut = task::spawn_blocking(move || {
let interval = Duration::from_secs_f64(1_f64 / f64::from(tps));
let is_closed = Arc::new(AtomicBool::new(false));
let is_closed_clone = Arc::clone(&is_closed);
task::spawn(async move {
notify_close_clone.notified().await;
is_closed_clone.store(true, Ordering::SeqCst);
});
perform_operations(
client.clone(),
Arc::clone(&status),
interval,
operations,
Arc::clone(&is_closed),
);
submit_empty_transactions(client, status, interval, is_closed);
});
info!("Second thread is spawned. Starting server");
let service = make_service_fn(move |_conn| {
let status = Arc::clone(&shared_status);
async move {
Result::<_, hyper::Error>::Ok(service_fn(move |req| {
handle_status_request(req, Arc::clone(&status))
}))
}
});
let server = Server::bind(&address)
.serve(service)
.with_graceful_shutdown(handle_shutdown_signal(notify_close));
join!(
async {
update_status_fut
.await
.expect("Failed to update status according events");
},
async {
perform_operations_fut
.await
.expect("Failed to perform operations");
},
async {
server.await.expect("Failed to serve a service");
}
);
Ok(())
}
async fn update_status_according_to_events(
client: Client,
status: Arc<RwLock<Status>>,
notify_close: Arc<Notify>,
) {
let event_filter = FilterBox::Pipeline(PipelineEventFilter::new());
let mut event_stream = client.listen_for_events_async(event_filter).await.unwrap();
loop {
let event = select! {
next = event_stream.next() => {
match next {
Some(event) => event,
None => break
}
},
_ = notify_close.notified() => {
break;
}
};
debug!(event = ?event, "got an event");
if let Ok(Event::Pipeline(event)) = event {
match event.status() {
PipelineStatus::Validating => {}
PipelineStatus::Rejected(_) => {
status
.write()
.expect("Failed to lock to write rejection timestamp to status")
.tx_is_rejected();
}
PipelineStatus::Committed => {
status
.write()
.expect("Failed to lock to write commit timestamp to status")
.tx_is_committed();
}
}
} else {
warn!("TX with unknown status");
status
.write()
.expect("Failed to lock to write unknown status")
.tx_is_unknown()
}
}
event_stream.close().await;
}
fn perform_operations(
client: Client,
status: Arc<RwLock<Status>>,
interval: Duration,
mut operations: HashMap<Operation, usize>,
is_closed: Arc<AtomicBool>,
) {
let alice_id = AccountId::from_str("alice@wonderland").expect("Failed to make Alice id");
let wonderland_id =
DomainId::new(Name::from_str("wonderland").expect("Failed to create Wodnerland name"));
while !operations.is_empty() {
if is_closed.load(Ordering::SeqCst) {
return;
}
operations.retain(|op, count| {
let start_time = Instant::now();
debug!(operation = ?op, count = ?count, "perform operation");
let instructions =
make_instruction_by_operation(op, alice_id.clone(), wonderland_id.clone(), *count);
let res = client.submit_all(instructions);
let elapsed = Instant::now().duration_since(start_time);
if elapsed < interval {
thread::sleep(interval - elapsed);
}
if let Err(err) = res {
warn!("Submit failed: {}", err);
true
} else {
status
.write()
.expect("Failed to lock to write status")
.tx_is_sent();
*count -= 1;
*count != 0
}
});
}
}
fn submit_empty_transactions(
client: Client,
status: Arc<RwLock<Status>>,
interval: Duration,
is_closed: Arc<AtomicBool>,
) {
info!("Submitting empty transactions");
loop {
if is_closed.load(Ordering::SeqCst) {
return;
}
let start_time = Instant::now();
client
.submit_all(Vec::<InstructionExpr>::new().into_iter())
.expect("Failed to submit empty ISI");
status
.write()
.expect("Failed to lock to write status")
.tx_is_sent();
let elapsed = Instant::now().duration_since(start_time);
if elapsed < interval {
thread::sleep(interval - elapsed);
}
}
}
async fn handle_status_request(
_req: Request<Body>,
status: Arc<RwLock<Status>>,
) -> Result<Response<Body>, hyper::Error> {
let guard = status.read().unwrap();
let str_status = serde_json::to_string(&*guard).unwrap();
let res = Response::builder()
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(str_status))
.unwrap();
Ok(res)
}
async fn handle_shutdown_signal(notify_close: Arc<Notify>) {
signal::ctrl_c()
.await
.expect("Failed to install CTRL+C signal handler");
info!("received a shutdown signal");
notify_close.notify_waiters();
}