Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Two phase commit for mobile packet verifier #700

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mobile_packet_verifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "sol
metrics = {workspace = true}
poc-metrics = {path = "../metrics"}
prost = {workspace = true}
solana-sdk = {workspace = true}
serde = {workspace = true}
sqlx = {workspace = true}
solana = {path = "../solana"}
Expand Down
12 changes: 12 additions & 0 deletions mobile_packet_verifier/migrations/7_two_phase_commit.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TYPE solana_transaction AS (
signature TEXT NOT NULL,
amount BIGINT NOT NULL,
time_of_submission TIMESTAMPTZ NOT NULL
);

CREATE TABLE payer_totals (
payer TEXT PRIMARY KEY,
total_dcs BIGINT NOT NULL,
txn solana_transaction
);

14 changes: 13 additions & 1 deletion mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,25 @@ pub async fn accumulate_sessions(
"#
)
.bind(event.pub_key)
.bind(event.payer)
.bind(&event.payer)
.bind(event.upload_bytes as i64)
.bind(event.download_bytes as i64)
.bind(report.report.rewardable_bytes as i64)
.bind(curr_file_ts)
.execute(&mut *conn)
.await?;
sqlx::query(
r#"
INSERT INTO payer_totals (payer, total_dcs)
VALUES ($1, $2)
ON CONFLICT (payer) DO UPDATE SET
total_dcs = total_dcs + EXCLUDED.total_dcs
"#,
)
.bind(event.payer)
.bind(crate::bytes_to_dc(event.upload_bytes + event.download_bytes) as i64)
.execute(&mut *conn)
.await?;
}

Ok(())
Expand Down
165 changes: 95 additions & 70 deletions mobile_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};
use file_store::{file_sink::FileSinkClient, traits::TimestampEncode};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::packet_verifier::ValidDataTransferSession;
use solana::SolanaNetwork;
use solana::{GetSignature, SolanaNetwork};
use solana_sdk::signature::Signature;
use sqlx::{FromRow, Pool, Postgres};
use std::collections::HashMap;

