-
Notifications
You must be signed in to change notification settings - Fork 2k
/
main_plt.rs
112 lines (99 loc) · 3.71 KB
/
main_plt.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
use ntex::util::{ready, PoolId, Ready};
use ntex::{fn_service, http::h1, io::Io, io::RecvError};
use sonic_rs::Serialize;
mod utils;
const JSON: &[u8] =
b"HTTP/1.1 200 OK\r\nServer: N\r\nContent-Type: application/json\r\nContent-Length: 27\r\n";
const PLAIN: &[u8] =
b"HTTP/1.1 200 OK\r\nServer: N\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n";
const HTTPNFOUND: &[u8] = b"HTTP/1.1 400 OK\r\n";
const HDR_SERVER: &[u8] = b"Server: N\r\n";
const BODY: &[u8] = b"Hello, World!";
#[derive(Serialize)]
pub struct Message {
pub message: &'static str,
}
struct App {
io: Io,
codec: h1::Codec,
}
impl Future for App {
type Output = Result<(), ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
loop {
match ready!(this.io.poll_recv(&this.codec, cx)) {
Ok((req, _)) => {
let _ = this.io.with_write_buf(|buf| {
buf.with_bytes_mut(|buf| {
utils::reserve(buf, 2 * 1024);
match req.path() {
"/json" => {
buf.extend_from_slice(JSON);
this.codec.set_date_header(buf);
sonic_rs::to_writer(
utils::BytesWriter(buf),
&Message {
message: "Hello, World!",
},
)
.unwrap();
}
"/plaintext" => {
buf.extend_from_slice(PLAIN);
this.codec.set_date_header(buf);
buf.extend_from_slice(BODY);
}
_ => {
buf.extend_from_slice(HTTPNFOUND);
buf.extend_from_slice(HDR_SERVER);
}
}
})
});
}
Err(RecvError::WriteBackpressure) => {
let _ = ready!(this.io.poll_flush(cx, false));
}
Err(_) => {
return Poll::Ready(Ok(()));
}
}
}
}
}
#[ntex::main]
async fn main() -> io::Result<()> {
println!("Started http server: 127.0.0.1:8080");
let cores = core_affinity::get_core_ids().unwrap();
let total_cores = cores.len();
let cores = std::sync::Arc::new(std::sync::Mutex::new(cores));
// start http server
ntex::server::build()
.backlog(1024)
.bind("techempower", "0.0.0.0:8080", |cfg| {
cfg.memory_pool(PoolId::P1);
PoolId::P1.set_read_params(65535, 2048);
PoolId::P1.set_write_params(65535, 2048);
fn_service(|io| App {
io,
codec: h1::Codec::default(),
})
})?
.configure(move |cfg| {
let cores = cores.clone();
cfg.on_worker_start(move |_| {
if let Some(core) = cores.lock().unwrap().pop() {
core_affinity::set_for_current(core);
}
Ready::<_, &str>::Ok(())
});
Ok(())
})?
.workers(total_cores)
.run()
.await
}