-
Notifications
You must be signed in to change notification settings - Fork 14
/
main.rs
124 lines (102 loc) · 3.46 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_term;
use actix_web::{web, App, HttpServer};
use raftify::{
raft::{formatter::set_custom_formatter, logger::Slogger},
CustomFormatter,
};
use slog::Drain;
use slog_envlogger::LogBuilder;
use std::sync::Arc;
use structopt::StructOpt;
use example_harness::config::build_config;
use memstore_example_harness::{
state_machine::{HashStore, LogEntry, Raft},
web_server_api::{
campaign, debug, demote, get, leader, leave, leave_joint, peers, put, snapshot,
transfer_leader,
},
};
use memstore_static_members::utils::load_peers;
#[cfg(feature = "inmemory_storage")]
use raftify::MemStorage;
#[cfg(feature = "heed_storage")]
use raftify::HeedStorage;
#[cfg(feature = "rocksdb_storage")]
use raftify::RocksDBStorage;
// Command-line options for this example node, parsed via `StructOpt`.
// NOTE(review): plain `//` comments are used on purpose; structopt turns `///`
// doc comments into `--help` text, which would change the CLI output.
#[derive(Debug, StructOpt)]
struct Options {
// Address of this node's Raft endpoint. `main` looks it up in the initial peer
// list to find the node id and passes it on to `Raft::bootstrap`.
#[structopt(long)]
raft_addr: String,
// Optional address the HTTP API is bound to; when absent, `main` starts no
// web server.
#[structopt(long)]
web_server: Option<String>,
// TODO: Make "bootstrap_from_snapshot" option here
}
#[actix_rt::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
color_backtrace::install();
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let mut builder = LogBuilder::new(drain);
builder = builder.filter(None, slog::FilterLevel::Debug);
if let Ok(s) = std::env::var("RUST_LOG") {
builder = builder.parse(&s);
}
let drain = builder.build().fuse();
let logger = Arc::new(Slogger {
slog: slog::Logger::root(drain, o!()),
});
set_custom_formatter(CustomFormatter::<LogEntry, HashStore>::new());
let options = Options::from_args();
let store = HashStore::new();
let initial_peers = load_peers("cluster_config.toml").await?;
let node_id = initial_peers
.get_node_id_by_addr(options.raft_addr.clone())
.unwrap();
let cfg = build_config(node_id, Some(initial_peers.clone()));
#[cfg(feature = "inmemory_storage")]
let log_storage = MemStorage::create();
#[cfg(feature = "heed_storage")]
let log_storage = HeedStorage::create(cfg.get_log_dir(), &cfg.clone(), logger.clone())
.expect("Failed to create storage");
#[cfg(feature = "rocksdb_storage")]
let log_storage = RocksDBStorage::create(cfg.get_log_dir(), logger.clone())
.expect("Failed to create storage");
let raft = Raft::bootstrap(
node_id,
options.raft_addr.clone(),
log_storage,
store.clone(),
cfg.clone(),
logger.clone(),
)?;
let handle = tokio::spawn(raft.clone().run());
if let Some(addr) = options.web_server {
let _web_server = tokio::spawn(
HttpServer::new(move || {
App::new()
.app_data(web::Data::new((store.clone(), raft.clone())))
.service(put)
.service(get)
.service(leave)
.service(debug)
.service(peers)
.service(snapshot)
.service(leader)
.service(leave_joint)
.service(transfer_leader)
.service(campaign)
.service(demote)
})
.bind(addr)
.unwrap()
.run(),
);
}
let result = tokio::try_join!(handle)?;
result.0?;
Ok(())
}