#[derive(FromRow)]
pub struct DataTransferSession {
Expand All @@ -17,17 +17,19 @@ pub struct DataTransferSession {
last_timestamp: DateTime<Utc>,
}

#[derive(Default)]
#[derive(FromRow)]
pub struct PayerTotals {
total_dcs: u64,
sessions: Vec<DataTransferSession>,
payer: PublicKeyBinary,
total_dcs: i64,
txn: Option<SolanaTransaction>,
maplant marked this conversation as resolved.
Show resolved Hide resolved
}

impl PayerTotals {
fn push_sess(&mut self, sess: DataTransferSession) {
self.total_dcs += bytes_to_dc(sess.rewardable_bytes as u64);
self.sessions.push(sess);
}
#[derive(sqlx::Type)]
#[sqlx(type_name = "solana_transaction")]
pub struct SolanaTransaction {
signature: String,
amount: i64,
time_of_submission: DateTime<Utc>,
}

pub struct Burner<S> {
Expand All @@ -50,6 +52,8 @@ pub enum BurnError<E> {
FileStoreError(#[from] file_store::Error),
#[error("sql error: {0}")]
SqlError(#[from] sqlx::Error),
#[error("Chrono error: {0}")]
ChronoError(#[from] chrono::OutOfRangeError),
#[error("solana error: {0}")]
SolanaError(E),
}
Expand All @@ -59,60 +63,99 @@ where
S: SolanaNetwork,
{
pub async fn burn(&self, pool: &Pool<Postgres>) -> Result<(), BurnError<S::Error>> {
// Fetch all of the sessions
let sessions: Vec<DataTransferSession> =
sqlx::query_as("SELECT * FROM data_transfer_sessions")
.fetch_all(pool)
.await?;

// Fetch all of the sessions and group by the payer
let mut payer_totals = HashMap::<PublicKeyBinary, PayerTotals>::new();
for session in sessions.into_iter() {
payer_totals
.entry(session.payer.clone())
.or_default()
.push_sess(session);
}
// Fetch all of the payer totals:
let totals: Vec<PayerTotals> = sqlx::query_as("SELECT * FROM payer_totals")
.fetch_all(pool)
.await?;

for (
for PayerTotals {
payer,
PayerTotals {
total_dcs,
sessions,
},
) in payer_totals.into_iter()
total_dcs,
txn,
} in totals
{
let payer_balance = self
.solana
.payer_balance(&payer)
.await
.map_err(BurnError::SolanaError)?;

if payer_balance < total_dcs {
tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs");
continue;
}

tracing::info!(%total_dcs, %payer, "Burning DC");
if self.burn_data_credits(&payer, total_dcs).await.is_err() {
// We have failed to burn data credits:
metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "false");
continue;
let mut total_dcs = total_dcs as u64;

// Check if there is a pending transaction
if let Some(SolanaTransaction {
signature,
amount,
time_of_submission,
}) = txn
{
// Sleep for at least a minute since the time of submission to
// give the transaction plenty of time to be confirmed:
let time_since_submission = Utc::now() - time_of_submission;
if Duration::minutes(1) > time_since_submission {
jeffgrunewald marked this conversation as resolved.
Show resolved Hide resolved
tokio::time::sleep((Duration::minutes(1) - time_since_submission).to_std()?)
jeffgrunewald marked this conversation as resolved.
Show resolved Hide resolved
.await;
}

let signature: Signature = signature.parse().unwrap();
if self
.solana
.confirm_transaction(&signature)
.await
.map_err(BurnError::SolanaError)?
{
// This transaction has been confirmed. Subtract the amount confirmed from
// the total amount burned and remove the transaction.
total_dcs -= amount as u64;
}
// If the transaction has not been confirmed, we still want to remove the transaction.
// The total_dcs column remains the same.
sqlx::query("UPDATE payer_totals SET txn = NULL, total_dcs = $2 WHERE payer = $1")
.bind(&payer)
.bind(total_dcs as i64)
.execute(pool)
.await?;
}

// We succesfully managed to burn data credits:

metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "true");
// Get the current sessions we need to write, before creating any new transactions
let sessions: Vec<DataTransferSession> =
sqlx::query_as("SELECT * FROM data_transfer_session WHERE payer = $1")
.bind(&payer)
.fetch_all(pool)
.await?;

// Delete from the data transfer session and write out to S3
// Create a new transaction for the given amount, if there is any left.
// If total_dcs is zero, that means we need to clear out the current sessions as they are paid for.
if total_dcs != 0 {
let txn = self
.solana
.make_burn_transaction(&payer, total_dcs)
.await
.map_err(BurnError::SolanaError)?;
sqlx::query("UPDATE payer_totals SET txn = $2 WHERE payer = $1")
.bind(&payer)
.bind(SolanaTransaction {
signature: txn.get_signature().to_string(),
amount: total_dcs as i64,
jeffgrunewald marked this conversation as resolved.
Show resolved Hide resolved
time_of_submission: Utc::now(),
})
.execute(pool)
.await?;
// Attempt to execute the transaction
if self.solana.submit_transaction(&txn).await.is_err() {
// We have failed to burn data credits:
metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "false");
continue;
}
}

// Submit the sessions
sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1")
.bind(&payer)
.execute(pool)
.await?;

sqlx::query("DELETE FROM payer_totals WHERE payer = $1")
.bind(&payer)
.execute(pool)
maplant marked this conversation as resolved.
Show resolved Hide resolved
.await?;

for session in sessions {
let num_dcs = bytes_to_dc(session.rewardable_bytes as u64);
let num_dcs = crate::bytes_to_dc(session.rewardable_bytes as u64);
self.valid_sessions
.write(
ValidDataTransferSession {
Expand All @@ -133,22 +176,4 @@ where

Ok(())
}

async fn burn_data_credits(
&self,
payer: &PublicKeyBinary,
amount: u64,
) -> Result<(), S::Error> {
let txn = self.solana.make_burn_transaction(payer, amount).await?;
self.solana.submit_transaction(&txn).await?;
Ok(())
}
}

const BYTES_PER_DC: u64 = 20_000;

fn bytes_to_dc(bytes: u64) -> u64 {
let bytes = bytes.max(BYTES_PER_DC);
// Integer div/ceil from: https://stackoverflow.com/a/2745086
(bytes + BYTES_PER_DC - 1) / BYTES_PER_DC
}
8 changes: 8 additions & 0 deletions mobile_packet_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@ pub mod burner;
pub mod daemon;
pub mod event_ids;
pub mod settings;

const BYTES_PER_DC: u64 = 20_000;

pub fn bytes_to_dc(bytes: u64) -> u64 {
let bytes = bytes.max(BYTES_PER_DC);
// Integer div/ceil from: https://stackoverflow.com/a/2745086
(bytes + BYTES_PER_DC - 1) / BYTES_PER_DC
}