Hotfix/peek loop #3948

Merged
merged 33 commits into from
May 15, 2023
Changes from all commits
3519398
Use dedicated deserializer for client-binder msg receive
Ben-PH Apr 28, 2023
125c1f3
Use helper method to decode data in known-len component of msg frm cl…
Ben-PH Apr 28, 2023
e64dd15
In client-binder, use helper method instead of a dedicated deser-struct
Ben-PH Apr 28, 2023
7ae357b
Remove redundant TODO
Ben-PH Apr 28, 2023
ca3a5c6
Bootstrap/constify (#3883)
Ben-PH Apr 28, 2023
03321ee
Use timeout-updating read-loop. Use peek_exact_timeout helper
Ben-PH Apr 28, 2023
ad38081
Send message to client in single write_all
Ben-PH Apr 28, 2023
d7c46a0
Document new helper methods
Ben-PH Apr 28, 2023
89b5c60
Make the servers `send_timeout` more readable
Ben-PH Apr 28, 2023
4aff56a
Revert the use of peek to a read-exact
Ben-PH May 3, 2023
041662c
Use traits to implement read_exact without code duplication
Ben-PH May 3, 2023
4c7a5b5
Remove unused peek implementation
Ben-PH May 3, 2023
01fa710
Comment the new methods
Ben-PH May 3, 2023
d006490
Clear clippy lints
Ben-PH May 3, 2023
472ed63
Client sends message in a single write
Ben-PH May 3, 2023
9c6055c
Implement and use the traits for the server binder
Ben-PH May 3, 2023
5676e9b
Module rearrangement
Ben-PH May 3, 2023
17eafbd
Merge remote-tracking branch 'origin/testnet_22' into bootstrap/use-r…
Ben-PH May 3, 2023
93c43a3
`cargo fmt --all`
Ben-PH May 3, 2023
c92b93d
Self review
Ben-PH May 3, 2023
35a5c2d
Merge remote-tracking branch 'origin/testnet_22' into bootstrap/messa…
Ben-PH May 3, 2023
497e76e
Merge remote-tracking branch 'origin/bootstrap/use-read-loop' into bo…
Ben-PH May 3, 2023
4775dc6
Create new branch testnet 23
AurelienFT May 4, 2023
8493469
Update version in constants
AurelienFT May 4, 2023
9e5abd5
Delete file that somehow missed the deletion-memo
Ben-PH May 4, 2023
9f8bd55
Merge branch 'testnet_23' into bootstrap/message-deser
Ben-PH May 4, 2023
54aa34c
Clear clippy lints
Ben-PH May 4, 2023
1e65518
Bootstrap/use read loop (#3885)
Ben-PH May 4, 2023
9b867ed
Merge branch 'testnet_23' into bootstrap/message-deser
Ben-PH May 4, 2023
0102c2e
rename peek and friends to reflect the reversion of peek usage
Ben-PH May 4, 2023
3f7b7b4
Use read_exact_timeout (should have been included in use-read-loop)
Ben-PH May 4, 2023
ff7a558
Use an in-file constant for compile-time known values
Ben-PH May 4, 2023
9caa928
Merge branch 'main' into hotfix/peek-loop
Ben-PH May 15, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -19,7 +19,7 @@ env:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
# Quick tests on each commit/PR
sanity:
56 changes: 56 additions & 0 deletions massa-bootstrap/src/bindings.rs
@@ -0,0 +1,56 @@
mod client;
mod server;
use std::{
io::{self, ErrorKind},
time::{Duration, Instant},
};

pub(crate) use client::*;
pub(crate) use server::*;

trait BindingReadExact: io::Read {
/// similar to std::io::Read::read_exact, but with a timeout that is function-global instead of per-individual-read
fn read_exact_timeout(
&mut self,
buf: &mut [u8],
deadline: Option<Instant>,
) -> Result<(), (std::io::Error, usize)> {
let mut count = 0;
self.set_read_timeout(None).map_err(|err| (err, count))?;
while count < buf.len() {
// update the timeout
if let Some(deadline) = deadline {
let dur = deadline.saturating_duration_since(Instant::now());
if dur.is_zero() {
return Err((
std::io::Error::new(ErrorKind::TimedOut, "deadline has elapsed"),
count,
));
}
self.set_read_timeout(Some(dur))
.map_err(|err| (err, count))?;
}

// do the read
match self.read(&mut buf[count..]) {
Ok(0) => break,
Ok(n) => {
count += n;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err((e, count)),
}
}
if count != buf.len() {
Err((
std::io::Error::new(ErrorKind::UnexpectedEof, "failed to fill whole buffer"),
count,
))
} else {
Ok(())
}
}

/// Internal helper
fn set_read_timeout(&mut self, duration: Option<Duration>) -> Result<(), std::io::Error>;
}
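For context, a minimal sketch of how an implementer might drive this trait: the only required method is set_read_timeout, and read_exact_timeout then bounds the whole multi-read operation with a single deadline instead of re-applying the timeout per read. This sketch is not part of the PR; TrickleReader, the durations, and main below are purely illustrative, and a real binder would delegate set_read_timeout to its TcpStream as the client binder does further down.

use std::io::{self, Read};
use std::time::{Duration, Instant};

/// Hypothetical reader that yields one byte every 20ms, to exercise the deadline.
struct TrickleReader;

impl Read for TrickleReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        std::thread::sleep(Duration::from_millis(20));
        buf[0] = 0xAB;
        Ok(1)
    }
}

impl BindingReadExact for TrickleReader {
    fn set_read_timeout(&mut self, _duration: Option<Duration>) -> io::Result<()> {
        // Nothing to arm on an in-memory reader; a TcpStream-backed binder
        // would call TcpStream::set_read_timeout here.
        Ok(())
    }
}

fn main() {
    let mut reader = TrickleReader;
    let mut buf = [0u8; 1024];
    // The deadline bounds the whole operation, not each individual read.
    let deadline = Some(Instant::now() + Duration::from_millis(100));
    match reader.read_exact_timeout(&mut buf, deadline) {
        Ok(()) => println!("filled the whole buffer before the deadline"),
        Err((err, consumed)) => {
            // Expect roughly: "stopped after 5 bytes: deadline has elapsed"
            println!("stopped after {} bytes: {}", consumed, err);
        }
    }
}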
@@ -1,34 +1,38 @@
// Copyright (c) 2022 MASSA LABS <info@massa.net>

use crate::bindings::BindingReadExact;
use crate::error::BootstrapError;
use crate::messages::{
BootstrapClientMessage, BootstrapClientMessageSerializer, BootstrapServerMessage,
BootstrapServerMessageDeserializer,
};
use crate::settings::BootstrapClientConfig;
use massa_hash::Hash;
use massa_models::config::{MAX_BOOTSTRAP_MESSAGE_SIZE, MAX_BOOTSTRAP_MESSAGE_SIZE_BYTES};
use massa_models::serialization::{DeserializeMinBEInt, SerializeMinBEInt};
use massa_models::version::{Version, VersionSerializer};
use massa_serialization::{DeserializeError, Deserializer, Serializer};
use massa_signature::{PublicKey, Signature, SIGNATURE_SIZE_BYTES};
use rand::{rngs::StdRng, RngCore, SeedableRng};
use std::{
io::{Read, Write},
net::TcpStream,
time::Duration,
};
use std::time::Instant;
use std::{io::Write, net::TcpStream, time::Duration};

/// Bootstrap client binder
pub struct BootstrapClientBinder {
// max_bootstrap_message_size: u32,
size_field_len: usize,
remote_pubkey: PublicKey,
duplex: TcpStream,
prev_message: Option<Hash>,
version_serializer: VersionSerializer,
cfg: BootstrapClientConfig,
}

const KNOWN_PREFIX_LEN: usize = SIGNATURE_SIZE_BYTES + MAX_BOOTSTRAP_MESSAGE_SIZE_BYTES;
/// The known-length component of a message to be received.
struct ServerMessageLeader {
sig: Signature,
msg_len: u32,
}

impl BootstrapClientBinder {
/// Creates a new `WriteBinder`.
///
@@ -37,9 +41,7 @@ impl BootstrapClientBinder {
/// * limit: limit max bytes per second (up and down)
#[allow(clippy::too_many_arguments)]
pub fn new(duplex: TcpStream, remote_pubkey: PublicKey, cfg: BootstrapClientConfig) -> Self {
let size_field_len = u32::be_bytes_min_length(cfg.max_bootstrap_message_size);
BootstrapClientBinder {
size_field_len,
remote_pubkey,
duplex,
prev_message: None,
@@ -69,34 +71,20 @@ impl BootstrapClientBinder {
Ok(())
}

// TODO: use a proper (de)serializer: https://github.com/massalabs/massa/pull/3745#discussion_r1169733161
/// Reads the next message.
pub fn next_timeout(
&mut self,
duration: Option<Duration>,
) -> Result<BootstrapServerMessage, BootstrapError> {
self.duplex.set_read_timeout(duration)?;

// peek the signature and message len
let peek_len = SIGNATURE_SIZE_BYTES + self.size_field_len;
let mut peek_buff = vec![0u8; peek_len];
// problematic if we can only peek some of it
while self.duplex.peek(&mut peek_buff)? < peek_len {
// TODO: Backoff spin of some sort
}
let deadline = duration.map(|d| Instant::now() + d);

// construct the signature from the peek
let sig_array = peek_buff.as_slice()[0..SIGNATURE_SIZE_BYTES]
.try_into()
.expect("logic error in array manipulations");
let sig = Signature::from_bytes(&sig_array)?;
// read the known-len component of the message
let mut known_len_buff = [0u8; KNOWN_PREFIX_LEN];
// TODO: handle a partial read
self.read_exact_timeout(&mut known_len_buff, deadline)
.map_err(|(err, _consumed)| err)?;

// construct the message len from the peek
let msg_len = u32::from_be_bytes_min(
&peek_buff[SIGNATURE_SIZE_BYTES..],
self.cfg.max_bootstrap_message_size,
)?
.0;
let ServerMessageLeader { sig, msg_len } = self.decode_msg_leader(&known_len_buff)?;

// Update this binding's "most recently received" message hash, retaining the replaced value
let message_deserializer = BootstrapServerMessageDeserializer::new((&self.cfg).into());
@@ -106,11 +94,13 @@ impl BootstrapClientBinder {

let message = {
if let Some(prev_msg) = prev_msg {
// Consume the stream, and discard the peek
let mut stream_bytes = vec![0u8; peek_len + (msg_len as usize)];
// TODO: under the hood, this isn't actually atomic. For now, we use the ostrich algorithm.
self.duplex.read_exact(&mut stream_bytes[..])?;
let msg_bytes = &mut stream_bytes[peek_len..];
// Consume the rest of the message from the stream
let mut stream_bytes = vec![0u8; msg_len as usize];

// TODO: handle a partial read
self.read_exact_timeout(&mut stream_bytes[..], deadline)
.map_err(|(e, _consumed)| e)?;
let msg_bytes = &mut stream_bytes[..];

// prepend the received message with the previous message's hash, and derive the new hash.
// TODO: some sort of recovery if this fails?
@@ -124,11 +114,13 @@ impl BootstrapClientBinder {
.map_err(|err| BootstrapError::DeserializeError(format!("{}", err)))?;
msg
} else {
// Consume the stream and discard the peek
let mut stream_bytes = vec![0u8; peek_len + msg_len as usize];
// TODO: under the hood, this isn't actually atomic. For now, we use the ostrich algorithm.
self.duplex.read_exact(&mut stream_bytes[..])?;
let sig_msg_bytes = &mut stream_bytes[peek_len..];
// Consume the rest of the message from the stream
let mut stream_bytes = vec![0u8; msg_len as usize];

// TODO: handle a partial read
self.read_exact_timeout(&mut stream_bytes[..], deadline)
.map_err(|(e, _)| e)?;
let sig_msg_bytes = &mut stream_bytes[..];

// Compute the hash and verify
let msg_hash = Hash::compute_from(sig_msg_bytes);
@@ -158,6 +150,7 @@ impl BootstrapClientBinder {
BootstrapError::GeneralError(format!("bootstrap message too large to encode: {}", e))
})?;

let mut write_buf = Vec::new();
if let Some(prev_message) = self.prev_message {
// there was a previous message
let prev_message = prev_message.to_bytes();
@@ -169,24 +162,55 @@ impl BootstrapClientBinder {
hash_data.extend(&msg_bytes);
self.prev_message = Some(Hash::compute_from(&hash_data));

// send old previous message
self.duplex.write_all(prev_message)?;
// Provide the hash saved as the previous message
write_buf.extend(prev_message);
} else {
// there was no previous message

//update current previous message
// No previous message, so we set the hash-chain genesis to the hash of the first msg
self.prev_message = Some(Hash::compute_from(&msg_bytes));
}

// send message length
{
self.duplex.set_write_timeout(duration)?;
let msg_len_bytes = msg_len.to_be_bytes_min(self.cfg.max_bootstrap_message_size)?;
self.duplex.write_all(&msg_len_bytes)?;
}
// Provide the message length
self.duplex.set_write_timeout(duration)?;
let msg_len_bytes = msg_len.to_be_bytes_min(MAX_BOOTSTRAP_MESSAGE_SIZE)?;
write_buf.extend(&msg_len_bytes);

// Provide the message
write_buf.extend(&msg_bytes);

// send message
self.duplex.write_all(&msg_bytes)?;
// And send it off
self.duplex.write_all(&write_buf)?;
Ok(())
}

/// We use this instead of our library deserializer as the process is relatively straightforward
/// and makes error-type management cleaner
fn decode_msg_leader(
&self,
leader_buff: &[u8; SIGNATURE_SIZE_BYTES + MAX_BOOTSTRAP_MESSAGE_SIZE_BYTES],
) -> Result<ServerMessageLeader, BootstrapError> {
let sig_array = leader_buff[0..SIGNATURE_SIZE_BYTES]
.try_into()
.expect("logic error in array manipulations");
let sig = Signature::from_bytes(&sig_array)?;

// construct the message len from the leader buffer
let msg_len = u32::from_be_bytes_min(
&leader_buff[SIGNATURE_SIZE_BYTES..],
MAX_BOOTSTRAP_MESSAGE_SIZE,
)?
.0;
Ok(ServerMessageLeader { sig, msg_len })
}
}

impl crate::bindings::BindingReadExact for BootstrapClientBinder {
fn set_read_timeout(&mut self, duration: Option<Duration>) -> Result<(), std::io::Error> {
self.duplex.set_read_timeout(duration)
}
}

impl std::io::Read for BootstrapClientBinder {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.duplex.read(buf)
}
}
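As a side note on the send path above: the message is now assembled into one buffer (optional previous-hash bytes, then the min-width length field, then the payload) and sent with a single write_all rather than several. A minimal, self-contained sketch of that pattern, assuming nothing beyond the standard library; send_frame and its parameters are illustrative names, not the PR's API.

use std::io::Write;

/// Illustrative only: coalesce the frame pieces into one buffer so the
/// stream sees a single write_all call instead of several.
fn send_frame<W: Write>(
    out: &mut W,
    prev_hash: Option<&[u8]>,
    len_bytes: &[u8],
    payload: &[u8],
) -> std::io::Result<()> {
    let mut write_buf = Vec::with_capacity(
        prev_hash.map_or(0, |p| p.len()) + len_bytes.len() + payload.len(),
    );
    if let Some(prev) = prev_hash {
        write_buf.extend_from_slice(prev);
    }
    write_buf.extend_from_slice(len_bytes);
    write_buf.extend_from_slice(payload);
    // The write timeout configured on the underlying stream still applies
    // to this single call.
    out.write_all(&write_buf)
}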