Skip to content

Commit

Permalink
Merge pull request #1013 from sozu-proxy/devel/edemolis/fix/early_con…
Browse files Browse the repository at this point in the history
…nect

Fix early connect trials
  • Loading branch information
FlorentinDUBOIS authored Oct 24, 2023
2 parents 573b8dd + 76e0e7d commit 12c3234
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 42 deletions.
76 changes: 69 additions & 7 deletions e2e/src/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,65 @@ pub fn try_head() -> State {
State::Success
}

pub fn try_status_header_split() -> State {
let front_address = create_local_address();

let (config, listeners, state) = Worker::empty_config();
let (mut worker, mut backends) =
setup_sync_test("SPLIT", config, listeners, state, front_address, 1, false);
let mut backend = backends.pop().unwrap();

backend.connect();

let mut client = Client::new("client", front_address, "");

client.connect();

let mut accepted = false;
for (i, chunk) in [
"POST /api HTTP/1.",
"1\r\n",
"Host: localhost\r\n",
"Content-Length",
":",
" 1",
"0",
"\r",
"\n\r",
"\n012",
"34567",
"89",
]
.iter()
.enumerate()
{
println!("{accepted} {i} {chunk:?}");
client.set_request(*chunk);
client.send();
if !accepted {
println!("accepting");
accepted = backend.accept(0);
if accepted {
assert_eq!(i, 9);
}
backend.send(0);
}
if accepted {
println!("receiving");
let request = backend.receive(0);
println!("request: {request:?}");
}
}
backend.send(0);
let response = client.receive();
println!("response: {response:?}");
assert!(response.is_some());

worker.hard_stop();
worker.wait_for_server_stop();
State::Success
}

fn try_wildcard() -> State {
use sozu_command_lib::proto::command::{PathRule, RulePosition};
let front_address = create_local_address();
Expand Down Expand Up @@ -1472,12 +1531,7 @@ fn try_wildcard() -> State {
let mut client = Client::new(
"client",
front_address,
http_request(
"POST",
"/api",
format!("ping"),
"www.sozu.io",
),
http_request("POST", "/api", format!("ping"), "www.sozu.io"),
);

backend0.connect();
Expand Down Expand Up @@ -1709,7 +1763,15 @@ fn test_head() {
#[test]
fn test_wildcard() {
assert_eq!(
repeat_until_error_or(1, "Hostname with wildcard", try_wildcard),
repeat_until_error_or(2, "Hostname with wildcard", try_wildcard),
State::Success
);
}

#[test]
fn test_status_header_split() {
assert_eq!(
repeat_until_error_or(2, "Status line and Headers split", try_status_header_split),
State::Success
);
}
5 changes: 5 additions & 0 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ impl ProxySession for HttpSession {
}

if self.state.failed() {
match self.state.marker() {
StateMarker::Expect => incr!("http.upgrade.expect.failed"),
StateMarker::Http => incr!("http.upgrade.http.failed"),
StateMarker::WebSocket => incr!("http.upgrade.ws.failed"),
}
return;
}

Expand Down
7 changes: 7 additions & 0 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,13 @@ impl ProxySession for HttpsSession {
}

if self.state.failed() {
match self.state.marker() {
StateMarker::Expect => incr!("https.upgrade.expect.failed"),
StateMarker::Handshake => incr!("https.upgrade.handshake.failed"),
StateMarker::Http => incr!("https.upgrade.http.failed"),
StateMarker::WebSocket => incr!("https.upgrade.wss.failed"),
StateMarker::Http2 => incr!("https.upgrade.http2.failed"),
}
return;
}

Expand Down
65 changes: 33 additions & 32 deletions lib/src/protocol/kawa_h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
"could not reset front timeout {:?}",
self.configured_frontend_timeout
);
self.print_state(&format!("{:?}", self.context.protocol));
self.print_state(self.protocol_string());
}

if let SessionStatus::DefaultAnswer(_, _, _) = self.status {
Expand Down Expand Up @@ -331,47 +331,44 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
pub fn readable_parse(&mut self, _metrics: &mut SessionMetrics) -> StateResult {
trace!("==============readable_parse");
let was_initial = self.request_stream.is_initial();
let was_not_proxying = !self.request_stream.is_main_phase();

kawa::h1::parse(&mut self.request_stream, &mut self.context);
// kawa::debug_kawa(&self.request_stream);

if was_initial && !self.request_stream.is_initial() {
// if it was the first request, the front timeout duration
// was set to request_timeout, which is much lower. For future
// requests on this connection, we can wait a bit more
self.container_frontend_timeout
.set_duration(self.configured_frontend_timeout);
gauge_add!("http.active_requests", 1);
incr!("http.requests");
}

if self.request_stream.is_error() {
incr!("http.frontend_parse_errors");
// was_initial is maybe too restrictive
// from what I understand the only condition should be:
// "did we already sent byte?"
if was_initial {
if self.response_stream.consumed {
return StateResult::CloseSession;
} else {
self.set_answer(DefaultAnswerStatus::Answer400, None);
// gauge_add!("http.active_requests", 1);
return StateResult::Continue;
} else {
return StateResult::CloseSession;
}
}

if self.request_stream.is_main_phase() {
self.backend_readiness.interest.insert(Ready::WRITABLE);
// if it was the first request, the front timeout duration
// was set to request_timeout, which is much lower. For future
// requests on this connection, we can wait a bit more
self.container_frontend_timeout
.set_duration(self.configured_frontend_timeout);
if was_not_proxying {
// Sozu tries to connect only once all the headers were gathered and edited
// this could be improved
trace!("============== HANDLE CONNECTION!");
return StateResult::ConnectBackend;
}
}
if self.request_stream.is_terminated() {
self.frontend_readiness.interest.remove(Ready::READABLE);
}

if was_initial {
if let kawa::StatusLine::Request { .. } = self.request_stream.detached.status_line {
trace!("============== HANDLE CONNECTION!");
return StateResult::ConnectBackend;
}
}
StateResult::Continue
}

Expand Down Expand Up @@ -457,6 +454,13 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
_ => (),
}

if !(self.request_stream.is_terminated() && self.request_stream.is_completed()) {
error!("Response terminated before request, this case is not handled properly yet");
incr!("http.early_response_close");
// FIXME: this will cause problems with pipelining
// return StateResult::CloseSession;
}

// FIXME: we could get smarter about this
// with no keepalive on backend, we could open a new backend ConnectionError
// with no keepalive on front but keepalive on backend, we could have
Expand Down Expand Up @@ -574,7 +578,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
"could not reset back timeout {:?}",
self.configured_backend_timeout
);
self.print_state(&format!("{:?}", self.context.protocol));
self.print_state(self.protocol_string());
}

