Skip to content

Commit

Permalink
Merge pull request #1039 from sozu-proxy/fix-issue-1038
Browse files Browse the repository at this point in the history
fix perf issue when creating new worker
  • Loading branch information
FlorentinDUBOIS authored Nov 29, 2023
2 parents f0e852b + 3a4e4fd commit 8d4e023
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 63 deletions.
47 changes: 6 additions & 41 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::{BTreeMap, HashSet},
fs::File,
io::{ErrorKind, Read, Write},
io::{ErrorKind, Read},
os::unix::io::{FromRawFd, IntoRawFd},
os::unix::net::UnixStream,
time::{Duration, Instant},
Expand Down Expand Up @@ -127,52 +127,15 @@ impl CommandServer {
.with_context(|| format!("could not open file at path: {}", &path))?;

let counter = self
.save_state_to_file(&mut file)
.state
.write_requests_to_file(&mut file)
.with_context(|| "failed writing state to file")?;

info!("wrote {} commands to {}", counter, path);

Ok(Some(Success::SaveState(counter, path.into())))
}

pub fn save_state_to_file(&mut self, file: &mut File) -> anyhow::Result<usize> {
let mut counter = 0usize;
let requests = self.state.generate_requests();

let result: anyhow::Result<usize> = (move || {
for request in requests {
let message = WorkerRequest::new(format!("SAVE-{counter}"), request);

file.write_all(
&serde_json::to_string(&message)
.map(|s| s.into_bytes())
.unwrap_or_default(),
)
.with_context(|| {
format!(
"Could not add this instruction line to the saved state file: {message:?}"
)
})?;

file.write_all(&b"\n\0"[..])
.with_context(|| "Could not add new line to the saved state file")?;

if counter % 1000 == 0 {
info!("writing command {}", counter);
file.sync_all()
.with_context(|| "Failed to sync the saved state file")?;
}
counter += 1;
}
file.sync_all()
.with_context(|| "Failed to sync the saved state file")?;

Ok(counter)
})();

result.with_context(|| "Could not write the state onto the state file")
}

pub async fn load_state(
&mut self,
client_id: Option<String>,
Expand Down Expand Up @@ -1237,10 +1200,12 @@ impl CommandServer {
"Saving state to file",
)
.await;

let mut file = File::create(&path)
.with_context(|| "Could not create file to automatically save the state")?;

self.save_state_to_file(&mut file)
self.state
.write_requests_to_file(&mut file)
.with_context(|| format!("could not save state automatically to {path}"))?;
}
}
Expand Down
14 changes: 8 additions & 6 deletions bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use sozu_command_lib::{
logging::target_to_backend,
proto::command::{request::RequestType, Request, RunState, Status, WorkerInfo},
ready::Ready,
request::WorkerRequest,
request::{read_requests_from_file, WorkerRequest},
response::WorkerResponse,
scm_socket::{Listeners, ScmSocket},
state::ConfigState,
Expand Down Expand Up @@ -223,8 +223,9 @@ pub fn begin_worker_process(
error!("Could not block the worker-to-main channel: {}", e);
}

let configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };
let config_state: ConfigState = serde_json::from_reader(configuration_state_file)
let mut configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };

let initial_state = read_requests_from_file(&mut configuration_state_file)
.with_context(|| "could not parse configuration state data")?;

let worker_config = worker_to_main_channel
Expand Down Expand Up @@ -282,7 +283,7 @@ pub fn begin_worker_process(
worker_to_main_channel,
worker_to_main_scm_socket,
worker_config,
config_state,
initial_state,
true,
)
.with_context(|| "Could not create server from config")?;
Expand Down Expand Up @@ -312,8 +313,9 @@ pub fn fork_main_into_worker(
tempfile().with_context(|| "could not create temporary file for configuration state")?;
util::disable_close_on_exec(state_file.as_raw_fd())?;

serde_json::to_writer(&mut state_file, state)
.with_context(|| "could not write upgrade data to temporary file")?;
state
.write_requests_to_file(&mut state_file)
.with_context(|| "Could not write state to file")?;

state_file
.rewind()
Expand Down
59 changes: 59 additions & 0 deletions command/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::{
error,
fmt::{self, Display},
fs::File,
io::Read,
net::SocketAddr,
str::FromStr,
};

use nom::{HexDisplay, Offset};

use crate::{
buffer::fixed::Buffer,
parser::parse_several_requests,
proto::command::{
request::RequestType, LoadBalancingAlgorithms, PathRuleKind, Request, RequestHttpFrontend,
RulePosition,
Expand All @@ -19,6 +25,10 @@ pub enum RequestError {
InvalidSocketAddress { address: String, error: String },
#[error("invalid value {value} for field '{name}'")]
InvalidValue { name: String, value: i32 },
#[error("Could not read requests from file: {0}")]
FileError(std::io::Error),
#[error("Could not parse requests: {0}")]
ParseError(String),
}

impl Request {
Expand Down Expand Up @@ -123,6 +133,55 @@ impl fmt::Display for WorkerRequest {
}
}

pub fn read_requests_from_file(file: &mut File) -> Result<Vec<WorkerRequest>, RequestError> {
let mut acc = Vec::new();
let mut buffer = Buffer::with_capacity(200000);
loop {
let previous = buffer.available_data();

let bytes_read = file
.read(buffer.space())
.map_err(|e| RequestError::FileError(e))?;

buffer.fill(bytes_read);

if buffer.available_data() == 0 {
debug!("Empty buffer");
break;
}

let mut offset = 0usize;
match parse_several_requests::<WorkerRequest>(buffer.data()) {
Ok((i, requests)) => {
if !i.is_empty() {
debug!("could not parse {} bytes", i.len());
if previous == buffer.available_data() {
break;
}
}
offset = buffer.data().offset(i);

acc.push(requests);
}
Err(nom::Err::Incomplete(_)) => {
if buffer.available_data() == buffer.capacity() {
error!(
"message too big, stopping parsing:\n{}",
buffer.data().to_hex(16)
);
break;
}
}
Err(parse_error) => {
return Err(RequestError::ParseError(parse_error.to_string()));
}
}
buffer.consume(offset);
}
let requests = acc.into_iter().flatten().collect();
Ok(requests)
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProxyDestinations {
pub to_http_proxy: bool,
Expand Down
36 changes: 36 additions & 0 deletions command/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::{
btree_map::Entry as BTreeMapEntry, hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap,
HashSet,
},
fs::File,
hash::{Hash, Hasher},
io::Write,
iter::{repeat, FromIterator},
net::SocketAddr,
};
Expand All @@ -23,6 +25,7 @@ use crate::{
},
display::format_request_type,
},
request::WorkerRequest,
response::{Backend, HttpFrontend, TcpFrontend},
ObjectKind,
};
Expand Down Expand Up @@ -56,6 +59,8 @@ pub enum StateError {
"Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
)]
FrontendConversion { frontend: String, error: String },
#[error("Could not write state to file: {0}")]
FileError(std::io::Error),
}

