Skip to content
This repository was archived by the owner on Dec 9, 2023. It is now read-only.

Commit bf93fe1

Browse files
committed
add process_disclosure API
1 parent d9de2fe commit bf93fe1

File tree

7 files changed

+150
-7
lines changed

7 files changed

+150
-7
lines changed

rpc/src/client.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::collections::BTreeSet;
1212
use std::thread::sleep;
1313
use std::time::Duration;
1414

15-
use bitcoin::OutPoint;
15+
use bitcoin::{OutPoint, Txid};
1616
use internet2::addr::{NodeAddr, ServiceAddr};
1717
use internet2::ZmqSocketType;
1818
use lnpbp::chain::Chain;
@@ -260,6 +260,21 @@ impl Client {
260260
}
261261
}
262262
}
263+
264+
pub fn process_disclosure(
265+
&mut self,
266+
txid: Txid,
267+
progress: impl Fn(String),
268+
) -> Result<bool, Error> {
269+
self.request(RpcMsg::ProcessDisclosure(txid))?;
270+
loop {
271+
match self.response()?.failure_to_error()? {
272+
RpcMsg::Success(_) => return Ok(true),
273+
RpcMsg::Progress(info) => progress(info),
274+
_ => return Err(Error::UnexpectedServerResponse),
275+
}
276+
}
277+
}
263278
}
264279