if let SessionStatus::DefaultAnswer(_, _, _) = self.status {
Expand All @@ -600,7 +604,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
self.frontend_readiness.interest.insert(Ready::WRITABLE);
} else {
// server has filled its buffer and we can't empty it
// FIXME: what error code should we use?
// FIXME: should we send 507 Insufficient Storage ?
self.set_answer(DefaultAnswerStatus::Answer502, None);
}
return SessionResult::Continue;
Expand All @@ -609,7 +613,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
let (size, socket_state) = backend_socket.socket_read(self.response_stream.storage.space());
debug!(
"{}\tBACK [{}<-{:?}]: read {} bytes",
"ctx", // self.log_context(),
"ctx", // FIXME: self.log_context(),
self.frontend_token.0,
self.backend_token,
size
Expand Down Expand Up @@ -651,21 +655,16 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L

pub fn backend_readable_parse(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
trace!("==============backend_readable_parse");
let was_initial = self.response_stream.is_initial();

kawa::h1::parse(&mut self.response_stream, &mut self.context);
// kawa::debug_kawa(&self.response_stream);

if self.response_stream.is_error() {
incr!("http.backend_parse_errors");
// was_initial is maybe too restrictive
// from what I understand the only condition should be:
// "did we already sent byte?"
if was_initial {
if self.response_stream.consumed {
return SessionResult::Close;
} else {
self.set_answer(DefaultAnswerStatus::Answer502, None);
return SessionResult::Continue;
} else {
return SessionResult::Close;
}
}

Expand Down Expand Up @@ -782,6 +781,8 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
}
pub fn log_request_error(&mut self, metrics: &mut SessionMetrics, message: &str) {
incr!("http.errors");
error!("{} Could not process request properly got: {}", self.log_context(), message);
self.print_state(self.protocol_string());
self.log_request(metrics, Some(message));
}

Expand Down Expand Up @@ -1580,7 +1581,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
);
incr!("http.infinite_loop.error");

self.print_state(&format!("{:?}", self.context.protocol));
self.print_state(self.protocol_string());

return SessionResult::Close;
}
Expand Down Expand Up @@ -1704,7 +1705,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState
fn print_state(&self, context: &str) {
error!(
"\
{} Session(Pipe)
{} Session(Kawa)
\tFrontend:
\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
\tBackend:
Expand Down
6 changes: 3 additions & 3 deletions lib/src/protocol/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {

pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
incr!("pipe.errors");
warn!("could not process request properly got: {}", message);
self.print_state(&self.log_context().to_string());
error!("{} Could not process request properly got: {}", self.log_context(), message);
self.print_state(self.protocol_string());
self.log_request(metrics, Some(message));
}

Expand Down Expand Up @@ -712,7 +712,7 @@ impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
);
incr!("http.infinite_loop.error");

self.print_state(&format!("{:?}", self.protocol));
self.print_state(self.protocol_string());

return SessionResult::Close;
}
Expand Down
6 changes: 6 additions & 0 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,12 @@ impl ProxySession for TcpSession {
}

if self.state.failed() {
match self.state.marker() {
StateMarker::Pipe => incr!("tcp.upgrade.pipe.failed"),
StateMarker::SendProxyProtocol => incr!("tcp.upgrade.send.failed"),
StateMarker::RelayProxyProtocol => incr!("tcp.upgrade.relay.failed"),
StateMarker::ExpectProxyProtocol => incr!("tcp.upgrade.expect.failed"),
}
return;
}

Expand Down

0 comments on commit 12c3234

Please sign in to comment.