Skip to content

Commit

Permalink
Continue frame handling:
Browse files Browse the repository at this point in the history
- Implement Priority, PushPromise, GoAway and Continuation frame parsing
- Add kawa_h1::HttpContext to Streams
- Separate headers handling into pkawa module (will be relocated to
  Kawa)
- Use kawa_h1::editor callbacks to edit h2 headers and HttpContext

Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Wonshtrum committed Aug 22, 2023
1 parent 085d85a commit fb53d65
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 94 deletions.
86 changes: 46 additions & 40 deletions lib/src/protocol/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ use std::{
rc::{Rc, Weak},
};

use kawa::h1::ParserCallbacks;
use mio::{net::TcpStream, Token};
use rusty_ulid::Ulid;
use sozu_command::ready::Ready;

mod parser;
mod pkawa;
mod serializer;

use crate::{
https::HttpsListener,
pool::{Checkout, Pool},
protocol::SessionState,
protocol::{mux::parser::error_code_to_str, SessionState},
socket::{FrontRustls, SocketHandler, SocketResult},
AcceptError, L7Proxy, ProxySession, Readiness, SessionMetrics, SessionResult, StateResult,
};

use super::http::editor::HttpContext;

/// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer
type GenericHttpStream = kawa::Kawa<Checkout>;
type StreamId = u32;
Expand Down Expand Up @@ -85,10 +89,11 @@ pub struct ConnectionH2<Front: SocketHandler> {
}

pub struct Stream {
pub request_id: Ulid,
// pub request_id: Ulid,
pub window: i32,
pub front: GenericHttpStream,
pub back: GenericHttpStream,
pub context: HttpContext,
}

impl Stream {
Expand Down Expand Up @@ -229,10 +234,27 @@ impl Context {
None => return Err(AcceptError::BufferCapacityReached),
};
Ok(Stream {
request_id,
window: window as i32,
front: GenericHttpStream::new(kawa::Kind::Request, kawa::Buffer::new(front_buffer)),
back: GenericHttpStream::new(kawa::Kind::Response, kawa::Buffer::new(back_buffer)),
context: HttpContext {
keep_alive_backend: false,
keep_alive_frontend: false,
sticky_session_found: None,
method: None,
authority: None,
path: None,
status: None,
reason: None,
user_agent: None,
closing: false,
id: request_id,
protocol: crate::Protocol::HTTPS,
public_address: "0.0.0.0:80".parse().unwrap(),
session_address: None,
sticky_name: "SOZUBALANCEID".to_owned(),
sticky_session: None,
},
})
}

Expand Down Expand Up @@ -381,8 +403,16 @@ impl SessionState for Mux {
let mut b = [0; 1024];
let (size, status) = s.socket_read(&mut b);
println!("{size} {status:?} {:?}", &b[..size]);
for stream in &self.context.streams.others {
kawa::debug_kawa(&stream.front);
for stream in &mut self.context.streams.others {
let kawa = &mut stream.front;
kawa::debug_kawa(kawa);
kawa.prepare(&mut kawa::h1::BlockConverter);
let out = kawa.as_io_slice();
let mut writer = std::io::BufWriter::new(Vec::new());
let amount = writer.write_vectored(&out).unwrap();
println!("amount: {amount}\n{}", unsafe {
std::str::from_utf8_unchecked(writer.buffer())
});
}
}
}
Expand Down Expand Up @@ -496,10 +526,10 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
} else {
self.create_stream(header.stream_id, context)
};
let stream_id = if header.frame_type == parser::FrameType::Headers {
0
} else {
let stream_id = if header.frame_type == parser::FrameType::Data {
stream_id
} else {
0
};
println!("{} {} {:#?}", header.stream_id, stream_id, self.streams);
self.expect = Some((stream_id as usize, header.payload_len as usize));
Expand Down Expand Up @@ -570,36 +600,12 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
let global_stream_id = self.streams.get(&headers.stream_id).unwrap();
let kawa = context.streams.zero.front(self.position);
let buffer = headers.header_block_fragment.data(kawa.storage.buffer());
// this is Kawa's Territory, it may be rewritten in Kawa and called as:
// kawa::h2::handle_header(buffer, kawa, EditorCallBacks)
let kawa = context.streams.others[*global_stream_id - 1].front(self.position);
println!("{buffer:?}");
context
.decoder
.decode_with_cb(buffer, |k, v| {
// Proof of concept that we can decompress stream 0 fragments into another stream
// this is incomplete as pseudo headers should be sorted out and stored as a kawa::StatusLine
let start = kawa.storage.end as u32;
kawa.storage.write(&k).unwrap();
kawa.storage.write(&v).unwrap();
let len_key = k.len() as u32;
let len_val = v.len() as u32;
kawa.push_block(kawa::Block::Header(kawa::Pair {
key: kawa::Store::Slice(kawa::repr::Slice {
start,
len: len_key,
}),
val: kawa::Store::Slice(kawa::repr::Slice {
start: start + len_key,
len: len_val,
}),
}));
})
.unwrap();
// everything has been parsed
kawa.storage.head = kawa.storage.end;
let stream = &mut context.streams.others[*global_stream_id - 1];
let kawa = &mut stream.front;
pkawa::handle_header(kawa, buffer, &mut context.decoder);
stream.context.on_headers(kawa);
}
parser::Frame::Priority => todo!(),
parser::Frame::Priority(priority) => (),

Check warning on line 608 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Build documentation

unused variable: `priority`

Check warning on line 608 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused variable: `priority`

Check warning on line 608 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused variable: `priority`

Check warning on line 608 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused variable: `priority`

Check warning on line 608 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused variable: `priority`

Check warning on line 608 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused variable: `priority`

Check warning on line 608 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused variable: `priority`
parser::Frame::RstStream(_) => todo!(),
parser::Frame::Settings(settings) => {
for setting in settings.settings {
Expand All @@ -615,14 +621,14 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
}
println!("{:#?}", self.settings);
}
parser::Frame::PushPromise => todo!(),
parser::Frame::PushPromise(_) => todo!(),
parser::Frame::Ping(_) => todo!(),
parser::Frame::GoAway => todo!(),
parser::Frame::GoAway(goaway) => panic!("{}", error_code_to_str(goaway.error_code)),
parser::Frame::WindowUpdate(update) => {
let global_stream_id = *self.streams.get(&update.stream_id).unwrap();
context.streams.get(global_stream_id).window += update.increment as i32;
}
parser::Frame::Continuation => todo!(),
parser::Frame::Continuation(_) => todo!(),
}
}
}
Expand Down
Loading

0 comments on commit fb53d65

Please sign in to comment.