Skip to content

Commit

Permalink
Merge pull request #71 from wthrajat/cbor
Browse files Browse the repository at this point in the history
 feature: Use `serde_cbor` for network message encoding
  • Loading branch information
rajarshimaitra authored Jan 22, 2024
2 parents d413ffc + 5fe3094 commit 928d2ee
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ bip39 = {version = "1.0.1", features = ["rand"] }
bitcoin = {version = "0.30", features = ["rand"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.2"
tokio = { version = "1.16.1", features = ["full"] }
log = "^0.4"
env_logger = "0.7"
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ If you're interested in contributing to the project, explore the [open issues](h
- [ ] Complete all unit tests in modules.
- [ ] Achieve >80% crate level test coverage ratio (including integration tests).
- [ ] Clean up and integrate fidelity bonds with maker banning.
- [ ] Switch to binary encoding for wallet data storage and network messages.
- [x] Switch to binary encoding for network messages.
- [ ] Switch to binary encoding for wallet data.
- [ ] Make tor detectable and connectable by default for Maker and Taker. And Tor configs to their config lists.
- [ ] Sketch a simple `AddressBook` server. Tor must. This is for MVP. Later on we will move to more decentralized address server architecture.
- [ ] Turn maker server into a `makerd` binary, and a `maker-cli` rpc controller app, with MVP API.
Expand Down
8 changes: 4 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::protocol::error::ContractError;
#[derive(Debug)]
pub enum NetError {
IO(std::io::Error),
Json(serde_json::Error),
ReachedEOF,
ConnectionTimedOut,
Cbor(serde_cbor::Error),
}

impl From<std::io::Error> for NetError {
Expand All @@ -17,9 +17,9 @@ impl From<std::io::Error> for NetError {
}
}

impl From<serde_json::Error> for NetError {
fn from(value: serde_json::Error) -> Self {
Self::Json(value)
impl From<serde_cbor::Error> for NetError {
fn from(value: serde_cbor::Error) -> Self {
Self::Cbor(value)
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/maker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use crate::{protocol::error::ContractError, wallet::WalletError};
#[derive(Debug)]
pub enum MakerError {
IO(std::io::Error),
Json(serde_json::Error),
UnexpectedMessage { expected: String, got: String },
General(&'static str),
MutexPossion,
Secp(secp256k1::Error),
ContractError(ContractError),
Wallet(WalletError),
Deserialize(serde_cbor::Error),
}

impl From<std::io::Error> for MakerError {
Expand All @@ -25,9 +25,9 @@ impl From<std::io::Error> for MakerError {
}
}

impl From<serde_json::Error> for MakerError {
fn from(value: serde_json::Error) -> Self {
Self::Json(value)
impl From<serde_cbor::Error> for MakerError {
fn from(value: serde_cbor::Error) -> Self {
Self::Deserialize(value)
}
}

Expand Down
55 changes: 37 additions & 18 deletions src/maker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::{
use bitcoin::Network;
use bitcoind::bitcoincore_rpc::RpcApi;
use tokio::{
io::{AsyncBufReadExt, BufReader},
net::TcpListener,
io::{AsyncReadExt, BufReader},
net::{tcp::ReadHalf, TcpListener},
select,
sync::mpsc,
time::sleep,
Expand Down Expand Up @@ -185,21 +185,20 @@ pub async fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
)
.await
{
log::error!("io error sending first message: {:?}", e);
log::error!("IO error sending first message: {:?}", e);
return;
}
log::info!("[{}] ===> MakerHello", maker_clone.config.port);

loop {
let mut line = String::new();
select! {
readline_ret = reader.read_line(&mut line) => {
match readline_ret {
Ok(0) => {
let message = select! {
read_result = read_taker_message(&mut reader) => {
match read_result {
Ok(None) => {
log::info!("[{}] Connection closed by peer", maker_clone.config.port);
break;
}
Ok(_) => (),
},
Ok(Some(msg)) => msg,
Err(e) => {
log::error!("error reading from socket: {:?}", e);
break;
Expand All @@ -212,31 +211,51 @@ pub async fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
},
};

line = line.trim_end().to_string();
let message: TakerToMakerMessage = serde_json::from_str(&line).unwrap();
log::info!("[{}] <=== {} ", maker_clone.config.port, message);

let message_result: Result<Option<MakerToTakerMessage>, MakerError> =
let reply: Result<Option<MakerToTakerMessage>, MakerError> =
handle_message(&maker_clone, &mut connection_state, message, addr.ip()).await;

match message_result {
match reply {
Ok(reply) => {
if let Some(message) = reply {
log::info!("[{}] ===> {} ", maker_clone.config.port, message);
log::debug!("{:#?}", message);
if let Err(e) = send_message(&mut socket_writer, &message).await {
log::error!("closing due to io error sending message: {:?}", e);
break;
log::error!("Closing due to IO error in sending message: {:?}", e);
continue;
}
}
//if reply is None then dont send anything to client
// if reply is None then don't send anything to client
}
Err(err) => {
server_loop_comms_tx.send(err).await.unwrap();
break;
}
};
}
}
});
}
}

/// Reads a Taker Message.
async fn read_taker_message(
reader: &mut BufReader<ReadHalf<'_>>,
) -> Result<Option<TakerToMakerMessage>, MakerError> {
let read_result = reader.read_u32().await;
// If its EOF, return None
if read_result
.as_ref()
.is_err_and(|e| e.kind() == std::io::ErrorKind::UnexpectedEof)
{
return Ok(None);
}
let length = read_result?;
if length == 0 {
return Ok(None);
}
let mut buffer = vec![0; length as usize];
reader.read_exact(&mut buffer).await?;
let message: TakerToMakerMessage = serde_cbor::from_slice(&buffer)?;
Ok(Some(message))
}
2 changes: 0 additions & 2 deletions src/protocol/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ pub struct PrivKeyHandover {

/// All messages sent from Taker to Maker.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "lowercase")]
pub enum TakerToMakerMessage {
/// Protocol Handshake.
TakerHello(TakerHello),
Expand Down Expand Up @@ -290,7 +289,6 @@ pub struct ContractSigsForRecvr {

/// All messages sent from Maker to Taker.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "lowercase")]
pub enum MakerToTakerMessage {
/// Protocol Handshake.
MakerHello(MakerHello),
Expand Down
2 changes: 2 additions & 0 deletions src/taker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@ impl Taker {
let (maker, funding_txs) = loop {
// Fail early if not enough good makers in the list to satisfy swap requirements.
let untried_maker_count = self.offerbook.get_all_untried().len();

if untried_maker_count < self.ongoing_swap_state.swap_params.maker_count as usize {
log::info!("We don't have enough makers to satisfy the swap requirements!");
return Err(TakerError::NotEnoughMakersInOfferBook);
}
let maker = self.choose_next_maker()?.clone();
Expand Down
9 changes: 3 additions & 6 deletions src/taker/offers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,9 @@ pub async fn sync_offerbook_with_addresses(
let offers_writer = offers_writer_m.clone();
let taker_config: TakerConfig = config.clone();
tokio::spawn(async move {
if let Err(_e) = offers_writer
.send(download_maker_offer(addr, taker_config).await)
.await
{
panic!("mpsc failed");
}
let offer = download_maker_offer(addr, taker_config).await;
log::info!("Received Maker Offer: {:?}", offer);
offers_writer.send(offer).await.unwrap();
});
}
let mut result = Vec::<OfferAndAddress>::new();
Expand Down
31 changes: 15 additions & 16 deletions src/taker/routines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
},
Hash160,
},
utill::{read_message, send_message},
utill::{read_maker_message, send_message},
};

use super::{
Expand Down Expand Up @@ -84,7 +84,7 @@ pub async fn handshake_maker<'a>(
}),
)
.await?;
let makerhello = match read_message(&mut socket_reader).await {
let makerhello = match read_maker_message(&mut socket_reader).await {
Ok(MakerToTakerMessage::MakerHello(m)) => m,
Ok(any) => {
return Err(ProtocolError::WrongMessage {
Expand Down Expand Up @@ -142,7 +142,7 @@ pub(crate) async fn req_sigs_for_sender_once<S: SwapCoin>(
}),
)
.await?;
let contract_sigs_for_sender = match read_message(&mut socket_reader).await {
let contract_sigs_for_sender = match read_maker_message(&mut socket_reader).await {
Ok(MakerToTakerMessage::RespContractSigsForSender(m)) => {
if m.sigs.len() != outgoing_swapcoins.len() {
return Err(ProtocolError::WrongNumOfSigs {
Expand Down Expand Up @@ -201,7 +201,7 @@ pub(crate) async fn req_sigs_for_recvr_once<S: SwapCoin>(
}),
)
.await?;
let contract_sigs_for_recvr = match read_message(&mut socket_reader).await {
let contract_sigs_for_recvr = match read_maker_message(&mut socket_reader).await {
Ok(MakerToTakerMessage::RespContractSigsForRecvr(m)) => {
if m.sigs.len() != incoming_swapcoins.len() {
return Err(ProtocolError::WrongNumOfSigs {
Expand Down Expand Up @@ -281,7 +281,7 @@ pub(crate) async fn send_proof_of_funding_and_init_next_hop(
}),
)
.await?;
let contract_sigs_as_recvr_and_sender = match read_message(socket_reader).await {
let contract_sigs_as_recvr_and_sender = match read_maker_message(socket_reader).await {
Ok(MakerToTakerMessage::ReqContractSigsAsRecvrAndSender(m)) => {
if m.receivers_contract_txs.len() != tmi.funding_tx_infos.len() {
return Err(ProtocolError::WrongNumOfContractTxs {
Expand Down Expand Up @@ -416,7 +416,7 @@ pub(crate) async fn send_hash_preimage_and_get_private_keys(
}),
)
.await?;
let privkey_handover = match read_message(socket_reader).await {
let privkey_handover = match read_maker_message(socket_reader).await {
Ok(MakerToTakerMessage::RespPrivKeyHandover(m)) => {
if m.multisig_privkeys.len() != receivers_multisig_redeemscripts.len() {
return Err(ProtocolError::WrongNumOfPrivkeys {
Expand Down Expand Up @@ -452,16 +452,15 @@ async fn download_maker_offer_attempt_once(addr: &MakerAddress) -> Result<Offer,
)
.await?;

let offer = match read_message(&mut socket_reader).await {
Ok(MakerToTakerMessage::RespOffer(m)) => m,
Ok(any) => {
return Err(ProtocolError::WrongMessage {
let msg = read_maker_message(&mut socket_reader).await?;
let offer = match msg {
MakerToTakerMessage::RespOffer(offer) => offer,
msg => {
return Err(TakerError::Protocol(ProtocolError::WrongMessage {
expected: "RespOffer".to_string(),
received: format!("{}", any),
}
.into());
received: format!("{}", msg),
}))
}
Err(e) => return Err(e.into()),
};

log::debug!(target: "offerbook", "Obtained offer from {}", addr);
Expand All @@ -480,7 +479,7 @@ pub async fn download_maker_offer(
match ret {
Ok(offer) => return Some(OfferAndAddress { offer, address }),
Err(e) => {
log::debug!(target: "offerbook",
log::warn!(
"Failed to request offer from maker {}, \
reattempting... error={:?}",
address,
Expand All @@ -496,7 +495,7 @@ pub async fn download_maker_offer(
}
},
_ = sleep(Duration::from_secs(config.first_connect_attempt_timeout_sec)) => {
log::debug!(target: "offerbook",
log::warn!(
"Timeout for request offer from maker {}, reattempting...",
address
);
Expand Down
21 changes: 9 additions & 12 deletions src/utill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{

use serde_json::Value;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
io::{AsyncReadExt, AsyncWriteExt, BufReader},
net::tcp::{ReadHalf, WriteHalf},
};

Expand Down Expand Up @@ -91,23 +91,20 @@ pub async fn send_message(
socket_writer: &mut WriteHalf<'_>,
message: &impl serde::Serialize,
) -> Result<(), NetError> {
let mut message_bytes = serde_json::to_vec(message).map_err(std::io::Error::from)?;
message_bytes.push(b'\n');
socket_writer.write_all(&message_bytes).await?;
let message_cbor = serde_cbor::ser::to_vec(message).map_err(NetError::Cbor)?;
socket_writer.write_u32(message_cbor.len() as u32).await?;
socket_writer.write_all(&message_cbor).await?;
Ok(())
}

/// Read a Maker Message.
pub async fn read_message(
pub async fn read_maker_message(
reader: &mut BufReader<ReadHalf<'_>>,
) -> Result<MakerToTakerMessage, NetError> {
let mut line = String::new();
let n = reader.read_line(&mut line).await?;
if n == 0 {
return Err(NetError::ReachedEOF);
}
let message: MakerToTakerMessage = serde_json::from_str(&line)?;
log::debug!("<== {:#?}", message);
let length = reader.read_u32().await?;
let mut buffer = vec![0; length as usize];
reader.read_exact(&mut buffer).await?;
let message: MakerToTakerMessage = serde_cbor::from_slice(&buffer)?;
Ok(message)
}

Expand Down

0 comments on commit 928d2ee

Please sign in to comment.