diff --git a/asset/asset.go b/asset/asset.go index e62218c1e..b71a34515 100644 --- a/asset/asset.go +++ b/asset/asset.go @@ -245,6 +245,11 @@ func (i ID) String() string { return hex.EncodeToString(i[:]) } +// IsEqual returns true if the ID matches the provided ID. +func (i ID) IsEqual(a ID) bool { + return bytes.Equal(i[:], a[:]) +} + // Record returns a TLV record that can be used to encode/decode an ID to/from a // TLV stream. // @@ -469,6 +474,16 @@ func (s *Specifier) UnwrapToPtr() (*ID, *btcec.PublicKey) { return s.UnwrapIdToPtr(), s.UnwrapGroupKeyToPtr() } +// AssertNotEmpty checks whether the specifier is empty, returning an error if +// so. +func (s *Specifier) AssertNotEmpty() error { + if !s.HasId() && !s.HasGroupPubKey() { + return fmt.Errorf("asset specifier is empty") + } + + return nil +} + // Type denotes the asset types supported by the Taproot Asset protocol. type Type uint8 diff --git a/itest/rfq_test.go b/itest/rfq_test.go index b45651d4c..f6e505181 100644 --- a/itest/rfq_test.go +++ b/itest/rfq_test.go @@ -424,6 +424,112 @@ func testRfqAssetSellHtlcIntercept(t *harnessTest) { require.NoError(t.t, err) } +// testRfqNegotiationGroupKey checks that two nodes can negotiate and register +// quotes based on a specifier that only uses a group key. +func testRfqNegotiationGroupKey(t *harnessTest) { + // Initialize a new test scenario. + ts := newRfqTestScenario(t) + + // Mint an asset with Alice's tapd node. + rpcAssets := MintAssetsConfirmBatch( + t.t, t.lndHarness.Miner().Client, ts.AliceTapd, + []*mintrpc.MintAssetRequest{issuableAssets[0]}, + ) + + mintedAssetGroupKey := rpcAssets[0].AssetGroup.TweakedGroupKey + + ctxb := context.Background() + ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout) + defer cancel() + + // Subscribe to Alice's RFQ events stream. + aliceEventNtfns, err := ts.AliceTapd.SubscribeRfqEventNtfns( + ctxb, &rfqrpc.SubscribeRfqEventNtfnsRequest{}, + ) + require.NoError(t.t, err) + + // Alice sends a sell order to Bob for some amount of the newly minted + // asset. + askAmt := uint64(42000) + sellOrderExpiry := uint64(time.Now().Add(24 * time.Hour).Unix()) + + // We first try to add a sell order without specifying the asset skip + // flag. That should result in an error, since we only have a normal + // channel and not an asset channel. + sellReq := &rfqrpc.AddAssetSellOrderRequest{ + AssetSpecifier: &rfqrpc.AssetSpecifier{ + Id: &rfqrpc.AssetSpecifier_GroupKey{ + GroupKey: mintedAssetGroupKey, + }, + }, + PaymentMaxAmt: askAmt, + Expiry: sellOrderExpiry, + + // Here we explicitly specify Bob as the destination + // peer for the sell order. This will prompt Alice's + // tapd node to send a request for quote message to + // Bob's node. + PeerPubKey: ts.BobLnd.PubKey[:], + + TimeoutSeconds: uint32(rfqTimeout.Seconds()), + } + _, err = ts.AliceTapd.AddAssetSellOrder(ctxt, sellReq) + require.ErrorContains( + t.t, err, "no asset channel balance found", + ) + + // Now we set the skip flag and we shouldn't get an error anymore. + sellReq.SkipAssetChannelCheck = true + _, err = ts.AliceTapd.AddAssetSellOrder(ctxt, sellReq) + require.NoError(t.t, err, "unable to upsert asset sell order") + + // Wait until Alice receives an incoming sell quote accept message (sent + // from Bob) RFQ event notification. + BeforeTimeout(t.t, func() { + event, err := aliceEventNtfns.Recv() + require.NoError(t.t, err) + + _, ok := event.Event.(*rfqrpc.RfqEvent_PeerAcceptedSellQuote) + require.True(t.t, ok, "unexpected event: %v", event) + }, rfqTimeout) + + // We now repeat the same flow, where Alice is making a BuyOrderRequest. + assetMaxAmt := uint64(1000) + buyOrderExpiry := sellOrderExpiry + + buyReq := &rfqrpc.AddAssetBuyOrderRequest{ + AssetSpecifier: &rfqrpc.AssetSpecifier{ + Id: &rfqrpc.AssetSpecifier_GroupKey{ + GroupKey: mintedAssetGroupKey, + }, + }, + AssetMaxAmt: assetMaxAmt, + Expiry: buyOrderExpiry, + PeerPubKey: ts.BobLnd.PubKey[:], + TimeoutSeconds: uint32(rfqTimeout.Seconds()), + } + + _, err = ts.AliceTapd.AddAssetBuyOrder(ctxt, buyReq) + require.ErrorContains( + t.t, err, "no asset channel balance found", + ) + + // Now we set the skip flag and we shouldn't get an error anymore. + buyReq.SkipAssetChannelCheck = true + _, err = ts.AliceTapd.AddAssetBuyOrder(ctxt, buyReq) + require.NoError(t.t, err) + + // Wait until Alice receives an incoming buy quote accept message (sent + // from Bob) RFQ event notification. + BeforeTimeout(t.t, func() { + event, err := aliceEventNtfns.Recv() + require.NoError(t.t, err) + + _, ok := event.Event.(*rfqrpc.RfqEvent_PeerAcceptedBuyQuote) + require.True(t.t, ok, "unexpected event: %v", event) + }, rfqTimeout) +} + // rfqTestScenario is a struct which holds test scenario helper infra. type rfqTestScenario struct { testHarness *harnessTest diff --git a/itest/test_list_on_test.go b/itest/test_list_on_test.go index 02f6dd0a0..f478d2924 100644 --- a/itest/test_list_on_test.go +++ b/itest/test_list_on_test.go @@ -315,7 +315,10 @@ var testCases = []*testCase{ name: "rfq asset sell htlc intercept", test: testRfqAssetSellHtlcIntercept, }, - + { + name: "rfq negotiation group key", + test: testRfqNegotiationGroupKey, + }, { name: "multi signature on all levels", test: testMultiSignature, diff --git a/rfq/manager.go b/rfq/manager.go index 34e205518..fbc75db99 100644 --- a/rfq/manager.go +++ b/rfq/manager.go @@ -1,7 +1,9 @@ package rfq import ( + "bytes" "context" + "encoding/hex" "encoding/json" "fmt" "sync" @@ -61,6 +63,14 @@ type ( SellAcceptMap map[SerialisedScid]rfqmsg.SellAccept ) +// GroupLookup is an interface that helps us look up a group of an asset based +// on the asset ID. +type GroupLookup interface { + // QueryAssetGroup fetches the group information of an asset, if it + // belongs in a group. + QueryAssetGroup(context.Context, asset.ID) (*asset.AssetGroup, error) +} + // ManagerCfg is a struct that holds the configuration parameters for the RFQ // manager. type ManagerCfg struct { @@ -84,6 +94,10 @@ type ManagerCfg struct { // determine the available channels for routing. ChannelLister ChannelLister + // GroupLookup is an interface that helps us querry asset groups by + // asset IDs. + GroupLookup GroupLookup + // AliasManager is the SCID alias manager. This component is injected // into the manager once lnd and tapd are hooked together. AliasManager ScidAliasManager @@ -165,6 +179,12 @@ type Manager struct { SerialisedScid, rfqmsg.SellAccept, ] + // assetIDToGroup is a map that helps us quickly perform an in-memory + // look up of the group an asset belongs to. Since this information is + // static and generated during minting, it is not possible for an asset + // to change groups. + assetIDToGroup lnutils.SyncMap[asset.ID, []byte] + // subscribers is a map of components that want to be notified on new // events, keyed by their subscription ID. subscribers lnutils.SyncMap[uint64, *fn.EventReceiver[fn.Event]] @@ -539,18 +559,7 @@ func (m *Manager) addScidAlias(scidAlias uint64, assetSpecifier asset.Specifier, return c.PubKeyBytes == peer }, localChans) - // Identify the correct channel to use as the base SCID for the alias - // by inspecting the asset data in the custom channel data. - assetID, err := assetSpecifier.UnwrapIdOrErr() - if err != nil { - return fmt.Errorf("asset ID must be specified when adding "+ - "alias: %w", err) - } - - var ( - assetIDStr = assetID.String() - baseSCID uint64 - ) + var baseSCID uint64 for _, localChan := range peerChannels { if len(localChan.CustomChannelData) == 0 { continue @@ -566,7 +575,29 @@ func (m *Manager) addScidAlias(scidAlias uint64, assetSpecifier asset.Specifier, for _, channelAsset := range assetData.Assets { gen := channelAsset.AssetInfo.AssetGenesis - if gen.AssetID == assetIDStr { + assetIDBytes, err := hex.DecodeString( + gen.AssetID, + ) + if err != nil { + return fmt.Errorf("error "+ + "decoding asset ID: %w", err) + } + + var assetID asset.ID + copy(assetID[:], assetIDBytes) + + match, err := m.AssetMatchesSpecifier( + ctxb, assetSpecifier, assetID, + ) + if err != nil { + return err + } + + // TODO(george): Instead of returning the first result, + // try to pick the best channel for what we're trying to + // do (receive/send). Binding a baseSCID means we're + // also binding the asset liquidity on that channel. + if match { baseSCID = localChan.ChannelID break } @@ -583,8 +614,8 @@ func (m *Manager) addScidAlias(scidAlias uint64, assetSpecifier asset.Specifier, // At this point, if the base SCID is still not found, we return an // error. We can't map the SCID alias to a base SCID. if baseSCID == 0 { - return fmt.Errorf("add alias: base SCID not found for asset: "+ - "%v", assetID) + return fmt.Errorf("add alias: base SCID not found for %v", + assetSpecifier) } log.Debugf("Adding SCID alias %d for base SCID %d", scidAlias, baseSCID) @@ -917,6 +948,63 @@ func (m *Manager) RemoveSubscriber( return nil } +// GetAssetGroupKey retrieves the group key of an asset based on its ID. +func (m *Manager) GetAssetGroupKey(ctx context.Context, + id asset.ID) ([]byte, error) { + + // First, see if we have already queried our DB for this ID. + v, ok := m.assetIDToGroup.Load(id) + if ok { + return v, nil + } + + // Perform the DB query. + group, err := m.cfg.GroupLookup.QueryAssetGroup(ctx, id) + if err != nil { + return nil, err + } + + // If the asset does not belong to a group, return early with no error + // or response. + if group == nil || group.GroupKey == nil { + return nil, nil + } + + groupKeyBytes := group.GroupKey.GroupPubKey.SerializeCompressed() + + // Store the result for future calls. + m.assetIDToGroup.Store(id, groupKeyBytes) + + return groupKeyBytes, nil +} + +// AssetMatchesSpecifier checks if the provided asset satisfies the provided +// specifier. If the specifier includes a group key, we will check if the asset +// belongs to that group. +func (m *Manager) AssetMatchesSpecifier(ctx context.Context, + specifier asset.Specifier, id asset.ID) (bool, error) { + + switch { + case specifier.HasGroupPubKey(): + group, err := m.GetAssetGroupKey(ctx, id) + if err != nil { + return false, err + } + + specifierGK := specifier.UnwrapGroupKeyToPtr() + + return bytes.Equal(group, specifierGK.SerializeCompressed()), + nil + + case specifier.HasId(): + specifierID := specifier.UnwrapIdToPtr() + + return specifierID.IsEqual(id), nil + } + + return false, fmt.Errorf("specifier is empty") +} + // publishSubscriberEvent publishes an event to all subscribers. func (m *Manager) publishSubscriberEvent(event fn.Event) { // Iterate over the subscribers and deliver the event to each one. diff --git a/rfqmsg/buy_request.go b/rfqmsg/buy_request.go index c40c82115..9beccf1f2 100644 --- a/rfqmsg/buy_request.go +++ b/rfqmsg/buy_request.go @@ -159,11 +159,9 @@ func NewBuyRequestFromWire(wireMsg WireMessage, // Validate ensures that the buy request is valid. func (q *BuyRequest) Validate() error { // Ensure that the asset specifier is set. - // - // TODO(ffranr): For now, the asset ID must be set. We do not currently - // support group keys. - if !q.AssetSpecifier.HasId() { - return fmt.Errorf("asset id not specified in BuyRequest") + err := q.AssetSpecifier.AssertNotEmpty() + if err != nil { + return err } // Ensure that the message version is supported. @@ -173,7 +171,7 @@ func (q *BuyRequest) Validate() error { } // Ensure that the suggested asset rate has not expired. - err := fn.MapOptionZ(q.AssetRateHint, func(rate AssetRate) error { + err = fn.MapOptionZ(q.AssetRateHint, func(rate AssetRate) error { if rate.Expiry.Before(time.Now()) { return fmt.Errorf("suggested asset rate has expired") } diff --git a/rfqmsg/sell_request.go b/rfqmsg/sell_request.go index 8324e7efe..f9e3e427b 100644 --- a/rfqmsg/sell_request.go +++ b/rfqmsg/sell_request.go @@ -152,12 +152,10 @@ func NewSellRequestFromWire(wireMsg WireMessage, // Validate ensures that the quote request is valid. func (q *SellRequest) Validate() error { - // Ensure that the asset specifier is set. - // - // TODO(ffranr): For now, the asset ID must be set. We do not currently - // support group keys. - if !q.AssetSpecifier.HasId() { - return fmt.Errorf("asset id not specified in SellRequest") + // Ensure that the asset specifier is not empty. + err := q.AssetSpecifier.AssertNotEmpty() + if err != nil { + return err } // Ensure that the message version is supported. diff --git a/rpcserver.go b/rpcserver.go index cdb471d39..cb43928ff 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -6569,16 +6569,10 @@ func (r *rpcServer) checkPeerChannel(ctx context.Context, peer route.Vertex, // For any other case, we'll want to make sure there is a channel with // a non-zero balance of the given asset to carry the order. default: - assetID, err := specifier.UnwrapIdOrErr() - if err != nil { - return fmt.Errorf("cannot check asset channel, " + - "missing asset ID") - } - // If we don't get an error here, it means we do have an asset // channel with the peer. The intention doesn't matter as we're // just checking whether a channel exists. - _, err = r.rfqChannel(ctx, assetID, &peer, NoIntention) + _, err := r.rfqChannel(ctx, specifier, &peer, NoIntention) if err != nil { return fmt.Errorf("error checking asset channel: %w", err) @@ -7217,9 +7211,11 @@ func (r *rpcServer) SendPayment(req *tchrpc.SendPaymentRequest, peerPubKey = &parsedKey } + specifier := asset.NewSpecifierFromId(assetID) + // We can now query the asset channels we have. assetChan, err := r.rfqChannel( - ctx, assetID, peerPubKey, SendIntention, + ctx, specifier, peerPubKey, SendIntention, ) if err != nil { return fmt.Errorf("error finding asset channel to "+ @@ -7510,9 +7506,11 @@ func (r *rpcServer) AddInvoice(ctx context.Context, peerPubKey = &parsedKey } + specifier := asset.NewSpecifierFromId(assetID) + // We can now query the asset channels we have. assetChan, err := r.rfqChannel( - ctx, assetID, peerPubKey, ReceiveIntention, + ctx, specifier, peerPubKey, ReceiveIntention, ) if err != nil { return nil, fmt.Errorf("error finding asset channel to use: %w", @@ -7799,46 +7797,46 @@ const ( // rfqChannel returns the channel to use for RFQ operations. If a peer public // key is specified, the channels are filtered by that peer. If there are -// multiple channels for the same asset, the user must specify the peer public -// key. -func (r *rpcServer) rfqChannel(ctx context.Context, id asset.ID, +// multiple channels for the same specifier, the user must specify the peer +// public key. +func (r *rpcServer) rfqChannel(ctx context.Context, specifier asset.Specifier, peerPubKey *route.Vertex, - intention chanIntention) (*channelWithAsset, error) { + intention chanIntention) (*channelWithSpecifier, error) { - balances, err := r.computeChannelAssetBalance(ctx) + balances, err := r.computeChannelAssetBalance(ctx, specifier) if err != nil { return nil, fmt.Errorf("error computing available asset "+ "channel balance: %w", err) } - assetBalances, haveBalance := balances[id] - if !haveBalance || len(assetBalances) == 0 { - return nil, fmt.Errorf("no asset channel balance found for "+ - "asset %s", id.String()) + if len(balances) == 0 { + return nil, fmt.Errorf("no asset channel balance found for %s", + specifier.String()) } // If a peer public key was specified, we always want to use that to // filter the asset channels. if peerPubKey != nil { - assetBalances = fn.Filter( - assetBalances, func(c channelWithAsset) bool { + balances = fn.Filter( + balances, func(c channelWithSpecifier) bool { return c.channelInfo.PubKeyBytes == *peerPubKey }, ) } switch { - // If there are multiple asset channels for the same asset, we need to - // ask the user to specify the peer public key. Otherwise, we don't know - // who to ask for a quote. - case len(assetBalances) > 1 && peerPubKey == nil: + // If there are multiple asset channels for the same specifier, we need + // to ask the user to specify the peer public key. Otherwise, we don't + // know who to ask for a quote. + case len(balances) > 1 && peerPubKey == nil: return nil, fmt.Errorf("multiple asset channels found for "+ - "asset %s, please specify the peer pubkey", id.String()) + "%s, please specify the peer pubkey", + specifier.String()) // We don't have any channels with that asset ID and peer. - case len(assetBalances) == 0: - return nil, fmt.Errorf("no asset channel found for asset %s "+ - "and peer %s", id.String(), peerPubKey.String()) + case len(balances) == 0: + return nil, fmt.Errorf("no asset channel found for %v", + specifier) } // If the user specified a peer public key, and we still have multiple @@ -7846,13 +7844,13 @@ func (r *rpcServer) rfqChannel(ctx context.Context, id asset.ID, // the same peer, as we ruled out the rest of the cases above. // Initialize best balance to first channel of the list. - bestBalance := assetBalances[0] + bestBalance := balances[0] switch intention { case ReceiveIntention: // If the intention is to receive, return the channel // with the best remote balance. - fn.ForEach(assetBalances, func(b channelWithAsset) { + fn.ForEach(balances, func(b channelWithSpecifier) { if b.assetInfo.RemoteBalance > bestBalance.assetInfo.RemoteBalance { @@ -7863,7 +7861,7 @@ func (r *rpcServer) rfqChannel(ctx context.Context, id asset.ID, case SendIntention: // If the intention is to send, return the channel with // the best local balance. - fn.ForEach(assetBalances, func(b channelWithAsset) { + fn.ForEach(balances, func(b channelWithSpecifier) { if b.assetInfo.LocalBalance > bestBalance.assetInfo.LocalBalance { @@ -7879,28 +7877,67 @@ func (r *rpcServer) rfqChannel(ctx context.Context, id asset.ID, return &bestBalance, nil } -// channelWithAsset is a helper struct that combines the information of a single -// asset within a channel with the channels' general information. -type channelWithAsset struct { - // assetInfo is the information about one of the assets in a channel. - assetInfo rfqmsg.JsonAssetChanInfo +// channelWithSpecifier is a helper struct that combines the information of an +// asset specifier that is satisfied by a channel with the channels' general +// information. +type channelWithSpecifier struct { + // specifier is the asset specifier that is satisfied by this channels' + // assets. + specifier asset.Specifier // channelInfo is the information about the channel the asset is // committed to. channelInfo lndclient.ChannelInfo + + // assetInfo contains the asset related info of the channel. + assetInfo rfqmsg.JsonAssetChanInfo } // computeChannelAssetBalance computes the total local and remote balance for -// each asset channel. -func (r *rpcServer) computeChannelAssetBalance( - ctx context.Context) (map[asset.ID][]channelWithAsset, error) { +// each asset channel that matches the provided asset specifier. +func (r *rpcServer) computeChannelAssetBalance(ctx context.Context, + specifier asset.Specifier) ([]channelWithSpecifier, error) { activeChannels, err := r.cfg.Lnd.Client.ListChannels(ctx, true, false) if err != nil { return nil, fmt.Errorf("unable to fetch channels: %w", err) } - channelsByID := make(map[asset.ID][]channelWithAsset) + // specifierFilter is a helper function that checks if the assets of a + // channel satisfy the provided asset specifier. + specifierFilter := func( + assets []rfqmsg.JsonAssetChanInfo) (bool, error) { + + for assetIdx := range assets { + assetOutput := assets[assetIdx] + assetGen := assetOutput.AssetInfo.AssetGenesis + assetIDBytes, err := hex.DecodeString( + assetGen.AssetID, + ) + if err != nil { + return false, fmt.Errorf("error "+ + "decoding asset ID: %w", err) + } + + var assetID asset.ID + copy(assetID[:], assetIDBytes) + + match, err := r.cfg.RfqManager.AssetMatchesSpecifier( + ctx, specifier, assetID, + ) + if err != nil { + return false, err + } + + if !match { + return false, nil + } + } + + return true, nil + } + + channels := make([]channelWithSpecifier, 0) for chanIdx := range activeChannels { openChan := activeChannels[chanIdx] if len(openChan.CustomChannelData) == 0 { @@ -7914,27 +7951,33 @@ func (r *rpcServer) computeChannelAssetBalance( "data: %w", err) } - for assetIdx := range assetData.Assets { - assetOutput := assetData.Assets[assetIdx] - assetIDStr := assetOutput.AssetInfo.AssetGenesis.AssetID - assetIDBytes, err := hex.DecodeString(assetIDStr) - if err != nil { - return nil, fmt.Errorf("error decoding asset "+ - "ID: %w", err) - } - var assetID asset.ID - copy(assetID[:], assetIDBytes) + // Check if the assets of this channel match the provided + // specifier. + pass, err := specifierFilter(assetData.Assets) + if err != nil { + return nil, err + } - channelsByID[assetID] = append( - channelsByID[assetID], channelWithAsset{ - assetInfo: assetOutput, - channelInfo: openChan, - }, - ) + // Since the assets of the channel passed the above filter, we're safe + // to aggregate their info to be represented as a single entity. + var aggrAssetInfo rfqmsg.JsonAssetChanInfo + + for _, assetInfo := range assetData.Assets { + aggrAssetInfo.Capacity += assetInfo.Capacity + aggrAssetInfo.LocalBalance += assetInfo.LocalBalance + aggrAssetInfo.RemoteBalance += assetInfo.RemoteBalance + } + + if pass { + channels = append(channels, channelWithSpecifier{ + specifier: specifier, + channelInfo: openChan, + assetInfo: aggrAssetInfo, + }) } } - return channelsByID, nil + return channels, nil } // getInboundPolicy returns the policy of the given channel that points towards diff --git a/tapcfg/server.go b/tapcfg/server.go index a91d51a75..f1d878c18 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -405,6 +405,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, HtlcSubscriber: lndRouterClient, PriceOracle: priceOracle, ChannelLister: walletAnchor, + GroupLookup: tapdbAddrBook, AliasManager: lndRouterClient, // nolint: lll AcceptPriceDeviationPpm: rfqCfg.AcceptPriceDeviationPpm,