Skip to content

Commit

Permalink
Fix TCP connection hanging on backend connection error
Browse files Browse the repository at this point in the history
- Add handle_connection_result to centralize connection behavior for TCP
- Add similar function to centralize connection behavior of kawa_h1
- Close TCP connections upon any connection error

Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Wonshtrum committed Jan 23, 2024
1 parent 9d5788c commit 1710f8a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 55 deletions.
54 changes: 34 additions & 20 deletions lib/src/protocol/kawa_h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,8 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
proxy,
)
.map_err(|backend_error| {
// some backend errors are actually retryable
// TODO: maybe retry or return a different default answer
self.set_answer(DefaultAnswerStatus::Answer503, None);
BackendConnectionError::Backend(backend_error)
})?;
Expand Down Expand Up @@ -1538,16 +1540,10 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L

// trigger a backend reconnection
self.close_backend(proxy.clone(), metrics);
match self.connect_to_backend(session.clone(), proxy.clone(), metrics) {
// reuse connection or send a default answer, we can continue
Ok(BackendConnectAction::Reuse) => {}
Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
// we must wait for an event
return SessionResult::Continue;
}
Err(connection_error) => {
warn!("Error connecting to backend: {}", connection_error)
}
let connection_result =
self.connect_to_backend(session.clone(), proxy.clone(), metrics);
if let Some(session_result) = handle_connection_result(connection_result) {
return session_result;
}
} else {
metrics.backend_connected();
Expand Down Expand Up @@ -1593,16 +1589,10 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
match state_result {
StateResult::Continue => {}
StateResult::ConnectBackend => {
match self.connect_to_backend(session.clone(), proxy.clone(), metrics) {
// reuse connection or send a default answer, we can continue
Ok(BackendConnectAction::Reuse) => {}
Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
// we must wait for an event
return SessionResult::Continue;
}
Err(connection_error) => {
warn!("Error connecting to backend: {}", connection_error)
}
let connection_result =
self.connect_to_backend(session.clone(), proxy.clone(), metrics);
if let Some(session_result) = handle_connection_result(connection_result) {
return session_result;
}
}
StateResult::CloseBackend => unreachable!(),
Expand Down Expand Up @@ -1821,6 +1811,30 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState
}
}

/// Translate the outcome of a backend connection attempt into an optional
/// early-return value for the HTTP session.
///
/// Returns `Some(SessionResult::Continue)` when a connection was just opened
/// (`New`) or swapped (`Replace`): the caller has to wait for an event.
/// Returns `None` when the caller can keep going, i.e. when the connection is
/// reused or when the attempt failed.
fn handle_connection_result(
    connection_result: Result<BackendConnectAction, BackendConnectionError>,
) -> Option<SessionResult> {
    let action = match connection_result {
        Ok(action) => action,
        Err(connection_error) => {
            error!("Error connecting to backend: {}", connection_error);
            // Every BackendConnectionError has already set a default answer,
            // so the session must go on in order to serve it:
            // - NotFound: not used for http (only tcp)
            // - RetrieveClusterError: 301/400/401/404
            // - MaxConnectionRetries: 503
            // - Backend: 503
            // - MaxSessionsMemory: not checked in connect_to_backend (TODO: check it?)
            return None;
        }
    };

    match action {
        // the connection is reused (or a default answer is sent): nothing to wait for
        BackendConnectAction::Reuse => None,
        // a fresh connection is in flight: an event must arrive before going further
        BackendConnectAction::New | BackendConnectAction::Replace => {
            Some(SessionResult::Continue)
        }
    }
}

/// Save the HTTP status code of the backend response
fn save_http_status_metric(status: Option<u16>, context: LogContext) {
if let Some(status) = status {
Expand Down
63 changes: 28 additions & 35 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,39 +583,20 @@ impl TcpSession {

// trigger a backend reconnection
self.close_backend();
match self.connect_to_backend(session.clone()) {
// reuse connection or send a default answer, we can continue
Ok(BackendConnectAction::Reuse) => {}
Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
// stop here, we must wait for an event
return StateResult::Continue;
}
// TODO: should we return CloseSession here?
Err(connection_error) => {
error!("Error connecting to backend: {}", connection_error)
}
let connection_result = self.connect_to_backend(session.clone());
if let Some(state_result) = handle_connection_result(connection_result) {
return state_result;
}
} else if self.back_readiness().unwrap().event != Ready::EMPTY {
self.reset_connection_attempt();
let back_token = self.backend_token.unwrap();
self.container_backend_timeout.set(back_token);
// Why is this here? this artificially reset the connection time for no apparent reasons
// TODO: maybe remove this?
self.backend_connected = BackendConnectionStatus::Connecting(Instant::now());

self.set_back_connected(BackendConnectionStatus::Connected);
}
} else if back_connected == BackendConnectionStatus::NotConnected {
match self.connect_to_backend(session.clone()) {
// reuse connection or send a default answer, we can continue
Ok(BackendConnectAction::Reuse) => {}
Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
// we must wait for an event
return StateResult::Continue;
}
Err(connection_error) => {
error!("Error connecting to backend: {}", connection_error)
}
let connection_result = self.connect_to_backend(session.clone());
if let Some(state_result) = handle_connection_result(connection_result) {
return state_result;
}
}

Expand Down Expand Up @@ -668,16 +649,9 @@ impl TcpSession {

match order {
StateResult::ConnectBackend => {
match self.connect_to_backend(session.clone()) {
// reuse connection or send a default answer, we can continue
Ok(BackendConnectAction::Reuse) => {}
Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
// we must wait for an event
return StateResult::Continue;
}
Err(connection_error) => {
error!("Error connecting to backend: {}", connection_error)
}
let connection_result = self.connect_to_backend(session.clone());
if let Some(state_result) = handle_connection_result(connection_result) {
return state_result;
}
}
StateResult::Continue => {}
Expand Down Expand Up @@ -1174,6 +1148,25 @@ impl TcpListener {
}
}

/// Translate the outcome of a backend connection attempt into an optional
/// early-return value for the TCP session.
///
/// - connection error: logged, then `Some(StateResult::CloseBackend)`
/// - `New` / `Replace`: `Some(StateResult::Continue)`, an event must be awaited
/// - `Reuse`: `None`, the caller can carry on
fn handle_connection_result(
    connection_result: Result<BackendConnectAction, BackendConnectionError>,
) -> Option<StateResult> {
    match connection_result {
        Err(connection_error) => {
            error!("Error connecting to backend: {}", connection_error);
            // NOTE: for BackendConnectionError::Backend(BackendError::ConnectionFailures(..))
            // retrying may be preferable to closing
            Some(StateResult::CloseBackend)
        }
        Ok(action) => match action {
            // the connection is reused (or a default answer is sent): keep going
            BackendConnectAction::Reuse => None,
            // a connection was just initiated: stop here until an event arrives
            BackendConnectAction::New | BackendConnectAction::Replace => {
                Some(StateResult::Continue)
            }
        },
    }
}

#[derive(Debug)]
pub struct ClusterConfiguration {
proxy_protocol: Option<ProxyProtocolConfig>,
Expand Down

0 comments on commit 1710f8a

Please sign in to comment.