Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Framing refactor: framing.rs api #1033

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benches/benches/src/sv2/iai_sv2_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn client_sv2_setup_connection_serialize_deserialize() {
let mut dst = vec![0; size];
frame.serialize(&mut dst);
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let type_ = frame.header().msg_type().clone();
let payload = frame.payload().unwrap();
black_box(AnyMessage::try_from((type_, payload)));
}
Expand Down Expand Up @@ -77,7 +77,7 @@ fn client_sv2_open_channel_serialize_deserialize() {
let mut dst = vec![0; size];
frame.serialize(&mut dst);
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let type_ = frame.header().msg_type().clone();
let payload = frame.payload().unwrap();
black_box(AnyMessage::try_from((type_, payload)));
}
Expand Down Expand Up @@ -127,7 +127,7 @@ fn client_sv2_mining_message_submit_standard_serialize_deserialize() {
let mut dst = vec![0; size];
frame.serialize(&mut dst);
let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap();
let type_ = frame.get_header().unwrap().msg_type().clone();
let type_ = frame.header().msg_type().clone();
let payload = frame.payload().unwrap();
black_box(AnyMessage::try_from((type_, payload)));
}
Expand Down
2 changes: 1 addition & 1 deletion examples/interop-cpp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ mod main_ {
let buffer = decoder.writable();
stream.read_exact(buffer).unwrap();
if let Ok(mut f) = decoder.next_frame() {
let msg_type = f.get_header().unwrap().msg_type();
let msg_type = f.header().msg_type();
let payload = f.payload().unwrap();
let message: Sv2Message = (msg_type, payload).try_into().unwrap();
match message {
Expand Down
4 changes: 2 additions & 2 deletions protocols/v2/framing-sv2/src/framing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ impl<T: Serialize + GetSize, B: AsMut<[u8]> + AsRef<[u8]>> Sv2Frame<T, B> {
}

/// `Sv2Frame` always returns `Some(self.header)`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should update these comments so that they are coherent with the new behavior of this function

updating the comments is especially important given that these are used on the Rust Docs, so they need to follow these changes accordingly

pub fn get_header(&self) -> Option<crate::header::Header> {
Some(self.header)
pub fn header(&self) -> crate::header::Header {
self.header
}

/// Tries to build a `Sv2Frame` from raw bytes, assuming they represent a serialized `Sv2Frame` frame (`Self.serialized`).
Expand Down
27 changes: 12 additions & 15 deletions protocols/v2/sv2-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,7 @@ pub extern "C" fn next_frame(decoder: *mut DecoderWrapper) -> CResult<CSv2Messag

match decoder.0.next_frame() {
Ok(mut f) => {
let msg_type = match f.get_header() {
Some(header) => header.msg_type(),
None => return CResult::Err(Sv2Error::InvalidSv2Frame),
};
let msg_type = f.header().msg_type();
let payload = match f.payload() {
Some(payload) => payload,
None => return CResult::Err(Sv2Error::InvalidSv2Frame),
Expand Down Expand Up @@ -763,7 +760,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -815,7 +812,7 @@ mod tests {
let mut decoded = decoder.next_frame().unwrap();

// Extract payload of the frame which is the NewTemplate message
let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -863,7 +860,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -913,7 +910,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -963,7 +960,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -1008,7 +1005,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -1053,7 +1050,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -1111,7 +1108,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -1147,7 +1144,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -1196,7 +1193,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down Expand Up @@ -1245,7 +1242,7 @@ mod tests {

let mut decoded = decoder.next_frame().unwrap();

let msg_type = decoded.get_header().unwrap().msg_type();
let msg_type = decoded.header().msg_type();
let payload = decoded.payload().unwrap();
let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap();
let decoded_message = match decoded_message {
Expand Down
4 changes: 2 additions & 2 deletions roles/jd-client/src/lib/downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl DownstreamMiningNode {

/// Parse the received message and relay it to the right upstream
pub async fn next(self_mutex: &Arc<Mutex<Self>>, mut incoming: StdFrame) {
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();

let routing_logic = roles_logic_sv2::routing_logic::MiningRoutingLogic::None;
Expand Down Expand Up @@ -707,7 +707,7 @@ pub async fn listen_for_downstream_mining(
);

let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();
let routing_logic = roles_logic_sv2::routing_logic::CommonRoutingLogic::None;
let node = Arc::new(Mutex::new(node));
Expand Down
2 changes: 1 addition & 1 deletion roles/jd-client/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl JobDeclarator {
let receiver = self_mutex.safe_lock(|d| d.receiver.clone()).unwrap();
loop {
let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();
let next_message_to_send =
ParseServerJobDeclarationMessages::handle_message_job_declaration(
Expand Down
2 changes: 1 addition & 1 deletion roles/jd-client/src/lib/job_declarator/setup_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl SetupConnectionHandler {

let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap();

let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();
ParseUpstreamCommonMessages::handle_message_common(
Arc::new(Mutex::new(SetupConnectionHandler {})),
Expand Down
8 changes: 4 additions & 4 deletions roles/jd-client/src/lib/template_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl TemplateRx {
let received = handle_result!(tx_status.clone(), receiver.recv().await);
let mut frame: StdFrame =
handle_result!(tx_status.clone(), received.try_into());
let message_type = frame.get_header().unwrap().msg_type();
let message_type = frame.header().msg_type();
let payload = frame.payload().expect("No payload set");

let next_message_to_send =
Expand Down Expand Up @@ -280,7 +280,7 @@ impl TemplateRx {
_ => {
error!("{:?}", frame);
error!("{:?}", frame.payload());
error!("{:?}", frame.get_header());
error!("{:?}", frame.header());
std::process::exit(1);
}
}
Expand All @@ -289,14 +289,14 @@ impl TemplateRx {
error!("{:?}", m);
error!("{:?}", frame);
error!("{:?}", frame.payload());
error!("{:?}", frame.get_header());
error!("{:?}", frame.header());
std::process::exit(1);
}
Err(e) => {
error!("{:?}", e);
error!("{:?}", frame);
error!("{:?}", frame.payload());
error!("{:?}", frame.get_header());
error!("{:?}", frame.header());
std::process::exit(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl SetupConnectionHandler {
.expect("Connection to TP closed!")
.try_into()
.expect("Failed to parse incoming SetupConnectionResponse");
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();
ParseUpstreamCommonMessages::handle_message_common(
Arc::new(Mutex::new(SetupConnectionHandler {})),
Expand Down
15 changes: 2 additions & 13 deletions roles/jd-client/src/lib/upstream_sv2/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,7 @@ impl Upstream {
};

// Gets the binary frame message type from the message header
let message_type = if let Some(header) = incoming.get_header() {
header.msg_type()
} else {
return Err(framing_sv2::Error::ExpectedHandshakeFrame.into());
};
let message_type = incoming.header().msg_type();
// Gets the message payload
let payload = match incoming.payload() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a place where clippy would complain to use if let Some . But if clippy is happy I'm as well. Maybe since you are at it you could just remove the above comment.

Some(payload) => payload,
Expand Down Expand Up @@ -329,14 +325,7 @@ impl Upstream {
let mut incoming: StdFrame = handle_result!(tx_status, incoming.try_into());
// On message receive, get the message type from the message header and get the
// message payload
let message_type =
incoming
.get_header()
.ok_or(super::super::error::Error::FramingSv2(
framing_sv2::Error::ExpectedSv2Frame,
));

let message_type = handle_result!(tx_status, message_type).msg_type();
let message_type = incoming.header().msg_type();

let payload = incoming.payload().expect("Payload not found");

Expand Down
5 changes: 1 addition & 4 deletions roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,7 @@ impl JobDeclaratorDownstream {
match recv.recv().await {
Ok(message) => {
let mut frame: StdFrame = handle_result!(tx_status, message.try_into());
let header = frame
.get_header()
.ok_or_else(|| JdsError::Custom(String::from("No header set")));
let header = handle_result!(tx_status, header);
let header = frame.header();
let message_type = header.msg_type();
let payload = match frame.payload() {
Some(p) => p,
Expand Down
4 changes: 2 additions & 2 deletions roles/mining-proxy/src/lib/downstream_mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl DownstreamMiningNode {

/// Parse the received message and relay it to the right upstream
pub async fn next(self_mutex: Arc<Mutex<Self>>, mut incoming: StdFrame) {
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();

let routing_logic = super::get_routing_logic();
Expand Down Expand Up @@ -452,7 +452,7 @@ pub async fn listen_for_downstream_mining(

let mut incoming: StdFrame =
node.receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();
let routing_logic = super::get_common_routing_logic();
let node = Arc::new(Mutex::new(node));
Expand Down
8 changes: 4 additions & 4 deletions roles/mining-proxy/src/lib/upstream_mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ impl UpstreamMiningNode {
.unwrap()
.unwrap();

let message_type = response.get_header().unwrap().msg_type();
let message_type = response.header().msg_type();
let payload = response.payload().unwrap();
match (message_type, payload).try_into() {
Ok(CommonMessages::SetupConnectionSuccess(_)) => {
Expand Down Expand Up @@ -575,7 +575,7 @@ impl UpstreamMiningNode {
}

pub async fn next(self_mutex: Arc<Mutex<Self>>, mut incoming: StdFrame) {
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();

let routing_logic = super::get_routing_logic();
Expand Down Expand Up @@ -613,7 +613,7 @@ impl UpstreamMiningNode {
.unwrap()
.unwrap();

let message_type = response.get_header().unwrap().msg_type();
let message_type = response.header().msg_type();
let payload = response.payload().unwrap();
match (message_type, payload).try_into() {
Ok(CommonMessages::SetupConnectionSuccess(m)) => {
Expand Down Expand Up @@ -859,7 +859,7 @@ impl UpstreamMiningNode {
// #[cfg(test)]
// #[allow(unused)]
// pub async fn next_faster(&mut self, mut incoming: StdFrame) {
// let message_type = incoming.get_header().unwrap().msg_type();
// let message_type = incoming.header().msg_type();

// // When a channel is opened we need to setup the channel id in order to relay next messages
// // to the right Downstream
Expand Down
5 changes: 1 addition & 4 deletions roles/pool/src/lib/mining_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,7 @@ impl Downstream {
}

pub async fn next(self_mutex: Arc<Mutex<Self>>, mut incoming: StdFrame) -> PoolResult<()> {
let message_type = incoming
.get_header()
.ok_or_else(|| PoolError::Custom(String::from("No header set")))?
.msg_type();
let message_type = incoming.header().msg_type();
let payload = match incoming.payload() {
Some(p) => p,
None => return Err(PoolError::Custom(String::from("No payload set"))),
Fi3 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
5 changes: 1 addition & 4 deletions roles/pool/src/lib/mining_pool/setup_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ impl SetupConnectionHandler {
}
};

let message_type = incoming
.get_header()
.ok_or_else(|| PoolError::Custom(String::from("No header set")))?
.msg_type();
let message_type = incoming.header().msg_type();
let payload = match incoming.payload() {
Some(p) => p,
None => return Err(PoolError::Custom(String::from("No payload set"))),
Expand Down
5 changes: 1 addition & 4 deletions roles/pool/src/lib/template_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ impl TemplateRx {
.try_into()
.map_err(|e| PoolError::Codec(codec_sv2::Error::FramingSv2Error(e)))
);
let message_type_res = message_from_tp
.get_header()
.ok_or_else(|| PoolError::Custom(String::from("No header set")));
let message_type = handle_result!(status_tx, message_type_res).msg_type();
let message_type = message_from_tp.header().msg_type();
let payload = match message_from_tp.payload() {
Some(p) => p,
None => {
Expand Down
5 changes: 1 addition & 4 deletions roles/pool/src/lib/template_receiver/setup_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ impl SetupConnectionHandler {
.await?
.try_into()
.map_err(|e| PoolError::Codec(codec_sv2::Error::FramingSv2Error(e)))?;
let message_type = incoming
.get_header()
.ok_or_else(|| PoolError::Custom(String::from("No header set")))?
.msg_type();
let message_type = incoming.header().msg_type();
let payload = match incoming.payload() {
Some(p) => p,
None => return Err(PoolError::Custom(String::from("No payload set"))),
Expand Down
4 changes: 2 additions & 2 deletions roles/test-utils/mining-device/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl SetupConnectionHandler {
info!("Setup connection sent to {}", address);

let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();
ParseUpstreamCommonMessages::handle_message_common(
self_,
Expand Down Expand Up @@ -315,7 +315,7 @@ impl Device {

loop {
let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let message_type = incoming.header().msg_type();
let payload = incoming.payload().unwrap();
let next = Device::handle_message_mining(
self_mutex.clone(),
Expand Down
Loading