|
1 | | -use hyper::service::{make_service_fn, service_fn}; |
2 | | -use hyper::{Body, Method, Request, Response, Server, StatusCode}; |
| 1 | +use http_body_util::Full; |
| 2 | +use hyper::body; |
| 3 | +use hyper::body::Bytes; |
| 4 | + |
| 5 | +use hyper::server::conn::http1; |
| 6 | +use hyper::service::service_fn; |
| 7 | +use hyper::{Method, Request, Response, StatusCode}; |
| 8 | +use hyper_util::rt::TokioIo; |
3 | 9 | use log::{debug, error, info}; |
4 | 10 | use phf::phf_map; |
5 | 11 | use std::collections::HashMap; |
6 | 12 | use std::fmt; |
7 | 13 | use std::net::SocketAddr; |
8 | 14 | use std::sync::atomic::Ordering; |
9 | 15 | use std::sync::Arc; |
| 16 | +use tokio::net::TcpListener; |
10 | 17 |
|
11 | 18 | use crate::config::Address; |
12 | 19 | use crate::pool::{get_all_pools, PoolIdentifier}; |
@@ -243,7 +250,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> { |
243 | 250 | } |
244 | 251 | } |
245 | 252 |
|
246 | | -async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> { |
| 253 | +async fn prometheus_stats( |
| 254 | + request: Request<body::Incoming>, |
| 255 | +) -> Result<Response<Full<Bytes>>, hyper::http::Error> { |
247 | 256 | match (request.method(), request.uri().path()) { |
248 | 257 | (&Method::GET, "/metrics") => { |
249 | 258 | let mut lines = Vec::new(); |
@@ -374,14 +383,35 @@ fn push_server_stats(lines: &mut Vec<String>) { |
374 | 383 | } |
375 | 384 |
|
376 | 385 | pub async fn start_metric_server(http_addr: SocketAddr) { |
377 | | - let http_service_factory = |
378 | | - make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) }); |
379 | | - let server = Server::bind(&http_addr).serve(http_service_factory); |
| 386 | + let listener = TcpListener::bind(http_addr); |
| 387 | + let listener = match listener.await { |
| 388 | + Ok(listener) => listener, |
| 389 | + Err(e) => { |
| 390 | + error!("Failed to bind prometheus server to HTTP address: {}.", e); |
| 391 | + return; |
| 392 | + } |
| 393 | + }; |
380 | 394 | info!( |
381 | 395 | "Exposing prometheus metrics on http://{}/metrics.", |
382 | 396 | http_addr |
383 | 397 | ); |
384 | | - if let Err(e) = server.await { |
385 | | - error!("Failed to run HTTP server: {}.", e); |
| 398 | + loop { |
| 399 | + let stream = match listener.accept().await { |
| 400 | + Ok((stream, _)) => stream, |
| 401 | + Err(e) => { |
| 402 | + error!("Error accepting connection: {}", e); |
| 403 | + continue; |
| 404 | + } |
| 405 | + }; |
| 406 | + let io = TokioIo::new(stream); |
| 407 | + |
| 408 | + tokio::task::spawn(async move { |
| 409 | + if let Err(err) = http1::Builder::new() |
| 410 | + .serve_connection(io, service_fn(prometheus_stats)) |
| 411 | + .await |
| 412 | + { |
| 413 | + eprintln!("Error serving HTTP connection for metrics: {:?}", err); |
| 414 | + } |
| 415 | + }); |
386 | 416 | } |
387 | 417 | } |
0 commit comments