Skip to content

Commit

Permalink
[sui-proxy/timeouts+errors] (#19454)
Browse files Browse the repository at this point in the history
## Description
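Add an `on_failure` handler to the trace layer so that failed requests are logged at ERROR level.
Add a timeout to all routes, configurable via the `NODE_CLIENT_TIMEOUT` environment variable
(in seconds); it defaults to 20 seconds when the variable is not set.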

## Test Plan
locally

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [x] Nodes (Validators and Full nodes): nodes sending metrics will
have an enforced timeout of 20 seconds. If metrics cannot be sent in
that time, the sui-proxy metrics server will disconnect the client. This
will not affect the blockchain or any related activity. It only applies
to metrics transmission.
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
suiwombat authored Oct 1, 2024
1 parent 782461f commit c9af6ea
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 8 deletions.
26 changes: 20 additions & 6 deletions crates/sui-proxy/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use sui_tls::{
use tokio::signal;
use tower::ServiceBuilder;
use tower_http::{
trace::{DefaultOnResponse, TraceLayer},
timeout::TimeoutLayer,
trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
LatencyUnit,
};
use tracing::{info, Level};
Expand Down Expand Up @@ -112,16 +113,29 @@ pub fn app(
.layer(Extension(Arc::new(allower)));
}
router
// Enforce on all routes.
// If the request does not complete within the specified timeout it will be aborted
// and a 408 Request Timeout response will be sent.
.layer(TimeoutLayer::new(Duration::from_secs(var!(
"NODE_CLIENT_TIMEOUT",
20
))))
.layer(Extension(relay))
.layer(Extension(labels))
.layer(Extension(client))
.layer(
ServiceBuilder::new().layer(
TraceLayer::new_for_http().on_response(
DefaultOnResponse::new()
.level(Level::INFO)
.latency_unit(LatencyUnit::Seconds),
),
TraceLayer::new_for_http()
.on_response(
DefaultOnResponse::new()
.level(Level::INFO)
.latency_unit(LatencyUnit::Seconds),
)
.on_failure(
DefaultOnFailure::new()
.level(Level::ERROR)
.latency_unit(LatencyUnit::Seconds),
),
),
)
}
Expand Down
139 changes: 137 additions & 2 deletions crates/sui-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,32 @@ mod tests {
axum::serve(listener, app).await.unwrap();
}

/// axum_acceptor is a basic e2e test that creates a mock remote_write post endpoint and has a simple
/// Serves a mock remote_write endpoint on `listener` that accepts every `POST /v1/push`
/// but sleeps for 60 seconds before answering `200 OK`.
///
/// This is the easiest way to exercise timeout handling without adding a special
/// slow route to the real server.
async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
    // Accepts anything, but only answers after a very long sleep so that callers
    // with a shorter timeout give up first.
    async fn stall_then_ok() -> StatusCode {
        let very_long = Duration::from_secs(60);
        tokio::time::sleep(very_long).await;
        StatusCode::OK
    }

    // The std listener must be in non-blocking mode before tokio takes it over.
    listener.set_nonblocking(true).unwrap();
    let async_listener = tokio::net::TcpListener::from_std(listener).unwrap();

    // A single route is all the mock needs.
    let slow_app = Router::new().route("/v1/push", post(stall_then_ok));

    // Serve until the owning task is dropped.
    axum::serve(async_listener, slow_app).await.unwrap();
}

/// test_axum_acceptor is a basic e2e test that creates a mock remote_write post endpoint and has a simple
/// sui-node client that posts data to the proxy using the protobuf format. The server processes this
/// data and sends it to the mock remote_write which accepts everything. Future work is to make this more
/// robust and expand the scope of coverage, probably moving this test elsewhere and renaming it.
#[tokio::test]
async fn axum_acceptor() {
async fn test_axum_acceptor() {
// generate self-signed certificates
let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
Expand Down Expand Up @@ -175,4 +195,119 @@ mod tests {
assert_eq!("created", body);
assert_eq!(status, reqwest::StatusCode::CREATED);
}

/// this is a long test to ensure we are timing out clients that are slow
///
/// The proxy is pointed at a mock remote_write endpoint that sleeps for much longer than
/// the `NODE_CLIENT_TIMEOUT` configured here, so the metrics push is expected to come
/// back with `408 Request Timeout`.
#[tokio::test]
async fn test_client_timeout() {
    // generate self-signed certificates
    let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
    let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());

    // create a fake remote_write server that never answers before the timeout
    let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
    let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
    let dummy_remote_write_url = format!(
        "http://localhost:{}/v1/push",
        dummy_remote_write_address.port()
    );

    let _dummy_remote_write = tokio::spawn(async move {
        run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
    });

    // init the tls config and allower
    let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
    let tls_config = ClientCertVerifier::new(
        allower.clone(),
        sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
    )
    .rustls_server_config(
        vec![server_priv_cert.rustls_certificate()],
        server_priv_cert.rustls_private_key(),
    )
    .unwrap();

    let client = admin::make_reqwest_client(
        RemoteWriteConfig {
            url: dummy_remote_write_url.to_owned(),
            username: "bar".into(),
            password: "foo".into(),
            ..Default::default()
        },
        "dummy user agent",
    );

    // Environment variables are process-global, so this override is visible to any test
    // running in parallel. Keep it set only while the router is being built.
    std::env::set_var("NODE_CLIENT_TIMEOUT", "5");

    let app = admin::app(
        Labels {
            network: "unittest-network".into(),
            inventory_hostname: "ansible_inventory_name".into(),
        },
        client,
        HistogramRelay::new(),
        Some(allower.clone()),
    );

    // The timeout layer is constructed inside `admin::app`, so the override is no longer
    // needed. Removing it here (rather than after the final assertion) guarantees the
    // cleanup still happens when an assertion below fails.
    std::env::remove_var("NODE_CLIENT_TIMEOUT");

    let listener = std::net::TcpListener::bind("localhost:0").unwrap();
    let server_address = listener.local_addr().unwrap();
    let server_url = format!(
        "https://localhost:{}/publish/metrics",
        server_address.port()
    );

    let acceptor = TlsAcceptor::new(tls_config);
    let _server = tokio::spawn(async move {
        admin::server(listener, app, Some(acceptor)).await.unwrap();
    });

    // build a client
    let client = reqwest::Client::builder()
        .add_root_certificate(server_priv_cert.reqwest_certificate())
        .identity(client_priv_cert.reqwest_identity())
        .https_only(true)
        .build()
        .unwrap();

    // Client request is rejected because it isn't in the allowlist
    client.get(&server_url).send().await.unwrap_err();

    // Insert the client's public key into the allowlist so the next request is accepted
    // by the proxy and forwarded to the (slow) remote_write endpoint
    allower.get_sui_mut().write().unwrap().insert(
        client_pub_key.to_owned(),
        peers::AllowedPeer {
            name: "some-node".into(),
            public_key: client_pub_key.to_owned(),
        },
    );

    let mf = create_metric_family(
        "foo_metric",
        "some help this is",
        None,
        RepeatedField::from_vec(vec![create_metric_counter(
            RepeatedField::from_vec(create_labels(vec![("some", "label")])),
            create_counter(2046.0),
        )]),
    );

    let mut buf = vec![];
    let encoder = prometheus::ProtobufEncoder::new();
    encoder.encode(&[mf], &mut buf).unwrap();

    // The HTTP exchange itself must complete; the proxy answers on behalf of the stalled
    // upstream once its timeout elapses.
    let res = client
        .post(&server_url)
        .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
        .body(buf)
        .send()
        .await
        .expect("expected a successful post with a self-signed certificate");
    let status = res.status();
    assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
}
}

0 comments on commit c9af6ea

Please sign in to comment.