265280
pub struct Handler {

rpc/src/messages.rs

+3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ pub enum RpcMsg {
6868
#[display("accept_transfer(...)")]
6969
ConsumeTransfer(AcceptReq<TransferConsignment>),
7070

71+
#[display("process_disclosure({0})")]
72+
ProcessDisclosure(Txid),
73+
7174
#[display(inner)]
7275
Transfer(TransferReq),
7376

src/bucketd/processor.rs

+72
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ pub enum StashError {
7878
/// stash data storage.
7979
BundleAbsent(ContractId, Txid),
8080

81+
/// disclosure for txid {0} is absent
82+
///
83+
/// It may happen due to RGB Node bug, or indicate internal stash inconsistency and compromised
84+
/// stash data storage.
85+
DisclosureAbsent(Txid),
86+
8187
/// the anchor is not related to the contract
8288
///
8389
/// It may happen due to RGB Node bug, or indicate internal stash inconsistency and compromised
@@ -283,6 +289,72 @@ impl Runtime {
283289
Ok(status)
284290
}
285291

292+
pub(super) fn process_disclosure(
293+
&mut self,
294+
txid: Txid,
295+
) -> Result<(), DaemonError> {
296+
let disclosure: Disclosure = self.store.retrieve_sten(db::DISCLOSURES, txid)?
297+
.ok_or(StashError::DisclosureAbsent(txid))?;
298+
299+
for (_anchor_id, (anchor, bundle_map)) in disclosure.anchored_bundles() {
300+
for (contract_id, bundle) in bundle_map {
301+
let mut state: ContractState =
302+
self.store.retrieve_sten(db::CONTRACTS, *contract_id)?.ok_or(StashError::StateAbsent(*contract_id))?;
303+
trace!("Starting with contract state {:?}", state);
304+
305+
let bundle_id = bundle.bundle_id();
306+
let witness_txid = anchor.txid;
307+
debug!("Processing anchored bundle {} for txid {}", bundle_id, witness_txid);
308+
trace!("Anchor: {:?}", anchor);
309+
trace!("Bundle: {:?}", bundle);
310+
self.store.store_sten(db::ANCHORS, anchor.txid, anchor)?;
311+
let concealed: BTreeMap<NodeId, BTreeSet<u16>> =
312+
bundle.concealed_iter().map(|(id, set)| (*id, set.clone())).collect();
313+
let mut revealed: BTreeMap<Transition, BTreeSet<u16>> = bmap!();
314+
for (transition, inputs) in bundle.revealed_iter() {
315+
let node_id = transition.node_id();
316+
let transition_type = transition.transition_type();
317+
debug!("Processing state transition {}", node_id);
318+
trace!("State transition: {:?}", transition);
319+
320+
state.add_transition(witness_txid, transition);
321+
trace!("Contract state now is {:?}", state);
322+
323+
trace!("Storing state transition data");
324+
revealed.insert(transition.clone(), inputs.clone());
325+
self.store.store_merge(db::TRANSITIONS, node_id, transition.clone())?;
326+
self.store.store_sten(db::TRANSITION_WITNESS, node_id, &witness_txid)?;
327+
328+
trace!("Indexing transition");
329+
let index_id = ChunkId::with_fixed_fragments(*contract_id, transition_type);
330+
self.store.insert_into_set(
331+
db::CONTRACT_TRANSITIONS,
332+
index_id,
333+
node_id.into_array(),
334+
)?;
335+
336+
self.store.store_sten(db::NODE_CONTRACTS, node_id, contract_id)?;
337+
338+
for seal in transition.filter_revealed_seals() {
339+
let index_id = ChunkId::with_fixed_fragments(
340+
seal.txid.expect("seal should contain revealed txid"),
341+
seal.vout,
342+
);
343+
self.store.insert_into_set(db::OUTPOINTS, index_id, node_id.into_array())?;
344+
}
345+
}
346+
let data = TransitionBundle::with(revealed, concealed)
347+
.expect("enough data should be available to create bundle");
348+
let chunk_id = ChunkId::with_fixed_fragments(*contract_id, witness_txid);
349+
self.store.store_sten(db::BUNDLES, chunk_id, &data)?;
350+
351+
self.store.store_sten(db::CONTRACTS, *contract_id, &state)?;
352+
}
353+
}
354+
355+
Ok(())
356+
}
357+
286358
pub(super) fn compose_consignment<T: ConsignmentType>(
287359
&mut self,
288360
contract_id: ContractId,

src/bucketd/service.rs

+28-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::time::Duration;
1515

1616
use amplify::num::u24;
1717
use bitcoin::secp256k1::rand::random;
18-
use bitcoin::OutPoint;
18+
use bitcoin::{OutPoint, Txid};
1919
use commit_verify::ConsensusCommit;
2020
use electrum_client::Client as ElectrumClient;
2121
use internet2::addr::NodeAddr;
@@ -39,7 +39,7 @@ use strict_encoding::{MediumVec, StrictEncode};
3939

4040
use crate::bus::{
4141
BusMsg, ConsignReq, CtlMsg, DaemonId, Endpoints, FinalizeTransferReq, OutpointStateReq,
42-
ProcessReq, Responder, ServiceBus, ServiceId, ValidityResp,
42+
ProcessDisclosureReq, ProcessReq, Responder, ServiceBus, ServiceId, ValidityResp,
4343
};
4444
use crate::{Config, DaemonError, LaunchError};
4545

@@ -178,6 +178,12 @@ impl Runtime {
178178
}) => {
179179
self.handle_consignment(endpoints, client_id, consignment, force)?;
180180
}
181+
CtlMsg::ProcessDisclosure(ProcessDisclosureReq {
182+
client_id,
183+
txid,
184+
}) => {
185+
self.handle_disclosure(endpoints, client_id, txid)?;
186+
}
181187
CtlMsg::ProcessTransferContainer(container_id) => {
182188
self.handle_container(endpoints, container_id)?;
183189
}
@@ -310,6 +316,26 @@ impl Runtime {
310316
Ok(())
311317
}
312318

319+
fn handle_disclosure(
320+
&mut self,
321+
endpoints: &mut Endpoints,
322+
client_id: ClientId,
323+
txid: Txid,
324+
) -> Result<(), DaemonError> {
325+
match self.process_disclosure(txid) {
326+
Err(err) => {
327+
let _ = self.send_rpc(endpoints, client_id, err);
328+
self.send_ctl(endpoints, ServiceId::rgbd(), CtlMsg::ProcessingFailed)?
329+
}
330+
Ok(_) => {
331+
let _ =
332+
self.send_rpc(endpoints, client_id, RpcMsg::success());
333+
self.send_ctl(endpoints, ServiceId::rgbd(), CtlMsg::ProcessingComplete)?
334+
}
335+
}
336+
Ok(())
337+
}
338+
313339
fn handle_consign_contract(
314340
&mut self,
315341
endpoints: &mut Endpoints,

src/bus/ctl.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
use std::collections::BTreeSet;
1212

13-
use bitcoin::OutPoint;
13+
use bitcoin::{OutPoint, Txid};
1414
use internet2::addr::NodeAddr;
1515
use microservices::esb::ClientId;
1616
use psbt::Psbt;
@@ -36,6 +36,9 @@ pub enum CtlMsg {
3636
#[display("process_transfer({0})")]
3737
ProcessTransfer(ProcessReq<TransferConsignment>),
3838

39+
#[display("process_disclosure({0})")]
40+
ProcessDisclosure(ProcessDisclosureReq),
41+
3942
#[display("process_transfer_container({0})")]
4043
ProcessTransferContainer(ContainerId),
4144

@@ -70,6 +73,13 @@ pub struct ProcessReq<T: ConsignmentType> {
7073
pub force: bool,
7174
}
7275

76+
#[derive(Clone, Debug, Display, StrictEncode, StrictDecode)]
77+
#[display("{client_id}, txid = {txid}, ...")]
78+
pub struct ProcessDisclosureReq {
79+
pub client_id: ClientId,
80+
pub txid: Txid,
81+
}
82+
7383
#[derive(Clone, Debug, Display, StrictEncode, StrictDecode)]
7484
#[display("{client_id}, {contract_id}, ...")]
7585
pub struct ConsignReq<T: ConsignmentType> {

src/bus/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use rgb_rpc::RpcMsg;
1616
use storm_ext::ExtMsg as StormMsg;
1717

1818
pub use self::ctl::{
19-
ConsignReq, CtlMsg, FinalizeTransferReq, OutpointStateReq, ProcessReq, ValidityResp,
19+
ConsignReq, CtlMsg, FinalizeTransferReq, OutpointStateReq, ProcessDisclosureReq,
20+
ProcessReq, ValidityResp,
2021
};
2122
pub use self::services::{DaemonId, ServiceId};
2223
pub(crate) use self::services::{Endpoints, Responder, ServiceBus};

src/rgbd/service.rs

+18-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::collections::{BTreeSet, VecDeque};
1212

1313
use amplify::Wrapper;
1414
use bitcoin::hashes::Hash;
15-
use bitcoin::OutPoint;
15+
use bitcoin::{OutPoint, Txid};
1616
use internet2::addr::NodeAddr;
1717
use internet2::ZmqSocketType;
1818
use lnpbp::chain::Chain;
@@ -34,7 +34,7 @@ use storm_rpc::AddressedMsg;
3434
use crate::bucketd::StashError;
3535
use crate::bus::{
3636
BusMsg, ConsignReq, CtlMsg, DaemonId, Endpoints, FinalizeTransferReq, OutpointStateReq,
37-
ProcessReq, Responder, ServiceBus, ServiceId,
37+
ProcessDisclosureReq, ProcessReq, Responder, ServiceBus, ServiceId,
3838
};
3939
use crate::db::ChunkHolder;
4040
use crate::rgbd::daemons::Daemon;
@@ -254,6 +254,9 @@ impl Runtime {
254254
}) => {
255255
self.accept_transfer(endpoints, client_id, transfer, force)?;
256256
}
257+
RpcMsg::ProcessDisclosure(txid) => {
258+
self.process_disclosure(endpoints, client_id, txid)?;
259+
}
257260

258261
RpcMsg::Transfer(TransferReq {
259262
consignment,
@@ -507,6 +510,19 @@ impl Runtime {
507510
self.pick_or_start(endpoints, client_id)
508511
}
509512

513+
fn process_disclosure(
514+
&mut self,
515+
endpoints: &mut Endpoints,
516+
client_id: ClientId,
517+
txid: Txid,
518+
) -> Result<(), DaemonError> {
519+
self.ctl_queue.push_back(CtlMsg::ProcessDisclosure(ProcessDisclosureReq {
520+
client_id,
521+
txid,
522+
}));
523+
self.pick_or_start(endpoints, client_id)
524+
}
525+
510526
fn process_transfer(
511527
&mut self,
512528
endpoints: &mut Endpoints,

0 commit comments

Comments
 (0)