-
Notifications
You must be signed in to change notification settings - Fork 125
/
Copy pathmanager.go
1149 lines (953 loc) · 35.4 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package rfq
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/taproot-assets/asset"
"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightninglabs/taproot-assets/rfqmsg"
lfn "github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
const (
	// DefaultTimeout is the default timeout used for context operations.
	// NewManager installs it as the DefaultTimeout of the manager's
	// context guard.
	DefaultTimeout = 30 * time.Second

	// DefaultInvoiceExpiry is the default expiry time for asset invoices.
	// The current value corresponds to 5 minutes.
	DefaultInvoiceExpiry = time.Second * 300

	// CacheCleanupInterval is the interval at which local runtime caches
	// are cleaned up. The manager passes it to the order handler as its
	// CleanupInterval.
	CacheCleanupInterval = 30 * time.Second
)
// ChannelLister is an interface that provides a list of channels that are
// available for routing. The RFQ manager queries it when it needs to find the
// channel that a new SCID alias should be mapped to.
type ChannelLister interface {
	// ListChannels returns a list of channels that are available for
	// routing.
	ListChannels(ctx context.Context) ([]lndclient.ChannelInfo, error)
}
// ScidAliasManager is an interface that can add short channel ID (SCID) aliases
// to, and remove them from, the local SCID alias store.
type ScidAliasManager interface {
	// AddLocalAlias adds a database mapping from the passed alias to the
	// passed base SCID.
	AddLocalAlias(ctx context.Context, alias,
		baseScid lnwire.ShortChannelID) error

	// DeleteLocalAlias removes the mapping from the passed alias to the
	// passed base SCID from the database and the Manager's maps.
	DeleteLocalAlias(ctx context.Context, alias,
		baseScid lnwire.ShortChannelID) error
}
type (
	// BuyAcceptMap is a map of buy accepts, keyed by the serialised SCID.
	// It is the snapshot type returned by the manager's buy quote
	// accessors.
	BuyAcceptMap map[SerialisedScid]rfqmsg.BuyAccept

	// SellAcceptMap is a map of sell accepts, keyed by the serialised SCID.
	// It is the snapshot type returned by the manager's sell quote
	// accessors.
	SellAcceptMap map[SerialisedScid]rfqmsg.SellAccept
)
// GroupLookup is an interface that helps us look up the group of an asset
// based on the asset ID.
type GroupLookup interface {
	// QueryAssetGroup fetches the group information of an asset, if it
	// belongs in a group.
	QueryAssetGroup(context.Context, asset.ID) (*asset.AssetGroup, error)
}
// ManagerCfg is a struct that holds the configuration parameters for the RFQ
// manager.
type ManagerCfg struct {
	// PeerMessenger is the peer messenger. This component provides the RFQ
	// manager with the ability to send and receive raw peer messages.
	PeerMessenger PeerMessenger

	// HtlcInterceptor is the HTLC interceptor. This component is used to
	// intercept and accept/reject HTLCs.
	HtlcInterceptor HtlcInterceptor

	// HtlcSubscriber is a subscriber that is used to retrieve live HTLC
	// event updates.
	HtlcSubscriber HtlcSubscriber

	// PriceOracle is the price oracle that the RFQ manager will use to
	// determine whether a quote is accepted or rejected.
	PriceOracle PriceOracle

	// ChannelLister is the channel lister that the RFQ manager will use to
	// determine the available channels for routing.
	ChannelLister ChannelLister

	// GroupLookup is an interface that helps us query asset groups by
	// asset IDs.
	GroupLookup GroupLookup

	// AliasManager is the SCID alias manager. This component is injected
	// into the manager once lnd and tapd are hooked together, so it may
	// still be unset while the configuration is being assembled.
	AliasManager ScidAliasManager

	// AcceptPriceDeviationPpm is the price deviation in
	// parts per million that is accepted by the RFQ negotiator.
	//
	// Example: 50,000 ppm => price deviation is set to 5% .
	AcceptPriceDeviationPpm uint64

	// SkipAcceptQuotePriceCheck is a flag that, when set, will cause the
	// RFQ negotiator to skip price validation on incoming quote accept
	// messages (this means that the price oracle will not be queried).
	SkipAcceptQuotePriceCheck bool

	// ErrChan is the main error channel which will be used to report back
	// critical errors to the main server.
	ErrChan chan<- error
}
// Manager is a struct that manages the request for quote (RFQ) system. It owns
// the order handler, stream handler and negotiator subsystems and runs the
// main event loop that routes messages and events between them.
type Manager struct {
	// startOnce and stopOnce ensure that Start and Stop each run their
	// logic at most once.
	startOnce sync.Once
	stopOnce  sync.Once

	// cfg holds the configuration parameters for the RFQ manager.
	cfg ManagerCfg

	// orderHandler is the RFQ order handler. This subsystem monitors HTLCs
	// (Hash Time Locked Contracts), determining acceptance or rejection
	// based on compliance with the terms of any associated quote.
	orderHandler *OrderHandler

	// streamHandler is the RFQ stream handler. This subsystem handles
	// incoming and outgoing peer RFQ stream messages.
	streamHandler *StreamHandler

	// negotiator is the RFQ quote negotiator. This subsystem determines
	// whether a quote is accepted or rejected.
	negotiator *Negotiator

	// incomingMessages is a channel which is populated with incoming
	// messages.
	incomingMessages chan rfqmsg.IncomingMsg

	// outgoingMessages is a channel which is populated with outgoing
	// messages. These are messages which are destined to be sent to peers.
	outgoingMessages chan rfqmsg.OutgoingMsg

	// acceptHtlcEvents is a channel which is populated with accept HTLCs
	// events.
	acceptHtlcEvents chan *AcceptHtlcEvent

	// peerAcceptedBuyQuotes holds buy quotes for assets that our node has
	// requested and that have been accepted by peer nodes. These quotes are
	// exclusively used by our node for the acquisition of assets, as they
	// represent agreed-upon terms for purchase transactions with our peers.
	peerAcceptedBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept]

	// peerAcceptedSellQuotes holds sell quotes for assets that our node has
	// requested and that have been accepted by peer nodes. These quotes are
	// exclusively used by our node for the sale of assets, as they
	// represent agreed-upon terms for sale transactions with our peers.
	peerAcceptedSellQuotes lnutils.SyncMap[
		SerialisedScid, rfqmsg.SellAccept,
	]

	// localAcceptedBuyQuotes holds buy quotes that have been requested by
	// peer nodes and accepted by our node. They are stored when we send a
	// buy accept message, i.e. when a peer wants to buy an asset from us,
	// so they represent the agreed-upon terms under which our node sells
	// assets to those peers.
	localAcceptedBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept]

	// localAcceptedSellQuotes holds sell quotes that have been requested by
	// peer nodes and accepted by our node. They are stored when we send a
	// sell accept message, i.e. when a peer wants to sell an asset to us,
	// so they represent the agreed-upon terms under which our node buys
	// assets from those peers.
	localAcceptedSellQuotes lnutils.SyncMap[
		SerialisedScid, rfqmsg.SellAccept,
	]

	// assetIDToGroup is a map that helps us quickly perform an in-memory
	// look up of the group an asset belongs to. Since this information is
	// static and generated during minting, it is not possible for an asset
	// to change groups. The stored value is the serialized compressed
	// group public key.
	assetIDToGroup lnutils.SyncMap[asset.ID, []byte]

	// subscribers is a map of components that want to be notified on new
	// events, keyed by their subscription ID.
	subscribers lnutils.SyncMap[uint64, *fn.EventReceiver[fn.Event]]

	// subsystemErrChan is the error channel populated by subsystems. It is
	// drained by the main event loop.
	subsystemErrChan chan error

	// ContextGuard provides a wait group and main quit channel that can be
	// used to create guarded contexts.
	*fn.ContextGuard
}
// NewManager builds an RFQ manager from the given configuration. The message
// and event channels are unbuffered, while the subsystem error channel has
// room for ten pending errors. All quote and subscriber maps are left at
// their zero value. The returned error is always nil.
func NewManager(cfg ManagerCfg) (*Manager, error) {
	// The guard supplies the quit channel and wait group shared by the
	// manager's goroutines.
	guard := &fn.ContextGuard{
		DefaultTimeout: DefaultTimeout,
		Quit:           make(chan struct{}),
	}

	manager := &Manager{
		cfg:              cfg,
		incomingMessages: make(chan rfqmsg.IncomingMsg),
		outgoingMessages: make(chan rfqmsg.OutgoingMsg),
		acceptHtlcEvents: make(chan *AcceptHtlcEvent),
		subsystemErrChan: make(chan error, 10),
		ContextGuard:     guard,
	}

	return manager, nil
}
// startSubsystems creates and starts the three RFQ subsystems in order: the
// order handler, the peer message stream handler and the quote negotiator.
// It stops at the first failure and returns that error wrapped with context;
// subsystems started before the failure are left running.
func (m *Manager) startSubsystems(ctx context.Context) error {
	var err error

	// Order handler.
	orderCfg := OrderHandlerCfg{
		CleanupInterval:  CacheCleanupInterval,
		HtlcInterceptor:  m.cfg.HtlcInterceptor,
		HtlcSubscriber:   m.cfg.HtlcSubscriber,
		AcceptHtlcEvents: m.acceptHtlcEvents,
	}
	m.orderHandler, err = NewOrderHandler(orderCfg)
	if err != nil {
		return fmt.Errorf("error initializing RFQ order handler: %w",
			err)
	}
	if err = m.orderHandler.Start(); err != nil {
		return fmt.Errorf("unable to start RFQ order handler: %w", err)
	}

	// Peer message stream handler.
	streamCfg := StreamHandlerCfg{
		PeerMessenger:    m.cfg.PeerMessenger,
		IncomingMessages: m.incomingMessages,
	}
	m.streamHandler, err = NewStreamHandler(ctx, streamCfg)
	if err != nil {
		return fmt.Errorf("error initializing RFQ subsystem service: "+
			"peer message stream handler: %w", err)
	}
	if err = m.streamHandler.Start(); err != nil {
		return fmt.Errorf("unable to start RFQ subsystem service: "+
			"peer message stream handler: %w", err)
	}

	// Quote negotiator. Its errors are reported on the subsystem error
	// channel, which the main event loop drains.
	//
	// nolint: lll
	negotiatorCfg := NegotiatorCfg{
		PriceOracle:               m.cfg.PriceOracle,
		OutgoingMessages:          m.outgoingMessages,
		AcceptPriceDeviationPpm:   m.cfg.AcceptPriceDeviationPpm,
		SkipAcceptQuotePriceCheck: m.cfg.SkipAcceptQuotePriceCheck,
		ErrChan:                   m.subsystemErrChan,
	}
	m.negotiator, err = NewNegotiator(negotiatorCfg)
	if err != nil {
		return fmt.Errorf("error initializing RFQ negotiator: %w",
			err)
	}
	if err = m.negotiator.Start(); err != nil {
		return fmt.Errorf("unable to start RFQ negotiator: %w", err)
	}

	return nil
}
// handleError logs the given error. If the error is (or wraps) a critical
// error, it is also forwarded on the main server error channel, which will
// cause the daemon to shut down.
func (m *Manager) handleError(err error) {
	log.Errorf("Error in RFQ manager: %v", err)

	// Anything that is not critical has been fully handled by logging it.
	isCritical := fn.ErrorAs[*fn.CriticalError](err)
	if !isCritical {
		return
	}

	m.cfg.ErrChan <- err
}
// Start attempts to start a new RFQ manager. It starts the RFQ subsystems and
// then launches the main event loop in its own goroutine. The start logic runs
// at most once; later calls return nil without doing anything.
//
// If a subsystem fails to start, the error is returned, the main event loop is
// not launched and the context created for the subsystems is released.
func (m *Manager) Start() error {
	var startErr error
	m.startOnce.Do(func() {
		ctx, cancel := m.WithCtxQuitNoTimeout()

		log.Info("Initializing RFQ subsystems")
		if err := m.startSubsystems(ctx); err != nil {
			// The main event loop, which would normally cancel
			// the context on exit, is never launched on this
			// path. Cancel here so the context does not leak.
			cancel()

			startErr = err
			return
		}

		// Start the manager's main event loop in a separate goroutine.
		m.Wg.Add(1)
		go func() {
			defer func() {
				m.Wg.Done()

				// Attempt to stop all subsystems if the main
				// event loop exits. A goroutine-local error
				// variable is used so that nothing is shared
				// with the enclosing start closure.
				stopErr := m.stopSubsystems()
				if stopErr != nil {
					log.Errorf("Error stopping RFQ "+
						"subsystems: %v", stopErr)
				}

				// The context can now be cancelled as all
				// dependant components have been stopped.
				cancel()
			}()

			log.Info("Starting RFQ manager main event loop")
			m.mainEventLoop()
		}()
	})

	return startErr
}
// Stop shuts down the RFQ manager. It first stops the subsystems and then
// closes the quit channel, which makes the main event loop return. The
// shutdown logic runs at most once; later calls return nil. The returned
// error is the one reported while stopping the subsystems.
func (m *Manager) Stop() error {
	var result error

	shutdown := func() {
		log.Info("Stopping RFQ system")

		result = m.stopSubsystems()

		// Closing the quit channel signals the main event loop to exit.
		close(m.Quit)
	}
	m.stopOnce.Do(shutdown)

	return result
}
// stopSubsystems stops the RFQ subsystems: the order handler, the stream
// handler and the quote negotiator, in that order.
//
// Every subsystem is asked to stop even if an earlier one fails, so that a
// single failure does not leave the others running. Subsystems that were
// never created (for example because Start was not called or failed early)
// are skipped. The first error encountered is returned, wrapped with context;
// any further errors are logged.
func (m *Manager) stopSubsystems() error {
	var firstErr error

	// record keeps the first error for the caller and logs the rest.
	record := func(err error) {
		if firstErr == nil {
			firstErr = err
			return
		}

		log.Errorf("Additional error while stopping RFQ subsystems: %v",
			err)
	}

	// Stop the RFQ order handler.
	if m.orderHandler != nil {
		if err := m.orderHandler.Stop(); err != nil {
			record(fmt.Errorf("error stopping RFQ order "+
				"handler: %w", err))
		}
	}

	// Stop the RFQ stream handler.
	if m.streamHandler != nil {
		if err := m.streamHandler.Stop(); err != nil {
			record(fmt.Errorf("error stopping RFQ stream "+
				"handler: %w", err))
		}
	}

	// Stop the RFQ quote negotiator.
	if m.negotiator != nil {
		if err := m.negotiator.Stop(); err != nil {
			record(fmt.Errorf("error stopping RFQ quote "+
				"negotiator: %w", err))
		}
	}

	return firstErr
}
// handleIncomingMessage dispatches a message received from a peer according
// to its concrete type. Requests are passed to the negotiator, accepts are
// passed to the negotiator together with a callback that records the quote
// and notifies subscribers, and rejects are published to subscribers
// directly. An error is returned for request handling failures and for
// unknown message types.
func (m *Manager) handleIncomingMessage(incomingMsg rfqmsg.IncomingMsg) error {
	switch typedMsg := incomingMsg.(type) {
	case *rfqmsg.BuyRequest:
		reqErr := m.negotiator.HandleIncomingBuyRequest(*typedMsg)
		if reqErr != nil {
			return fmt.Errorf("error handling incoming buy "+
				"request: %w", reqErr)
		}

	case *rfqmsg.BuyAccept:
		// TODO(ffranr): The stream handler should ensure that the
		// accept message corresponds to a request.
		onFinalise := func(accept rfqmsg.BuyAccept,
			invalidQuoteEvent fn.Option[InvalidQuoteRespEvent]) {

			// An invalid quote is only reported to subscribers;
			// nothing is stored for it.
			if invalidQuoteEvent.IsSome() {
				invalidQuoteEvent.WhenSome(
					func(ev InvalidQuoteRespEvent) {
						m.publishSubscriberEvent(&ev)
					},
				)

				return
			}

			// Remember the accepted quote so that our lightning
			// node can use it to send a payment.
			scid := accept.ShortChannelId()
			m.peerAcceptedBuyQuotes.Store(scid, accept)

			// We are buying assets from our peer, so the incoming
			// asset payment must be identifiable by the SCID alias
			// it arrives through, which is compared against the
			// one in the invoice.
			aliasErr := m.addScidAlias(
				uint64(scid), accept.Request.AssetSpecifier,
				accept.Peer,
			)
			if aliasErr != nil {
				m.handleError(
					fmt.Errorf("error adding local alias: "+
						"%w", aliasErr),
				)

				return
			}

			// Tell subscribers about the peer accepted asset buy
			// quote.
			m.publishSubscriberEvent(
				NewPeerAcceptedBuyQuoteEvent(&accept),
			)
		}
		m.negotiator.HandleIncomingBuyAccept(*typedMsg, onFinalise)

	case *rfqmsg.SellRequest:
		reqErr := m.negotiator.HandleIncomingSellRequest(*typedMsg)
		if reqErr != nil {
			return fmt.Errorf("error handling incoming sell "+
				"request: %w", reqErr)
		}

	case *rfqmsg.SellAccept:
		// TODO(ffranr): The stream handler should ensure that the
		// accept message corresponds to a request.
		onFinalise := func(accept rfqmsg.SellAccept,
			invalidQuoteEvent fn.Option[InvalidQuoteRespEvent]) {

			// An invalid quote is only reported to subscribers;
			// nothing is stored for it.
			if invalidQuoteEvent.IsSome() {
				invalidQuoteEvent.WhenSome(
					func(ev InvalidQuoteRespEvent) {
						m.publishSubscriberEvent(&ev)
					},
				)

				return
			}

			// Remember the accepted quote so that our lightning
			// node can use it to send a payment.
			m.peerAcceptedSellQuotes.Store(
				accept.ShortChannelId(), accept,
			)

			// Tell subscribers about the peer accepted asset sell
			// quote.
			m.publishSubscriberEvent(
				NewPeerAcceptedSellQuoteEvent(&accept),
			)
		}
		m.negotiator.HandleIncomingSellAccept(*typedMsg, onFinalise)

	case *rfqmsg.Reject:
		// Our quote request was rejected; subscribers are told about
		// the rejection.
		m.publishSubscriberEvent(NewIncomingRejectQuoteEvent(typedMsg))

	default:
		return fmt.Errorf("unhandled incoming message type: %T",
			typedMsg)
	}

	return nil
}
// handleOutgoingMessage processes a message that is about to be sent to a
// peer. Accept messages first update local state (order handler policy,
// accepted quote map and, for buy accepts, the SCID alias); afterwards every
// message is handed to the stream handler for delivery. If adding the SCID
// alias fails, the error is returned and the message is not sent.
func (m *Manager) handleOutgoingMessage(outgoingMsg rfqmsg.OutgoingMsg) error {
	switch accept := outgoingMsg.(type) {
	case *rfqmsg.BuyAccept:
		// A peer asked to buy an asset from us and we accepted.
		// Before the peer learns of our decision, the order handler is
		// told that we are willing to sell subject to a sale policy.
		m.orderHandler.RegisterAssetSalePolicy(*accept)

		// Keep the accepted buy quote around in case it has to be
		// looked up for a direct peer payment.
		scid := accept.ShortChannelId()
		m.localAcceptedBuyQuotes.Store(scid, *accept)

		// The peer will buy assets from us, so the forwarded asset
		// payment must be identifiable by the outgoing SCID alias
		// within the onion packet.
		aliasErr := m.addScidAlias(
			uint64(scid), accept.Request.AssetSpecifier,
			accept.Peer,
		)
		if aliasErr != nil {
			return fmt.Errorf("error adding local alias: %w",
				aliasErr)
		}

	case *rfqmsg.SellAccept:
		// A peer asked to sell an asset to us and we accepted. Before
		// the peer learns of our decision, the order handler is told
		// that we are willing to buy subject to a purchase policy.
		m.orderHandler.RegisterAssetPurchasePolicy(*accept)

		// Keep the accepted sell quote around in case it has to be
		// looked up for a direct peer payment.
		m.localAcceptedSellQuotes.Store(
			accept.ShortChannelId(), *accept,
		)
	}

	// Hand the message over to the stream handler for delivery.
	sendErr := m.streamHandler.HandleOutgoingMessage(outgoingMsg)
	if sendErr != nil {
		return fmt.Errorf("error sending outgoing message to stream "+
			"handler: %w", sendErr)
	}

	return nil
}
// addScidAlias adds a SCID alias to the alias manager.
//
// The alias is mapped to the base SCID of a channel with the given peer. The
// base channel is the first channel with that peer whose custom channel data
// lists the asset ID from assetSpecifier. If no such channel is found and
// there is exactly one channel with the peer, that channel is used instead.
//
// An error is returned if the asset specifier carries no asset ID, if no base
// SCID can be determined, or if no alias manager is configured. Failures to
// list channels or to register the alias are returned as critical errors.
func (m *Manager) addScidAlias(scidAlias uint64, assetSpecifier asset.Specifier,
	peer route.Vertex) error {

	// Retrieve all local channels.
	ctxb := context.Background()
	localChans, err := m.cfg.ChannelLister.ListChannels(ctxb)
	if err != nil {
		// Not being able to call lnd to add the alias is a critical
		// error, which warrants shutting down, as something is wrong.
		return fn.NewCriticalError(
			fmt.Errorf("add alias: error listing local channels: "+
				"%w", err),
		)
	}

	// Filter for channels with the given peer.
	peerChannels := lfn.Filter(func(c lndclient.ChannelInfo) bool {
		return c.PubKeyBytes == peer
	}, localChans)

	// Identify the correct channel to use as the base SCID for the alias
	// by inspecting the asset data in the custom channel data.
	assetID, err := assetSpecifier.UnwrapIdOrErr()
	if err != nil {
		return fmt.Errorf("asset ID must be specified when adding "+
			"alias: %w", err)
	}

	var (
		assetIDStr = assetID.String()
		baseSCID   uint64
	)

	// The search stops at the first channel that carries the asset. The
	// labelled break leaves both loops; a plain break would only leave
	// the inner one and let a later channel override the match.
channelSearch:
	for _, localChan := range peerChannels {
		if len(localChan.CustomChannelData) == 0 {
			continue
		}

		var assetData rfqmsg.JsonAssetChannel
		err = json.Unmarshal(localChan.CustomChannelData, &assetData)
		if err != nil {
			log.Warnf("Unable to unmarshal channel asset data: %v",
				err)
			continue
		}

		for _, channelAsset := range assetData.Assets {
			gen := channelAsset.AssetInfo.AssetGenesis
			if gen.AssetID == assetIDStr {
				baseSCID = localChan.ChannelID
				break channelSearch
			}
		}
	}

	// As a fallback, if the base SCID is not found and there's only one
	// channel with the target peer, assume that the base SCID corresponds
	// to that channel.
	if baseSCID == 0 && len(peerChannels) == 1 {
		baseSCID = peerChannels[0].ChannelID
	}

	// At this point, if the base SCID is still not found, we return an
	// error. We can't map the SCID alias to a base SCID.
	if baseSCID == 0 {
		return fmt.Errorf("add alias: base SCID not found for asset: "+
			"%v", assetID)
	}

	// The alias manager is injected after construction, so it may still
	// be unset. Report that as an error instead of panicking on a nil
	// interface call.
	if m.cfg.AliasManager == nil {
		return fmt.Errorf("add alias: SCID alias manager not set")
	}

	log.Debugf("Adding SCID alias %d for base SCID %d", scidAlias, baseSCID)

	err = m.cfg.AliasManager.AddLocalAlias(
		ctxb, lnwire.NewShortChanIDFromInt(scidAlias),
		lnwire.NewShortChanIDFromInt(baseSCID),
	)
	if err != nil {
		// Not being able to call lnd to add the alias is a critical
		// error, which warrants shutting down, as something is wrong.
		return fn.NewCriticalError(
			fmt.Errorf("add alias: error adding SCID alias to "+
				"lnd alias manager: %w", err),
		)
	}

	return nil
}
// mainEventLoop runs until the quit channel is closed. On each iteration it
// services exactly one of: an incoming peer message, an outgoing peer
// message, an accepted HTLC event, or a subsystem error. Failures are passed
// to handleError rather than terminating the loop.
func (m *Manager) mainEventLoop() {
	for {
		select {
		// A message arrived from a peer.
		case inbound := <-m.incomingMessages:
			log.Debugf("Manager handling incoming message: %s",
				inbound)

			if err := m.handleIncomingMessage(inbound); err != nil {
				wrapped := fmt.Errorf("failed to handle "+
					"incoming message: %w", err)
				m.handleError(wrapped)
			}

		// A message is ready to be sent to a peer.
		case outbound := <-m.outgoingMessages:
			log.Debugf("Manager handling outgoing message: %s",
				outbound)

			if err := m.handleOutgoingMessage(outbound); err != nil {
				wrapped := fmt.Errorf("failed to handle outgoing "+
					"message: %w", err)
				m.handleError(wrapped)
			}

		// An HTLC was accepted; subscribers are notified.
		case htlcEvent := <-m.acceptHtlcEvents:
			m.publishSubscriberEvent(htlcEvent)

		// A subsystem reported an error. It is passed on in case the
		// root cause is a critical error.
		case subsystemErr := <-m.subsystemErrChan:
			wrapped := fmt.Errorf("encountered RFQ subsystem error "+
				"in main event loop: %w", subsystemErr)
			m.handleError(wrapped)

		case <-m.Quit:
			log.Debug("Manager main event loop has received the " +
				"shutdown signal")

			return
		}
	}
}
// UpsertAssetSellOffer hands an asset sell offer to the negotiator, which
// stores it. If an offer already exists for the given asset, it will be
// updated. A negotiator failure is returned wrapped with context.
func (m *Manager) UpsertAssetSellOffer(offer SellOffer) error {
	if err := m.negotiator.UpsertAssetSellOffer(offer); err != nil {
		return fmt.Errorf("error registering asset sell offer: %w", err)
	}

	return nil
}
// RemoveAssetSellOffer asks the negotiator to drop the sell offer identified
// by the given asset ID and asset group key. A negotiator failure is returned
// wrapped with context.
func (m *Manager) RemoveAssetSellOffer(assetID *asset.ID,
	assetGroupKey *btcec.PublicKey) error {

	removeErr := m.negotiator.RemoveAssetSellOffer(assetID, assetGroupKey)
	if removeErr == nil {
		return nil
	}

	return fmt.Errorf("error removing asset sell offer: %w", removeErr)
}
// UpsertAssetBuyOffer hands an asset buy offer to the negotiator, which stores
// it. If an offer already exists for the given asset, it will be updated. A
// negotiator failure is returned wrapped with context.
func (m *Manager) UpsertAssetBuyOffer(offer BuyOffer) error {
	if err := m.negotiator.UpsertAssetBuyOffer(offer); err != nil {
		return fmt.Errorf("error registering asset buy offer: %w", err)
	}

	return nil
}
// BuyOrder instructs the RFQ (Request For Quote) system to request a quote from
// one or more peers for the acquisition of an asset.
//
// The normal use of a buy order is as follows:
//  1. Alice, operating a wallet node, wants to receive a Tap asset as payment
//     by issuing a Lightning invoice.
//  2. Alice has an asset channel established with Bob's edge node.
//  3. Before issuing the invoice, Alice needs to agree on an exchange rate with
//     Bob, who will facilitate the asset transfer.
//  4. To obtain the best exchange rate, Alice creates a buy order specifying
//     the desired asset.
//  5. Alice's RFQ subsystem processes the buy order and sends buy requests to
//     relevant peers to find the best rate. In this example, Bob is the only
//     available peer.
//  6. Once Bob provides a satisfactory quote, Alice accepts it.
//  7. Alice issues the Lightning invoice, which Charlie will pay.
//  8. Instead of paying Alice directly, Charlie pays Bob.
//  9. Bob then forwards the agreed amount of the Tap asset to Alice over their
//     asset channel.
type BuyOrder struct {
	// AssetSpecifier is the asset that the buyer is interested in.
	AssetSpecifier asset.Specifier

	// AssetMaxAmt is the maximum amount of the asset that the provider must
	// be willing to offer.
	AssetMaxAmt uint64

	// Expiry is the time at which the order expires.
	Expiry time.Time

	// Peer is the peer that the buy order is intended for. This field is
	// optional in the type, but UpsertAssetBuyOrder currently rejects
	// orders where it is not set.
	//
	// TODO(ffranr): Currently, this field must be specified. In the future,
	// the negotiator should be able to determine the optimal peer.
	Peer fn.Option[route.Vertex]
}
// UpsertAssetBuyOrder submits an asset buy order to the negotiator, which
// requests a quote from the order's peer. Orders without a peer are rejected
// with an error, and a negotiator failure is returned wrapped with context.
func (m *Manager) UpsertAssetBuyOrder(order BuyOrder) error {
	// A peer is mandatory for the time being.
	//
	// TODO(ffranr): Add support for peerless buy orders. The negotiator
	// should be able to determine the optimal peer.
	if !order.Peer.IsSome() {
		return fmt.Errorf("buy order peer must be specified")
	}

	// The negotiator turns the order into a quote request for the peer.
	if err := m.negotiator.HandleOutgoingBuyOrder(order); err != nil {
		return fmt.Errorf("error registering asset buy order: %w", err)
	}

	return nil
}
// SellOrder instructs the RFQ (Request For Quote) system to request a quote
// from one or more peers for the disposition of an asset.
//
// Normal usage of a sell order:
//  1. Alice creates a Lightning invoice for Bob to pay.
//  2. Bob wants to pay the invoice using a Tap asset. To do so, Bob pays an
//     edge node with a Tap asset, and the edge node forwards the payment to the
//     network to settle Alice's invoice. Bob submits a SellOrder to his local
//     RFQ service.
//  3. The RFQ service converts the SellOrder into one or more SellRequests.
//     These requests are sent to Charlie (the edge node), who shares a relevant
//     Tap asset channel with Bob and can forward payments to settle Alice's
//     invoice.
//  4. Charlie responds with a quote that satisfies Bob.
//  5. Bob transfers the appropriate Tap asset amount to Charlie via their
//     shared Tap asset channel, and Charlie forwards the corresponding amount
//     to Alice to settle the Lightning invoice.
type SellOrder struct {
	// AssetSpecifier is the asset that the seller wants to sell.
	AssetSpecifier asset.Specifier

	// PaymentMaxAmt is the maximum msat amount that the responding peer
	// must agree to pay.
	PaymentMaxAmt lnwire.MilliSatoshi

	// Expiry is the time at which the order expires.
	Expiry time.Time

	// Peer is the peer that the sell order is intended for. This field is
	// optional in the type, but UpsertAssetSellOrder currently rejects
	// orders where it is not set.
	Peer fn.Option[route.Vertex]
}
// UpsertAssetSellOrder submits an asset sell order to the negotiator. Orders
// without a peer are rejected with an error; otherwise the order is handed to
// the negotiator and nil is returned.
func (m *Manager) UpsertAssetSellOrder(order SellOrder) error {
	// A peer is mandatory for the time being.
	//
	// TODO(ffranr): Add support for peerless sell orders. The negotiator
	// should be able to determine the optimal peer.
	peerMissing := order.Peer.IsNone()
	if peerMissing {
		return fmt.Errorf("sell order peer must be specified")
	}

	// The negotiator generates the sell request messages that are sent to
	// peers.
	m.negotiator.HandleOutgoingSellOrder(order)

	return nil
}
// PeerAcceptedBuyQuotes returns a snapshot of the buy quotes that our node
// requested and our peers accepted. These quotes are exclusively available to
// our node for the acquisition of assets. Quotes whose asset rate has expired
// are removed from the manager's store and left out of the result.
func (m *Manager) PeerAcceptedBuyQuotes() BuyAcceptMap {
	// The internal map must not be handed out directly, as that would not
	// be thread safe. A fresh copy is built instead.
	live := make(BuyAcceptMap)

	visit := func(scid SerialisedScid, quote rfqmsg.BuyAccept) error {
		expired := time.Now().After(quote.AssetRate.Expiry)
		if !expired {
			live[scid] = quote
			return nil
		}

		m.peerAcceptedBuyQuotes.Delete(scid)

		return nil
	}
	m.peerAcceptedBuyQuotes.ForEach(visit)

	return live
}
// PeerAcceptedSellQuotes returns a snapshot of the sell quotes that our node
// requested and our peers accepted. These quotes are exclusively available to
// our node for the sale of assets. Quotes whose asset rate has expired are
// removed from the manager's store and left out of the result.
func (m *Manager) PeerAcceptedSellQuotes() SellAcceptMap {
	// The internal map must not be handed out directly, as that would not
	// be thread safe. A fresh copy is built instead.
	live := make(SellAcceptMap)

	visit := func(scid SerialisedScid, quote rfqmsg.SellAccept) error {
		expired := time.Now().After(quote.AssetRate.Expiry)
		if !expired {
			live[scid] = quote
			return nil
		}

		m.peerAcceptedSellQuotes.Delete(scid)

		return nil
	}
	m.peerAcceptedSellQuotes.ForEach(visit)

	return live
}
// LocalAcceptedBuyQuotes returns a snapshot of the buy quotes that our peers
// requested and our node accepted. Quotes whose asset rate has expired are
// removed from the manager's store and left out of the result.
func (m *Manager) LocalAcceptedBuyQuotes() BuyAcceptMap {
	// The internal map must not be handed out directly, as that would not
	// be thread safe. A fresh copy is built instead.
	live := make(BuyAcceptMap)

	visit := func(scid SerialisedScid, quote rfqmsg.BuyAccept) error {
		expired := time.Now().After(quote.AssetRate.Expiry)
		if !expired {
			live[scid] = quote
			return nil
		}

		m.localAcceptedBuyQuotes.Delete(scid)

		return nil
	}
	m.localAcceptedBuyQuotes.ForEach(visit)

	return live
}
// LocalAcceptedSellQuotes returns a snapshot of the sell quotes that our peers
// requested and our node accepted. Quotes whose asset rate has expired are
// removed from the manager's store and left out of the result.
func (m *Manager) LocalAcceptedSellQuotes() SellAcceptMap {
	// The internal map must not be handed out directly, as that would not
	// be thread safe. A fresh copy is built instead.
	live := make(SellAcceptMap)

	visit := func(scid SerialisedScid, quote rfqmsg.SellAccept) error {
		expired := time.Now().After(quote.AssetRate.Expiry)
		if !expired {
			live[scid] = quote
			return nil
		}

		m.localAcceptedSellQuotes.Delete(scid)

		return nil
	}
	m.localAcceptedSellQuotes.ForEach(visit)

	return live
}
// RegisterSubscriber records the given receiver, keyed by its ID, so that it
// is notified of any new events that are broadcast. The deliverExisting and
// deliverFrom arguments are currently not used. The returned error is always
// nil.
//
// TODO(ffranr): Add support for delivering existing events to new subscribers.
func (m *Manager) RegisterSubscriber(
	receiver *fn.EventReceiver[fn.Event],
	deliverExisting bool, deliverFrom uint64) error {

	subscriptionID := receiver.ID()
	m.subscribers.Store(subscriptionID, receiver)

	return nil
}
// RemoveSubscriber stops the given subscriber and removes it from the set of
// subscribers that are notified of broadcast events. An error is returned if
// no subscriber is registered under the subscriber's ID.
func (m *Manager) RemoveSubscriber(
	subscriber *fn.EventReceiver[fn.Event]) error {

	subscriptionID := subscriber.ID()

	if _, registered := m.subscribers.Load(subscriptionID); !registered {
		return fmt.Errorf("subscriber with ID %d not found",
			subscriptionID)
	}

	subscriber.Stop()
	m.subscribers.Delete(subscriptionID)

	return nil
}
// GetAssetGroupKey returns the serialized compressed group public key of the
// asset with the given ID. Results are cached in memory, so the group lookup
// is only queried for IDs that have not been resolved before. If the asset
// does not belong to a group, both return values are nil; that outcome is not
// cached. Lookup errors are returned unchanged.
func (m *Manager) GetAssetGroupKey(ctx context.Context,
	id asset.ID) ([]byte, error) {

	// Serve from the in-memory cache when possible.
	if cached, found := m.assetIDToGroup.Load(id); found {
		return cached, nil
	}

	// Cache miss: fall back to the DB query.
	group, err := m.cfg.GroupLookup.QueryAssetGroup(ctx, id)
	if err != nil {
		return nil, err
	}

	// An asset without a group yields neither a key nor an error.
	if group == nil {
		return nil, nil
	}
	if group.GroupKey == nil {
		return nil, nil
	}

	// Remember the key for future calls before returning it.
	serializedKey := group.GroupKey.GroupPubKey.SerializeCompressed()
	m.assetIDToGroup.Store(id, serializedKey)

	return serializedKey, nil
}
// publishSubscriberEvent delivers the given event to every registered
// subscriber by sending it on the subscriber's input channel.
func (m *Manager) publishSubscriberEvent(event fn.Event) {
	deliver := func(_ uint64, receiver *fn.EventReceiver[fn.Event]) bool {
		receiver.NewItemCreated.ChanIn() <- event

		// Returning true keeps the iteration going so that all
		// subscribers are visited.
		return true
	}

	m.subscribers.Range(deliver)
}
// PeerAcceptedBuyQuoteEvent is an event that is broadcast when the RFQ manager
// receives an accept quote message from a peer. This is a quote which was
// requested by our node and has been accepted by a peer.
type PeerAcceptedBuyQuoteEvent struct {
	// timestamp is the event creation UTC timestamp. It is set by
	// NewPeerAcceptedBuyQuoteEvent.
	timestamp time.Time

	// BuyAccept is the accepted asset buy quote. It is embedded, so its
	// fields and methods are promoted to the event.
	rfqmsg.BuyAccept
}
// NewPeerAcceptedBuyQuoteEvent builds a PeerAcceptedBuyQuoteEvent that holds
// a copy of the given buy accept and is stamped with the current UTC time.
func NewPeerAcceptedBuyQuoteEvent(
	buyAccept *rfqmsg.BuyAccept) *PeerAcceptedBuyQuoteEvent {

	event := new(PeerAcceptedBuyQuoteEvent)
	event.timestamp = time.Now().UTC()
	event.BuyAccept = *buyAccept

	return event
}
// Timestamp returns the event creation UTC timestamp.