Skip to content

Commit

Permalink
create ConfigState::write_requests_to_file
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Nov 27, 2023
1 parent 757a6f1 commit b754391
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 41 deletions.
47 changes: 6 additions & 41 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::{BTreeMap, HashSet},
fs::File,
io::{ErrorKind, Read, Write},
io::{ErrorKind, Read},
os::unix::io::{FromRawFd, IntoRawFd},
os::unix::net::UnixStream,
time::{Duration, Instant},
Expand Down Expand Up @@ -127,52 +127,15 @@ impl CommandServer {
.with_context(|| format!("could not open file at path: {}", &path))?;

let counter = self
.save_state_to_file(&mut file)
.state
.write_requests_to_file(&mut file)
.with_context(|| "failed writing state to file")?;

info!("wrote {} commands to {}", counter, path);

Ok(Some(Success::SaveState(counter, path.into())))
}

pub fn save_state_to_file(&mut self, file: &mut File) -> anyhow::Result<usize> {
let mut counter = 0usize;
let requests = self.state.generate_requests();

let result: anyhow::Result<usize> = (move || {
for request in requests {
let message = WorkerRequest::new(format!("SAVE-{counter}"), request);

file.write_all(
&serde_json::to_string(&message)
.map(|s| s.into_bytes())
.unwrap_or_default(),
)
.with_context(|| {
format!(
"Could not add this instruction line to the saved state file: {message:?}"
)
})?;

file.write_all(&b"\n\0"[..])
.with_context(|| "Could not add new line to the saved state file")?;

if counter % 1000 == 0 {
info!("writing command {}", counter);
file.sync_all()
.with_context(|| "Failed to sync the saved state file")?;
}
counter += 1;
}
file.sync_all()
.with_context(|| "Failed to sync the saved state file")?;

Ok(counter)
})();

result.with_context(|| "Could not write the state onto the state file")
}

pub async fn load_state(
&mut self,
client_id: Option<String>,
Expand Down Expand Up @@ -1237,10 +1200,12 @@ impl CommandServer {
"Saving state to file",
)
.await;

let mut file = File::create(&path)
.with_context(|| "Could not create file to automatically save the state")?;

self.save_state_to_file(&mut file)
self.state
.write_requests_to_file(&mut file)
.with_context(|| format!("could not save state automatically to {path}"))?;
}
}
Expand Down
36 changes: 36 additions & 0 deletions command/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::{
btree_map::Entry as BTreeMapEntry, hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap,
HashSet,
},
fs::File,
hash::{Hash, Hasher},
io::Write,
iter::{repeat, FromIterator},
net::SocketAddr,
};
Expand All @@ -23,6 +25,7 @@ use crate::{
},
display::format_request_type,
},
request::WorkerRequest,
response::{Backend, HttpFrontend, TcpFrontend},
ObjectKind,
};
Expand Down Expand Up @@ -56,6 +59,8 @@ pub enum StateError {
"Could not convert the frontend to an insertable one. Frontend: {frontend} error: {error}"
)]
FrontendConversion { frontend: String, error: String },
#[error("Could not write state to file: {0}")]
FileError(std::io::Error),
}

impl From<DecodeError> for StateError {
Expand Down Expand Up @@ -1367,6 +1372,37 @@ impl ConfigState {
tcp_listeners: self.tcp_listeners.clone(),
}
}

/// generate requests necessary to recreate the state,
/// write them in a JSON form in a file, separated by \n\0,
/// returns the number of written requests
pub fn write_requests_to_file(&self, file: &mut File) -> Result<usize, StateError> {
let mut counter = 0usize;
let requests = self.generate_requests();

for request in requests {
let message = WorkerRequest::new(format!("SAVE-{counter}"), request);

file.write_all(
&serde_json::to_string(&message)
.map(|s| s.into_bytes())
.unwrap_or_default(),
)
.map_err(StateError::FileError)?;

file.write_all(&b"\n\0"[..])
.map_err(StateError::FileError)?;

if counter % 1000 == 0 {
info!("writing command {}", counter);
file.sync_all().map_err(StateError::FileError)?;
}
counter += 1;
}
file.sync_all().map_err(StateError::FileError)?;

Ok(counter)
}
}

fn parse_socket_address(address: &str) -> Result<SocketAddr, StateError> {
Expand Down

0 comments on commit b754391

Please sign in to comment.