Skip to content
This repository was archived by the owner on Dec 9, 2023. It is now read-only.

add process_disclosure API #207

Merged
merged 2 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::collections::BTreeSet;
use std::thread::sleep;
use std::time::Duration;

use bitcoin::OutPoint;
use bitcoin::{OutPoint, Txid};
use internet2::addr::{NodeAddr, ServiceAddr};
use internet2::ZmqSocketType;
use lnpbp::chain::Chain;
Expand Down Expand Up @@ -260,6 +260,21 @@ impl Client {
}
}
}

pub fn process_disclosure(
&mut self,
txid: Txid,
progress: impl Fn(String),
) -> Result<bool, Error> {
self.request(RpcMsg::ProcessDisclosure(txid))?;
loop {
match self.response()?.failure_to_error()? {
RpcMsg::Success(_) => return Ok(true),
RpcMsg::Progress(info) => progress(info),
_ => return Err(Error::UnexpectedServerResponse),
}
}
}
}

pub struct Handler {
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub enum RpcMsg {
#[display("accept_transfer(...)")]
ConsumeTransfer(AcceptReq<TransferConsignment>),

#[display("process_disclosure({0})")]
ProcessDisclosure(Txid),

#[display(inner)]
Transfer(TransferReq),

Expand Down
89 changes: 84 additions & 5 deletions src/bucketd/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,17 @@ pub enum StashError {
/// stash data storage.
AnchorAbsent(Txid),

/// bundle data for txid {0} is absent
/// bundle data for contract {0} txid {1} is absent
///
/// It may happen due to RGB Node bug, or indicate internal stash inconsistency and compromised
/// stash data storage.
BundleAbsent(Txid),
BundleAbsent(ContractId, Txid),

/// disclosure for txid {0} is absent
///
/// It may happen due to RGB Node bug, or indicate internal stash inconsistency and compromised
/// stash data storage.
DisclosureAbsent(Txid),

/// the anchor is not related to the contract
///
Expand Down Expand Up @@ -256,7 +262,8 @@ impl Runtime {
}
let data = TransitionBundle::with(revealed, concealed)
.expect("enough data should be available to create bundle");
self.store.store_sten(db::BUNDLES, witness_txid, &data)?;
let chunk_id = ChunkId::with_fixed_fragments(contract_id, witness_txid);
self.store.store_sten(db::BUNDLES, chunk_id, &data)?;
}
for extension in consignment.state_extensions() {
let node_id = extension.node_id();
Expand All @@ -282,6 +289,77 @@ impl Runtime {
Ok(status)
}

pub(super) fn process_disclosure(&mut self, txid: Txid) -> Result<(), DaemonError> {
let disclosure: Disclosure = self
.store
.retrieve_sten(db::DISCLOSURES, txid)?
.ok_or(StashError::DisclosureAbsent(txid))?;

for (_anchor_id, (anchor, bundle_map)) in disclosure.anchored_bundles() {
for (contract_id, bundle) in bundle_map {
let mut state: ContractState = self
.store
.retrieve_sten(db::CONTRACTS, *contract_id)?
.ok_or(StashError::StateAbsent(*contract_id))?;
trace!("Starting with contract state {:?}", state);

let bundle_id = bundle.bundle_id();
let witness_txid = anchor.txid;
debug!("Processing anchored bundle {} for txid {}", bundle_id, witness_txid);
trace!("Anchor: {:?}", anchor);
trace!("Bundle: {:?}", bundle);
self.store.store_sten(db::ANCHORS, anchor.txid, anchor)?;
let concealed: BTreeMap<NodeId, BTreeSet<u16>> =
bundle.concealed_iter().map(|(id, set)| (*id, set.clone())).collect();
let mut revealed: BTreeMap<Transition, BTreeSet<u16>> = bmap!();
for (transition, inputs) in bundle.revealed_iter() {
let node_id = transition.node_id();
let transition_type = transition.transition_type();
debug!("Processing state transition {}", node_id);
trace!("State transition: {:?}", transition);

state.add_transition(witness_txid, transition);
trace!("Contract state now is {:?}", state);

trace!("Storing state transition data");
revealed.insert(transition.clone(), inputs.clone());
self.store.store_merge(db::TRANSITIONS, node_id, transition.clone())?;
self.store.store_sten(db::TRANSITION_WITNESS, node_id, &witness_txid)?;

trace!("Indexing transition");
let index_id = ChunkId::with_fixed_fragments(*contract_id, transition_type);
self.store.insert_into_set(
db::CONTRACT_TRANSITIONS,
index_id,
node_id.into_array(),
)?;

self.store.store_sten(db::NODE_CONTRACTS, node_id, contract_id)?;

for seal in transition.filter_revealed_seals() {
let index_id = ChunkId::with_fixed_fragments(
seal.txid.expect("seal should contain revealed txid"),
seal.vout,
);
self.store.insert_into_set(
db::OUTPOINTS,
index_id,
node_id.into_array(),
)?;
}
}
let data = TransitionBundle::with(revealed, concealed)
.expect("enough data should be available to create bundle");
let chunk_id = ChunkId::with_fixed_fragments(*contract_id, witness_txid);
self.store.store_sten(db::BUNDLES, chunk_id, &data)?;

self.store.store_sten(db::CONTRACTS, *contract_id, &state)?;
}
}

Ok(())
}

pub(super) fn compose_consignment<T: ConsignmentType>(
&mut self,
contract_id: ContractId,
Expand Down Expand Up @@ -453,9 +531,10 @@ impl Collector {
let anchor: Anchor<lnpbp4::MerkleBlock> = store
.retrieve_sten(db::ANCHORS, witness_txid)?
.ok_or(StashError::AnchorAbsent(witness_txid))?;
let chunk_id = ChunkId::with_fixed_fragments(contract_id, witness_txid);
let bundle: TransitionBundle = store
.retrieve_sten(db::BUNDLES, witness_txid)?
.ok_or(StashError::BundleAbsent(witness_txid))?;
.retrieve_sten(db::BUNDLES, chunk_id)?
.ok_or(StashError::BundleAbsent(contract_id, witness_txid))?;
let anchor = anchor.to_merkle_proof(contract_id)?;
self.anchored_bundles.insert(witness_txid, (anchor, bundle));
&mut self.anchored_bundles.get_mut(&witness_txid).expect("stdlib is broken").1
Expand Down
26 changes: 24 additions & 2 deletions src/bucketd/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::Duration;

use amplify::num::u24;
use bitcoin::secp256k1::rand::random;
use bitcoin::OutPoint;
use bitcoin::{OutPoint, Txid};
use commit_verify::ConsensusCommit;
use electrum_client::Client as ElectrumClient;
use internet2::addr::NodeAddr;
Expand All @@ -39,7 +39,7 @@ use strict_encoding::{MediumVec, StrictEncode};

use crate::bus::{
BusMsg, ConsignReq, CtlMsg, DaemonId, Endpoints, FinalizeTransferReq, OutpointStateReq,
ProcessReq, Responder, ServiceBus, ServiceId, ValidityResp,
ProcessDisclosureReq, ProcessReq, Responder, ServiceBus, ServiceId, ValidityResp,
};
use crate::{Config, DaemonError, LaunchError};

Expand Down Expand Up @@ -178,6 +178,9 @@ impl Runtime {
}) => {
self.handle_consignment(endpoints, client_id, consignment, force)?;
}
CtlMsg::ProcessDisclosure(ProcessDisclosureReq { client_id, txid }) => {
self.handle_disclosure(endpoints, client_id, txid)?;
}
CtlMsg::ProcessTransferContainer(container_id) => {
self.handle_container(endpoints, container_id)?;
}
Expand Down Expand Up @@ -310,6 +313,25 @@ impl Runtime {
Ok(())
}

fn handle_disclosure(
&mut self,
endpoints: &mut Endpoints,
client_id: ClientId,
txid: Txid,
) -> Result<(), DaemonError> {
match self.process_disclosure(txid) {
Err(err) => {
let _ = self.send_rpc(endpoints, client_id, err);
self.send_ctl(endpoints, ServiceId::rgbd(), CtlMsg::ProcessingFailed)?
}
Ok(_) => {
let _ = self.send_rpc(endpoints, client_id, RpcMsg::success());
self.send_ctl(endpoints, ServiceId::rgbd(), CtlMsg::ProcessingComplete)?
}
}
Ok(())
}

fn handle_consign_contract(
&mut self,
endpoints: &mut Endpoints,
Expand Down
12 changes: 11 additions & 1 deletion src/bus/ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::collections::BTreeSet;

use bitcoin::OutPoint;
use bitcoin::{OutPoint, Txid};
use internet2::addr::NodeAddr;
use microservices::esb::ClientId;
use psbt::Psbt;
Expand All @@ -36,6 +36,9 @@ pub enum CtlMsg {
#[display("process_transfer({0})")]
ProcessTransfer(ProcessReq<TransferConsignment>),

#[display("process_disclosure({0})")]
ProcessDisclosure(ProcessDisclosureReq),

#[display("process_transfer_container({0})")]
ProcessTransferContainer(ContainerId),

Expand Down Expand Up @@ -70,6 +73,13 @@ pub struct ProcessReq<T: ConsignmentType> {
pub force: bool,
}

#[derive(Clone, Debug, Display, StrictEncode, StrictDecode)]
#[display("{client_id}, txid = {txid}, ...")]
pub struct ProcessDisclosureReq {
pub client_id: ClientId,
pub txid: Txid,
}

#[derive(Clone, Debug, Display, StrictEncode, StrictDecode)]
#[display("{client_id}, {contract_id}, ...")]
pub struct ConsignReq<T: ConsignmentType> {
Expand Down
3 changes: 2 additions & 1 deletion src/bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use rgb_rpc::RpcMsg;
use storm_ext::ExtMsg as StormMsg;

pub use self::ctl::{
ConsignReq, CtlMsg, FinalizeTransferReq, OutpointStateReq, ProcessReq, ValidityResp,
ConsignReq, CtlMsg, FinalizeTransferReq, OutpointStateReq, ProcessDisclosureReq, ProcessReq,
ValidityResp,
};
pub use self::services::{DaemonId, ServiceId};
pub(crate) use self::services::{Endpoints, Responder, ServiceBus};
Expand Down
18 changes: 16 additions & 2 deletions src/rgbd/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::collections::{BTreeSet, VecDeque};

use amplify::Wrapper;
use bitcoin::hashes::Hash;
use bitcoin::OutPoint;
use bitcoin::{OutPoint, Txid};
use internet2::addr::NodeAddr;
use internet2::ZmqSocketType;
use lnpbp::chain::Chain;
Expand All @@ -34,7 +34,7 @@ use storm_rpc::AddressedMsg;
use crate::bucketd::StashError;
use crate::bus::{
BusMsg, ConsignReq, CtlMsg, DaemonId, Endpoints, FinalizeTransferReq, OutpointStateReq,
ProcessReq, Responder, ServiceBus, ServiceId,
ProcessDisclosureReq, ProcessReq, Responder, ServiceBus, ServiceId,
};
use crate::db::ChunkHolder;
use crate::rgbd::daemons::Daemon;
Expand Down Expand Up @@ -254,6 +254,9 @@ impl Runtime {
}) => {
self.accept_transfer(endpoints, client_id, transfer, force)?;
}
RpcMsg::ProcessDisclosure(txid) => {
self.process_disclosure(endpoints, client_id, txid)?;
}

RpcMsg::Transfer(TransferReq {
consignment,
Expand Down Expand Up @@ -507,6 +510,17 @@ impl Runtime {
self.pick_or_start(endpoints, client_id)
}

fn process_disclosure(
&mut self,
endpoints: &mut Endpoints,
client_id: ClientId,
txid: Txid,
) -> Result<(), DaemonError> {
self.ctl_queue
.push_back(CtlMsg::ProcessDisclosure(ProcessDisclosureReq { client_id, txid }));
self.pick_or_start(endpoints, client_id)
}

fn process_transfer(
&mut self,
endpoints: &mut Endpoints,
Expand Down