impl From<DecodeError> for StateError {
Expand Down Expand Up @@ -1367,6 +1372,37 @@ impl ConfigState {
tcp_listeners: self.tcp_listeners.clone(),
}
}

/// generate requests necessary to recreate the state,
/// write them in a JSON form in a file, separated by \n\0,
/// returns the number of written requests
pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
let mut counter = 0usize;
let requests = self.generate_requests();

for request in requests {
let message = WorkerRequest::new(format!("SAVE-{counter}"), request);

file.write_all(
&serde_json::to_string(&message)
.map(|s| s.into_bytes())
.unwrap_or_default(),
)
.map_err(StateError::FileError)?;

file.write_all(&b"\n\0"[..])
.map_err(StateError::FileError)?;

if counter % 1000 == 0 {
info!("writing command {}", counter);
file.sync_all().map_err(StateError::FileError)?;
}
counter += 1;
}
file.sync_all().map_err(StateError::FileError)?;

Ok(counter)
}
}

fn parse_socket_address(address: &str) -> Result<SocketAddr, StateError> {
Expand Down
21 changes: 18 additions & 3 deletions e2e/src/sozu/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,19 @@ impl Worker {
.send_listeners(&listeners)
.expect("could not send listeners");

let initial_state = state
.generate_requests()
.into_iter()
.map(|request| WorkerRequest {
id: "initial_state".to_string(),
content: request,
})
.collect();
let server = Server::try_new_from_config(
cmd_worker_to_main,
scm_worker_to_main,
config,
state,
initial_state,
false,
)
.expect("could not create sozu worker");
Expand Down Expand Up @@ -139,7 +147,14 @@ impl Worker {
.expect("could not send listeners");

let thread_config = config.to_owned();
let thread_state = state.to_owned();
let initial_state = state
.generate_requests()
.into_iter()
.map(|request| WorkerRequest {
id: "initial_state".to_string(),
content: request,
})
.collect();
let thread_name = name.to_owned();
let thread_scm_worker_to_main = scm_worker_to_main.to_owned();

Expand All @@ -157,7 +172,7 @@ impl Worker {
cmd_worker_to_main,
thread_scm_worker_to_main,
thread_config,
thread_state,
initial_state,
false,
)
.expect("could not create sozu worker");
Expand Down
20 changes: 7 additions & 13 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Server {
worker_to_main_channel: ProxyChannel,
worker_to_main_scm: ScmSocket,
config: Config,
config_state: ConfigState,
initial_state: Vec<WorkerRequest>,
expects_initial_status: bool,
) -> anyhow::Result<Self> {
let event_loop = Poll::new().with_context(|| "could not create event loop")?;
Expand Down Expand Up @@ -331,7 +331,7 @@ impl Server {
Some(https),
None,
server_config,
Some(config_state),
Some(initial_state),
expects_initial_status,
)
}
Expand All @@ -348,7 +348,7 @@ impl Server {
https: Option<https::HttpsProxy>,
tcp: Option<tcp::TcpProxy>,
server_config: ServerConfig,
config_state: Option<ConfigState>,
initial_state: Option<Vec<WorkerRequest>>,
expects_initial_status: bool,
) -> anyhow::Result<Self> {
FEATURES.with(|_features| {
Expand Down Expand Up @@ -438,16 +438,10 @@ impl Server {
};

// initialize the worker with the state we got from a file
if let Some(state) = config_state {
for (counter, request) in state.generate_requests().iter().enumerate() {
let id = format!("INIT-{counter}");
let worker_request = WorkerRequest {
id,
content: request.to_owned(),
};

trace!("generating initial config request: {:#?}", worker_request);
server.notify_proxys(worker_request);
if let Some(requests) = initial_state {
for request in requests {
trace!("generating initial config request: {:#?}", request);
server.notify_proxys(request);
}

// do not send back answers to the initialization messages
Expand Down

0 comments on commit 8d4e023

Please sign in to comment.