Skip to content

Commit

Permalink
chore: Post process position state on inbound dlc message
Browse files Browse the repository at this point in the history
Instead of processing the position state on the outbound dlc message. IMHO the logic is easier to understand that way and the code is more cohesive that way. (at least all the code that needs to be refactored is now in one place :)
  • Loading branch information
holzeis committed Jan 16, 2024
1 parent f316aea commit 14a2169
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ CREATE TYPE "Message_Type_Type" AS ENUM (
);

CREATE TABLE "dlc_messages" (
-- We need to store the hash as TEXT as the BIGINT type overflows on some u64 values breaking the hash value.
message_hash TEXT PRIMARY KEY NOT NULL,
inbound BOOLEAN NOT NULL,
peer_id TEXT NOT NULL,
Expand Down
5 changes: 0 additions & 5 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ async fn main() -> Result<()> {

let dlc_handler = DlcHandler::new(pool.clone(), node.clone());
let _handle =
// this handles sending outbound dlc messages as well as keeping track of what
// dlc messages have already been processed and what was the last outbound dlc message
// so it can be resend on reconnect.
//
// this does not handle the incoming dlc messages!
dlc_handler::spawn_handling_dlc_messages(dlc_handler, node_event_handler.subscribe());

let event_handler = CoordinatorEventHandler::new(node.clone(), Some(node_event_sender));
Expand Down
12 changes: 5 additions & 7 deletions coordinator/src/db/dlc_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub(crate) struct DlcMessage {
pub timestamp: OffsetDateTime,
}

pub(crate) fn get(conn: &mut PgConnection, message_hash: u64) -> QueryResult<Option<DlcMessage>> {
pub(crate) fn get(conn: &mut PgConnection, message_hash: &str) -> QueryResult<Option<DlcMessage>> {
dlc_messages::table
.filter(dlc_messages::message_hash.eq(message_hash.to_string()))
.first::<DlcMessage>(conn)
Expand All @@ -82,9 +82,9 @@ pub(crate) fn insert(
impl From<ln_dlc_node::dlc_message::DlcMessage> for DlcMessage {
fn from(value: ln_dlc_node::dlc_message::DlcMessage) -> Self {
Self {
message_hash: value.message_hash.to_string(),
message_hash: value.message_hash,
peer_id: value.peer_id.to_string(),
message_type: MessageType::from(value.clone().message_type),
message_type: MessageType::from(value.message_type),
timestamp: value.timestamp,
inbound: value.inbound,
}
Expand Down Expand Up @@ -117,11 +117,9 @@ impl From<ln_dlc_node::dlc_message::DlcMessageType> for MessageType {
impl From<DlcMessage> for ln_dlc_node::dlc_message::DlcMessage {
fn from(value: DlcMessage) -> Self {
Self {
message_hash: u64::from_str(&value.message_hash).expect("valid u64"),
message_hash: value.message_hash,
inbound: value.inbound,
message_type: ln_dlc_node::dlc_message::DlcMessageType::from(
value.clone().message_type,
),
message_type: ln_dlc_node::dlc_message::DlcMessageType::from(value.message_type),
peer_id: PublicKey::from_str(&value.peer_id).expect("valid public key"),
timestamp: value.timestamp,
}
Expand Down
2 changes: 1 addition & 1 deletion coordinator/src/db/last_outbound_dlc_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub(crate) fn upsert(
) -> Result<()> {
let values = (
last_outbound_dlc_messages::peer_id.eq(peer_id.to_string()),
last_outbound_dlc_messages::message_hash.eq(sdm.generate_hash().to_string()),
last_outbound_dlc_messages::message_hash.eq(sdm.generate_hash()),
last_outbound_dlc_messages::message.eq(sdm.message),
);
let affected_rows = diesel::insert_into(last_outbound_dlc_messages::table)
Expand Down
5 changes: 5 additions & 0 deletions coordinator/src/dlc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ impl DlcHandler {
}
}

/// [`spawn_handling_dlc_messages`] handles sending outbound dlc messages as well as keeping track
/// of what dlc messages have already been processed and what was the last outbound dlc message
/// so it can be resend on reconnect.
///
/// It does not handle the incoming dlc messages!
pub fn spawn_handling_dlc_messages(
dlc_handler: DlcHandler,
mut receiver: broadcast::Receiver<NodeEvent>,
Expand Down
46 changes: 30 additions & 16 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,21 @@ impl Node {
}
}

/// [`process_dlc_message`] processes incoming dlc channel messages and updates the 10101
/// position accordingly.
/// - Any other message will be ignored.
/// - Any dlc channel message that has already been processed will be skipped.
///
/// If an offer is received [`ChannelMessage::Offer`], [`ChannelMessage::SettleOffer`],
/// [`ChannelMessage::CollaborativeCloseOffer`] [`ChannelMessage::RenewOffer`] will get
/// automatically accepted. Unless the maturity date of the offer is already outdated.
///
/// FIXME(holzeis): This function manipulates different data objects in different data sources
/// and should use a transaction to make all changes atomic. Not doing so risks of ending up in
/// an inconsistent state. One way of fixing that could be to
/// (1) use a single data source for the 10101 data and the rust-dlc data.
/// (2) wrap the function into a db transaction which can be atomically rolled back on error or
/// committed on success.
fn process_dlc_message(&self, node_id: PublicKey, msg: Message) -> Result<()> {
tracing::info!(
from = %node_id,
Expand All @@ -608,7 +623,7 @@ impl Node {
let mut conn = self.pool.get()?;
let serialized_inbound_message = SerializedDlcMessage::try_from(&msg)?;
let inbound_msg = DlcMessage::new(node_id, serialized_inbound_message, true)?;
match db::dlc_messages::get(&mut conn, inbound_msg.message_hash)? {
match db::dlc_messages::get(&mut conn, &inbound_msg.message_hash)? {
Some(_) => {
tracing::debug!(%node_id, kind=%dlc_message_name(&msg), "Received message that has already been processed, skipping.");
return Ok(());
Expand Down Expand Up @@ -675,28 +690,27 @@ impl Node {
self.inner
.accept_dlc_channel_collaborative_close(close_offer.channel_id)?;
}
ChannelMessage::Accept(accept_channel) => {
let channel_id_hex_string = accept_channel.temporary_channel_id.to_hex();
tracing::info!(
channel_id = channel_id_hex_string,
node_id = node_id.to_string(),
"DLC channel open protocol was accepted"
);
let mut connection = self.pool.get()?;
db::positions::Position::update_proposed_position(
&mut connection,
node_id.to_string(),
PositionState::Open,
)?;
}
_ => {}
};

resp
}
};

if let Some(Message::Channel(ChannelMessage::Sign(sign_channel))) = &resp {
let channel_id_hex_string = sign_channel.channel_id.to_hex();
tracing::info!(
channel_id = channel_id_hex_string,
node_id = node_id.to_string(),
"DLC channel open protocol was finalized"
);
let mut connection = self.pool.get()?;
db::positions::Position::update_proposed_position(
&mut connection,
node_id.to_string(),
PositionState::Open,
)?;
}

if let Some(msg) = resp {
tracing::info!(
to = %node_id,
Expand Down
17 changes: 9 additions & 8 deletions crates/ln-dlc-node/src/dlc_message.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use anyhow::Result;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use dlc_messages::ChannelMessage;
use dlc_messages::Message;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use sha2::digest::FixedOutput;
use sha2::Digest;
use sha2::Sha256;
use time::OffsetDateTime;
use ureq::serde_json;

#[derive(Clone)]
pub struct DlcMessage {
pub message_hash: u64,
pub message_hash: String,
pub inbound: bool,
pub peer_id: PublicKey,
pub message_type: DlcMessageType,
Expand Down Expand Up @@ -42,10 +43,10 @@ pub struct SerializedDlcMessage {
}

impl SerializedDlcMessage {
pub fn generate_hash(&self) -> u64 {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
hasher.finish()
pub fn generate_hash(&self) -> String {
let mut hasher = Sha256::new();
hasher.update(self.message.as_bytes());
hasher.finalize_fixed().to_hex()
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/ln-dlc-node/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl Node<TenTenOneInMemoryStorage, InMemoryStore> {
Ok(NodeEvent::Connected { .. }) => {} // ignored
Err(_) => {
tracing::error!(
"Failed to receive message from node even handler channel."
"Failed to receive message from node event handler channel."
);
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
CREATE TABLE "dlc_messages" (
-- We need to store the hash as TEXT as the BIGINT type overflows on some u64 values breaking the hash value.
message_hash TEXT PRIMARY KEY NOT NULL,
inbound BOOLEAN NOT NULL,
peer_id TEXT NOT NULL,
Expand Down
8 changes: 4 additions & 4 deletions mobile/native/src/db/dlc_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum MessageType {
impl DlcMessage {
pub(crate) fn get(
conn: &mut SqliteConnection,
message_hash: u64,
message_hash: &str,
) -> QueryResult<Option<ln_dlc_node::dlc_message::DlcMessage>> {
let result = schema::dlc_messages::table
.filter(schema::dlc_messages::message_hash.eq(message_hash.to_string()))
Expand All @@ -77,9 +77,9 @@ impl DlcMessage {
impl From<ln_dlc_node::dlc_message::DlcMessage> for DlcMessage {
fn from(value: ln_dlc_node::dlc_message::DlcMessage) -> Self {
Self {
message_hash: value.message_hash.to_string(),
message_hash: value.clone().message_hash,
peer_id: value.peer_id.to_string(),
message_type: MessageType::from(value.clone().message_type),
message_type: MessageType::from(value.message_type),
timestamp: value.timestamp.unix_timestamp(),
inbound: value.inbound,
}
Expand Down Expand Up @@ -115,7 +115,7 @@ impl From<DlcMessage> for ln_dlc_node::dlc_message::DlcMessage {
ln_dlc_node::dlc_message::DlcMessageType::from(value.clone().message_type);

Self {
message_hash: u64::from_str(&value.message_hash).expect("valid u64"),
message_hash: value.message_hash,
inbound: value.inbound,
message_type: dlc_message_type,
peer_id: PublicKey::from_str(&value.peer_id).expect("valid public key"),
Expand Down
46 changes: 30 additions & 16 deletions mobile/native/src/ln_dlc/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ impl Node {
}
}

/// [`process_dlc_message`] processes incoming dlc channel messages and updates the 10101
/// position accordingly.
/// - Any other message will be ignored.
/// - Any dlc channel message that has already been processed will be skipped.
///
/// If an offer is received [`ChannelMessage::Offer`], [`ChannelMessage::SettleOffer`],
/// [`ChannelMessage::RenewOffer`] will get automatically accepted. Unless the maturity date of
/// the offer is already outdated.
///
/// FIXME(holzeis): This function manipulates different data objects in different data sources
/// and should use a transaction to make all changes atomic. Not doing so risks of ending up in
/// an inconsistent state. One way of fixing that could be to
/// (1) use a single data source for the 10101 data and the rust-dlc data.
/// (2) wrap the function into a db transaction which can be atomically rolled back on error or
/// committed on success.
fn process_dlc_message(&self, node_id: PublicKey, msg: Message) -> Result<()> {
tracing::info!(
from = %node_id,
Expand All @@ -152,13 +167,13 @@ impl Node {
Message::OnChain(_) | Message::SubChannel(_) => {
tracing::warn!("Ignoring unexpected dlc message.");
None
},
}
Message::Channel(channel_msg) => {
let inbound_msg = {
let mut conn = db::connection()?;
let serialized_inbound_message = SerializedDlcMessage::try_from(&msg)?;
let inbound_msg = DlcMessage::new(node_id, serialized_inbound_message, true)?;
match db::dlc_messages::DlcMessage::get(&mut conn, inbound_msg.message_hash)? {
match db::dlc_messages::DlcMessage::get(&mut conn, &inbound_msg.message_hash)? {
Some(_) => {
tracing::debug!(%node_id, kind=%dlc_message_name(&msg), "Received message that has already been processed, skipping.");
return Ok(());
Expand Down Expand Up @@ -267,6 +282,19 @@ impl Node {
BackgroundTask::RecoverDlc(TaskStatus::Success),
));
}
ChannelMessage::SettleConfirm(_) => {
tracing::debug!("Position based on DLC channel is being closed");

let filled_order = order::handler::order_filled()?;

position::handler::update_position_after_dlc_closure(Some(filled_order))
.context("Failed to update position after DLC closure")?;

// In case of a restart.
event::publish(&EventInternal::BackgroundNotification(
BackgroundTask::RecoverDlc(TaskStatus::Success),
));
}
_ => (),
}

Expand All @@ -281,20 +309,6 @@ impl Node {

if let Some(msg) = resp {
self.send_dlc_message(node_id, msg.clone())?;

if let Message::Channel(ChannelMessage::SettleFinalize(_)) = msg {
tracing::debug!("Position based on DLC channel is being closed");

let filled_order = order::handler::order_filled()?;

position::handler::update_position_after_dlc_closure(Some(filled_order))
.context("Failed to update position after DLC closure")?;

// In case of a restart.
event::publish(&EventInternal::BackgroundNotification(
BackgroundTask::RecoverDlc(TaskStatus::Success),
));
}
}

Ok(())
Expand Down

0 comments on commit 14a2169

Please sign in to comment.