Skip to content

Commit

Permalink
receive payjoin
Browse files Browse the repository at this point in the history
  • Loading branch information
DanGould committed Jan 16, 2024
1 parent c068595 commit db8e415
Show file tree
Hide file tree
Showing 15 changed files with 773 additions and 1 deletion.
85 changes: 85 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ futures = "0.3.30"
url = "2.5.0"
rand = "0.8.5"
bdk = "0.29.0"
payjoin = { version = "0.13.0", features = ["base64", "receive"] }
lazy_static = "1.4.0"
opentelemetry = { version = "0.21.0" }
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
serde_with = "3.4.0"
electrum-client = "0.18.0"
reqwest = { version = "0.11.23", default-features = false, features = ["json", "rustls-tls"] }
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = { version = "0.24", optional = true }
tonic_lnd = { version = "0.5.0", features = ["tracing"] }
async-trait = "0.1"
base64 = "0.21.5"
Expand Down
1 change: 1 addition & 0 deletions src/api/server/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ impl From<PayoutQueue> for proto::PayoutQueue {
consolidate_deprecated_keychains: payout_queue.config.consolidate_deprecated_keychains,
cpfp_payouts_after_mins: payout_queue.config.cpfp_payouts_after_mins,
cpfp_payouts_after_blocks: payout_queue.config.cpfp_payouts_after_blocks,
can_payjoin_preempt: payout_queue.config.can_payjoin_preempt,
force_min_change_sats: payout_queue.config.force_min_change_sats.map(u64::from),
});
proto::PayoutQueue {
Expand Down
16 changes: 16 additions & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
job,
ledger::*,
outbox::*,
payjoin::{config::*, *},
payout::*,
payout_queue::*,
primitives::*,
Expand Down Expand Up @@ -97,6 +98,14 @@ impl App {
config.jobs.respawn_all_outbox_handlers_delay,
)
.await?;
let pj = PayjoinReceiver::new(
PayjoinConfig { listen_port: 8088 },
addresses.clone(),
utxos.clone(),
wallets.clone(),
config.blockchain.network,
);
Self::spawn_payjoin_receiver(pj).await?;
let app = Self {
outbox,
profiles: Profiles::new(&pool),
Expand Down Expand Up @@ -1067,4 +1076,11 @@ impl App {
});
Ok(())
}

async fn spawn_payjoin_receiver(pj: PayjoinReceiver) -> Result<(), ApplicationError> {
tokio::spawn(async move {
crate::payjoin::start(pj).await;
});
Ok(())
}
}
56 changes: 56 additions & 0 deletions src/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,21 @@ pub async fn spawn_process_payout_queue(
.await
}

pub async fn spawn_payjoin_payout_queue(
pool: &sqlx::PgPool,
data: impl Into<ProcessPayoutQueueData>,
) -> Result<ProcessPayoutQueueData, JobError> {
let data = data.into();
onto_account_main_channel(
pool,
data.account_id,
Uuid::new_v4(),
"payjoin_payout_queue",
data,
)
.await
}

#[job(name = "schedule_process_payout_queue")]
async fn schedule_process_payout_queue(mut current_job: CurrentJob) -> Result<(), JobError> {
let pool = current_job.pool().clone();
Expand Down Expand Up @@ -296,6 +311,47 @@ async fn process_payout_queue(
Ok(())
}

#[job(name = "payjoin_payout_queue")]
async fn payjoin_payout_queue(
mut current_job: CurrentJob,
payouts: Payouts,
wallets: Wallets,
utxos: Utxos,
payout_queues: PayoutQueues,
batches: Batches,
fees_client: FeesClient,
) -> Result<(), JobError> {
let pool = current_job.pool().clone();
JobExecutor::builder(&mut current_job)
.initial_retry_delay(std::time::Duration::from_secs(2))
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: ProcessPayoutQueueData = data.expect("no ProcessPayoutQueueData available");
let (data, res) = process_payout_queue::execute_payjoin(
pool,
payouts,
wallets,
payout_queues,
batches,
utxos,
data,
fees_client,
)
.await?;
if let Some((mut tx, wallet_ids)) = res {
for id in wallet_ids {
spawn_batch_wallet_accounting(&mut tx, (&data, id)).await?;
}
spawn_batch_signing(tx, &data).await?;
}

Ok::<_, JobError>(data)
})
.await?;
Ok(())
}

#[job(
name = "batch_wallet_accounting",
channel_name = "wallet_accounting",
Expand Down
Loading

0 comments on commit db8e415

Please sign in to comment.