Skip to content

Commit

Permalink
async manager
Browse files Browse the repository at this point in the history
  • Loading branch information
bennyhodl committed Nov 26, 2024
1 parent 1409937 commit b2bede7
Show file tree
Hide file tree
Showing 11 changed files with 525 additions and 423 deletions.
4 changes: 3 additions & 1 deletion ddk-manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
authors = ["Crypto Garage", "benny b <ben@bitcoinbay.foundation>"]
description = "Creation and handling of Discrete Log Contracts (DLC)."
edition = "2018"
edition = "2021"
homepage = "https://github.com/bennyhodl/rust-dlc"
license-file = "../LICENSE"
name = "ddk-manager"
Expand All @@ -21,6 +21,7 @@ bitcoin = { version = "0.32.2", default-features = false }
ddk-dlc = { version = "0.7.0", default-features = false, path = "../ddk-dlc" }
ddk-messages = { version = "0.7.0", default-features = false, path = "../ddk-messages" }
ddk-trie = { version = "0.7.0", default-features = false, path = "../ddk-trie" }
futures = "0.3.31"
hex = { package = "hex-conservative", version = "0.1" }
lightning = { version = "0.0.125", default-features = false, features = ["grind_signatures"] }
log = "0.4.14"
Expand All @@ -43,6 +44,7 @@ secp256k1-zkp = {version = "0.11.0", features = ["hashes", "rand", "rand-std", "
serde = "1.0"
serde_json = "1.0"
simple-wallet = {path = "../simple-wallet"}
tokio = { version = "1.41.1", features = ["macros", "rt-multi-thread", "test-util"] }

[[bench]]
harness = false
Expand Down
5 changes: 3 additions & 2 deletions ddk-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,15 @@ pub trait Storage {
fn get_chain_monitor(&self) -> Result<Option<ChainMonitor>, Error>;
}

#[async_trait::async_trait]
/// Oracle trait provides access to oracle information.
pub trait Oracle {
/// Returns the public key of the oracle.
fn get_public_key(&self) -> XOnlyPublicKey;
/// Returns the announcement for the event with the given id if found.
fn get_announcement(&self, event_id: &str) -> Result<OracleAnnouncement, Error>;
async fn get_announcement(&self, event_id: &str) -> Result<OracleAnnouncement, Error>;
/// Returns the attestation for the event with the given id if found.
fn get_attestation(&self, event_id: &str) -> Result<OracleAttestation, Error>;
async fn get_attestation(&self, event_id: &str) -> Result<OracleAttestation, Error>;
}

/// Represents a UTXO.
Expand Down
140 changes: 93 additions & 47 deletions ddk-manager/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use ddk_messages::channel::{
};
use ddk_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation};
use ddk_messages::{AcceptDlc, Message as DlcMessage, OfferDlc, SignDlc};
use futures::stream;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use hex::DisplayHex;
use lightning::chain::chaininterface::FeeEstimator;
use lightning::ln::chan_utils::{
Expand Down Expand Up @@ -269,16 +272,13 @@ where
/// and an OfferDlc message returned.
///
/// This function will fetch the oracle announcements from the oracle.
pub fn send_offer(
pub async fn send_offer(
&self,
contract_input: &ContractInput,
counter_party: PublicKey,
) -> Result<OfferDlc, Error> {
let oracle_announcements = contract_input
.contract_infos
.iter()
.map(|x| self.get_oracle_announcements(&x.oracles))
.collect::<Result<Vec<_>, Error>>()?;
// If the oracle announcement fails to retrieve, then log and continue.
let oracle_announcements = self.oracle_announcements(contract_input).await?;

self.send_offer_with_announcements(contract_input, counter_party, oracle_announcements)
}
Expand Down Expand Up @@ -375,13 +375,13 @@ where

/// Function to call to check the state of the currently executing DLCs and
/// update them if possible.
pub fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
pub async fn periodic_check(&self, check_channels: bool) -> Result<(), Error> {
self.check_signed_contracts()?;
self.check_confirmed_contracts()?;
self.check_confirmed_contracts().await?;
self.check_preclosed_contracts()?;

if check_channels {
self.channel_checks()?;
self.channel_checks().await?;
}

Ok(())
Expand Down Expand Up @@ -470,7 +470,7 @@ where
Ok(())
}

fn get_oracle_announcements(
async fn get_oracle_announcements(
&self,
oracle_inputs: &OracleInput,
) -> Result<Vec<OracleAnnouncement>, Error> {
Expand All @@ -480,7 +480,8 @@ where
.oracles
.get(pubkey)
.ok_or_else(|| Error::InvalidParameters("Unknown oracle public key".to_string()))?;
announcements.push(oracle.get_announcement(&oracle_inputs.event_id)?.clone());
let announcement = oracle.get_announcement(&oracle_inputs.event_id).await?;
announcements.push(announcement);
}

Ok(announcements)
Expand Down Expand Up @@ -547,13 +548,13 @@ where
Ok(())
}

fn check_confirmed_contracts(&self) -> Result<(), Error> {
async fn check_confirmed_contracts(&self) -> Result<(), Error> {
for c in self.store.get_confirmed_contracts()? {
// Confirmed contracts from channel are processed in channel specific methods.
if c.channel_id.is_some() {
continue;
}
if let Err(e) = self.check_confirmed_contract(&c) {
if let Err(e) = self.check_confirmed_contract(&c).await {
error!(
"Error checking confirmed contract {}: {}",
c.accepted_contract.get_contract_id_string(),
Expand All @@ -565,7 +566,7 @@ where
Ok(())
}

fn get_closable_contract_info<'a>(
async fn get_closable_contract_info<'a>(
&'a self,
contract: &'a SignedContract,
) -> ClosableContractInfo<'a> {
Expand All @@ -581,26 +582,54 @@ where
.enumerate()
.collect();
if matured.len() >= contract_info.threshold {
let attestations: Vec<_> = matured
.iter()
.filter_map(|(i, announcement)| {
let oracle = self.oracles.get(&announcement.oracle_public_key)?;
let attestation = oracle
let attestations = stream::iter(matured.iter())
.map(|(i, announcement)| async move {
// First try to get the oracle
let oracle = match self.oracles.get(&announcement.oracle_public_key) {
Some(oracle) => oracle,
None => {
log::debug!(
"Oracle not found for key: {}",
announcement.oracle_public_key
);
return None;
}
};

// Then try to get the attestation
let attestation = match oracle
.get_attestation(&announcement.oracle_event.event_id)
.ok()?;
attestation
.validate(&self.secp, announcement)
.map_err(|_| {
.await
{
Ok(attestation) => attestation,
Err(e) => {
log::error!(
"Oracle attestation is not valid. pubkey={} event_id={}",
announcement.oracle_public_key,
announcement.oracle_event.event_id
)
})
.ok()?;
"Attestation not found for event. id={} error={}",
announcement.oracle_event.event_id,
e.to_string()
);
return None;
}
};

// Validate the attestation
if let Err(e) = attestation.validate(&self.secp, announcement) {
log::error!(
"Oracle attestation is not valid. pubkey={} event_id={}, error={:?}",
announcement.oracle_public_key,
announcement.oracle_event.event_id,
e
);
return None;
}

Some((*i, attestation))
})
.collect();
.collect::<FuturesUnordered<_>>()
.await
.filter_map(|result| async move { result }) // Filter out None values
.collect::<Vec<_>>()
.await;
if attestations.len() >= contract_info.threshold {
return Some((contract_info, adaptor_info, attestations));
}
Expand All @@ -609,8 +638,8 @@ where
None
}

fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
let closable_contract_info = self.get_closable_contract_info(contract);
async fn check_confirmed_contract(&self, contract: &SignedContract) -> Result<(), Error> {
let closable_contract_info = self.get_closable_contract_info(contract).await;
if let Some((contract_info, adaptor_info, attestations)) = closable_contract_info {
let offer = &contract.accepted_contract.offered_contract;
let signer = self.signer_provider.derive_contract_signer(offer.keys_id)?;
Expand Down Expand Up @@ -910,16 +939,12 @@ where
{
/// Create a new channel offer and return the [`dlc_messages::channel::OfferChannel`]
/// message to be sent to the `counter_party`.
pub fn offer_channel(
pub async fn offer_channel(
&self,
contract_input: &ContractInput,
counter_party: PublicKey,
) -> Result<OfferChannel, Error> {
let oracle_announcements = contract_input
.contract_infos
.iter()
.map(|x| self.get_oracle_announcements(&x.oracles))
.collect::<Result<Vec<_>, Error>>()?;
let oracle_announcements = self.oracle_announcements(contract_input).await?;

let (offered_channel, offered_contract) = crate::channel_updater::offer_channel(
&self.secp,
Expand Down Expand Up @@ -1092,7 +1117,7 @@ where
/// Returns a [`RenewOffer`] message as well as the [`PublicKey`] of the
/// counter party's node to offer the establishment of a new contract in the
/// channel.
pub fn renew_offer(
pub async fn renew_offer(
&self,
channel_id: &ChannelId,
counter_payout: u64,
Expand All @@ -1101,11 +1126,7 @@ where
let mut signed_channel =
get_channel_in_state!(self, channel_id, Signed, None as Option<PublicKey>)?;

let oracle_announcements = contract_input
.contract_infos
.iter()
.map(|x| self.get_oracle_announcements(&x.oracles))
.collect::<Result<Vec<_>, Error>>()?;
let oracle_announcements = self.oracle_announcements(contract_input).await?;

let (msg, offered_contract) = crate::channel_updater::renew_offer(
&self.secp,
Expand Down Expand Up @@ -1300,7 +1321,7 @@ where
Ok(())
}

fn try_finalize_closing_established_channel(
async fn try_finalize_closing_established_channel(
&self,
signed_channel: SignedChannel,
) -> Result<(), Error> {
Expand All @@ -1327,6 +1348,7 @@ where

let (contract_info, adaptor_info, attestations) = self
.get_closable_contract_info(&confirmed_contract)
.await
.ok_or_else(|| {
Error::InvalidState("Could not get information to close contract".to_string())
})?;
Expand Down Expand Up @@ -2087,13 +2109,13 @@ where
Ok(())
}

fn channel_checks(&self) -> Result<(), Error> {
async fn channel_checks(&self) -> Result<(), Error> {
let established_closing_channels = self
.store
.get_signed_channels(Some(SignedChannelStateType::Closing))?;

for channel in established_closing_channels {
if let Err(e) = self.try_finalize_closing_established_channel(channel) {
if let Err(e) = self.try_finalize_closing_established_channel(channel).await {
error!("Error trying to close established channel: {}", e);
}
}
Expand Down Expand Up @@ -2590,6 +2612,30 @@ where
pnl,
})
}

async fn oracle_announcements(
&self,
contract_input: &ContractInput,
) -> Result<Vec<Vec<OracleAnnouncement>>, Error> {
let announcements = stream::iter(contract_input.contract_infos.iter())
.map(|x| {
let future = self.get_oracle_announcements(&x.oracles);
async move {
match future.await {
Ok(result) => Ok(result),
Err(e) => {
log::error!("Failed to get oracle announcements: {}", e);
Err(e)
}
}
}
})
.collect::<FuturesUnordered<_>>()
.await
.try_collect::<Vec<_>>()
.await?;
Ok(announcements)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit b2bede7

Please sign in to comment.