-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathprocess.go
565 lines (453 loc) · 12.5 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
	// rpcTimeout is a ten second timeout value. It is not referenced in the
	// part of the file visible here; presumably it bounds individual lnd rpc
	// calls made elsewhere in the package - confirm against the client code.
	rpcTimeout = 10 * time.Second

	// defaultPeerRefreshInterval is the poll delay that NewProcess installs
	// as the process' peerRefreshInterval.
	defaultPeerRefreshInterval = 10 * time.Minute

	// errChannelNotFound is the sentinel wrapped by getChanInfo when a
	// channel id is found in neither the open nor the closed channel list.
	// Match it with errors.Is.
	errChannelNotFound = errors.New("channel not found")
)
// burstSize is the default burst size that NewProcess stores on the process
// and that is passed on to every peer controller's configuration.
const burstSize = 10
// lndclient is the subset of lnd functionality that the process depends on.
// Keeping it as an interface allows the node connection to be substituted.
type lndclient interface {
	// getInfo returns information about the connected node. Run uses it to
	// learn the node's own public key.
	getInfo() (*info, error)

	// listChannels returns channels keyed by channel id. It is used for
	// peer discovery and to fill the channel cache.
	listChannels() (map[uint64]*channel, error)

	// listClosedChannels returns closed channels keyed by channel id. It is
	// the fallback for channel lookups that miss the open channel list.
	listClosedChannels() (map[uint64]*channel, error)

	// getNodeAlias returns a string for the given node key. It is not called
	// in the part of the file visible here.
	getNodeAlias(key route.Vertex) (string, error)

	// subscribeHtlcEvents opens the stream from which resolved htlc events
	// are received.
	subscribeHtlcEvents(ctx context.Context) (htlcEventsClient, error)

	// htlcInterceptor opens the stream on which intercepted htlcs are
	// received and resume/fail responses are sent back.
	htlcInterceptor(ctx context.Context) (htlcInterceptorClient, error)

	// getPendingIncomingHtlcs returns in-flight htlcs grouped by peer and
	// then by circuit key. The event loop passes a nil peer at startup;
	// presumably nil selects all peers - confirm against the implementation.
	getPendingIncomingHtlcs(ctx context.Context, peer *route.Vertex) (
		map[route.Vertex]map[circuitKey]*inFlightHtlc, error)
}
// circuitKey identifies a single htlc on a single channel. It is used as a
// map key for in-flight htlcs.
type circuitKey struct {
	// channel is the channel id, the same id that getChanInfo resolves.
	channel uint64
	// htlc is the htlc identifier; presumably the htlc index within the
	// channel - confirm against the lnd client code.
	htlc uint64
}
// interceptEvent describes an htlc that was handed to us by the interceptor
// stream and is awaiting a decision.
type interceptEvent struct {
	// circuitKey is the incoming circuit of the intercepted htlc.
	circuitKey

	// incomingMsat is the incoming amount reported by the interceptor.
	incomingMsat lnwire.MilliSatoshi
	// outgoingMsat is the outgoing amount reported by the interceptor.
	outgoingMsat lnwire.MilliSatoshi

	// resume sends the decision for this htlc back over the interceptor
	// stream. The boolean is forwarded as the response's resume flag.
	resume func(bool) error
}
// resolvedEvent describes the final outcome of a forwarded htlc as received
// from the htlc event stream.
type resolvedEvent struct {
	// incomingCircuitKey is the circuit the htlc arrived on. Its channel
	// determines which peer controller handles the event.
	incomingCircuitKey circuitKey
	// outgoingCircuitKey is the circuit the htlc was forwarded to. Its
	// channel may be unknown to us when the htlc failed.
	outgoingCircuitKey circuitKey
	// settled is true for a successful htlc; the event loop treats false as
	// a failed htlc.
	settled bool
	// timestamp is carried along with the event; it is not read in the part
	// of the file visible here.
	timestamp time.Time
}
// rateCounters is the reply to a rateCountersRequest.
type rateCounters struct {
	// counters holds the state reported by each peer controller, keyed by
	// peer.
	counters map[route.Vertex]*peerState
}
// rateCountersRequest asks the event loop for the state of all peer
// controllers.
type rateCountersRequest struct {
	// counters is the channel on which the event loop sends its reply.
	counters chan *rateCounters
}
// process ties together the lnd streams, the per-peer controllers and the
// configured limits. In the code visible here, the maps and limits are only
// touched from the event loop and the helpers it calls; other goroutines talk
// to it through the channels below.
type process struct {
	// db receives the resolution of every completed htlc that has complete
	// information.
	db *Db
	// client is the connection to lnd.
	client lndclient
	// limits holds the default limit and the per-peer overrides.
	limits *Limits
	// log is the logger injected through NewProcess.
	log *zap.SugaredLogger

	// interceptChan carries intercepted htlcs from the interceptor reader
	// to the event loop.
	interceptChan chan interceptEvent
	// resolveChan carries resolved htlc events from the htlc event reader
	// to the event loop.
	resolveChan chan resolvedEvent
	// updateLimitChan carries limit changes from UpdateLimit to the event
	// loop.
	updateLimitChan chan updateLimitEvent
	// rateCountersRequestChan carries counter requests from
	// getRateCounters to the event loop.
	rateCountersRequestChan chan rateCountersRequest
	// newPeerChan carries peers discovered by the peer refresh loop to the
	// event loop.
	newPeerChan chan route.Vertex

	// identity is our own node key, set by Run from getInfo.
	identity route.Vertex

	// chanMap caches channel information by channel id.
	chanMap map[uint64]*channel
	// aliasMap maps node keys to strings; it is initialized by NewProcess
	// but not otherwise used in the part of the file visible here.
	aliasMap map[route.Vertex]string

	// peerCtrls holds the controller of every known peer.
	peerCtrls map[route.Vertex]*peerController

	// burstSize is handed to every peer controller that is created.
	burstSize int

	// peerRefreshInterval is the delay between two polls of the channel
	// list in the peer refresh loop.
	peerRefreshInterval time.Duration

	// resolvedCallback is a testing hook. When set, the event loop invokes
	// it after each resolved event has been handed to its peer controller.
	resolvedCallback func()
}
// NewProcess assembles a process around the given lnd client, logger, limits
// and database. All event channels are created unbuffered, all caches start
// out empty, and the burst size and peer refresh interval take the package
// defaults.
func NewProcess(client lndclient, log *zap.SugaredLogger, limits *Limits, db *Db) *process {
	// Injected dependencies and defaults first.
	proc := &process{
		client:              client,
		log:                 log,
		limits:              limits,
		db:                  db,
		burstSize:           burstSize,
		peerRefreshInterval: defaultPeerRefreshInterval,
	}

	// Channels that feed the event loop.
	proc.interceptChan = make(chan interceptEvent)
	proc.resolveChan = make(chan resolvedEvent)
	proc.updateLimitChan = make(chan updateLimitEvent)
	proc.rateCountersRequestChan = make(chan rateCountersRequest)
	proc.newPeerChan = make(chan route.Vertex)

	// Lookup tables.
	proc.chanMap = make(map[uint64]*channel)
	proc.aliasMap = make(map[route.Vertex]string)
	proc.peerCtrls = make(map[route.Vertex]*peerController)

	return proc
}
// updateLimitEvent is the message sent from UpdateLimit to the event loop.
type updateLimitEvent struct {
	// limit is the new limit. Nil clears the peer-specific limit so that the
	// default applies again.
	limit *Limit
	// peer is the peer the limit applies to. Nil addresses the default
	// limit.
	peer *route.Vertex
}
// UpdateLimit hands a limit change to the event loop.
//
// A nil peer addresses the default limit, a nil limit clears the limit of the
// given peer. Passing nil for both is rejected because the default limit can
// be replaced but not removed. The call blocks until the event loop has
// accepted the update or ctx is done, in which case ctx.Err() is returned.
func (p *process) UpdateLimit(ctx context.Context, peer *route.Vertex,
	limit *Limit) error {

	if limit == nil && peer == nil {
		return errors.New("cannot clear default limit")
	}

	select {
	case <-ctx.Done():
		return ctx.Err()

	case p.updateLimitChan <- updateLimitEvent{peer: peer, limit: limit}:
		return nil
	}
}
// Run connects to lnd, registers the htlc event subscription and the htlc
// interceptor, and then runs all background loops until one of them fails or
// ctx is cancelled. It returns the first error reported by any of the loops,
// or the error of a failed setup step.
func (p *process) Run(ctx context.Context) error {
	p.log.Info("CircuitBreaker started")

	info, err := p.client.getInfo()
	if err != nil {
		return err
	}

	p.identity = info.nodeKey

	p.log.Infow("Connected to lnd node",
		"pubkey", p.identity.String())

	// The context derived by errgroup is only cancelled once a goroutine
	// fails or Wait returns. Wrap it in an explicitly cancelled context so
	// that the early returns below (e.g. the interceptor failing to register
	// after the event subscription succeeded) also release everything that
	// was started with ctx.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	group, ctx := errgroup.WithContext(ctx)

	stream, err := p.client.subscribeHtlcEvents(ctx)
	if err != nil {
		return err
	}

	interceptor, err := p.client.htlcInterceptor(ctx)
	if err != nil {
		return err
	}

	p.log.Info("Interceptor/notification handlers registered")

	// Reads resolved htlc events from lnd and feeds them to the event loop.
	group.Go(func() error {
		err := p.processHtlcEvents(ctx, stream)
		if err != nil {
			return fmt.Errorf("htlc events error: %w", err)
		}

		return nil
	})

	// Reads intercepted htlcs from lnd and feeds them to the event loop.
	group.Go(func() error {
		err := p.processInterceptor(ctx, interceptor)
		if err != nil {
			return fmt.Errorf("interceptor error: %w", err)
		}

		return nil
	})

	// Periodically reports peers to the event loop.
	group.Go(func() error {
		return p.peerRefreshLoop(ctx)
	})

	// Main loop that owns the peer controllers.
	group.Go(func() error {
		return p.runEventLoop(ctx)
	})

	return group.Wait()
}
// peerRefreshLoop polls lnd's channel list and reports every peer it has not
// reported before to the event loop. Between polls it sleeps for
// p.peerRefreshInterval. The loop ends with ctx.Err() when ctx is done, or
// with the error of a failed channel listing.
func (p *process) peerRefreshLoop(ctx context.Context) error {
	// Peers that were already handed to the event loop by this loop.
	seen := make(map[route.Vertex]struct{})

	for {
		openChans, err := p.client.listChannels()
		if err != nil {
			return err
		}

		for _, openChan := range openChans {
			peer := openChan.peer

			if _, known := seen[peer]; !known {
				// Mark first, then report. A peer with several
				// channels is reported only once.
				seen[peer] = struct{}{}

				select {
				case <-ctx.Done():
					return ctx.Err()

				case p.newPeerChan <- peer:
				}
			}
		}

		// Wait for the next poll, or stop when asked to.
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-time.After(p.peerRefreshInterval):
		}
	}
}
// getPeerController returns the controller for the given peer. If none exists
// yet, a new one is created with an empty set of in-flight htlcs and started
// through startGo.
func (p *process) getPeerController(ctx context.Context, peer route.Vertex,
	startGo func(func() error)) *peerController {

	if existing, found := p.peerCtrls[peer]; found {
		return existing
	}

	// Unknown peer: start it off without any pending htlcs.
	return p.createPeerController(
		ctx, peer, startGo, make(map[circuitKey]*inFlightHtlc),
	)
}
// createPeerController builds a controller for the given peer, starts its run
// loop through startGo and registers it in p.peerCtrls, replacing any
// controller previously registered for that peer.
//
// The controller is configured with the peer-specific limit if one exists and
// with the default limit otherwise. htlcs is the set of htlcs that are already
// in flight for this peer.
func (p *process) createPeerController(ctx context.Context, peer route.Vertex,
	startGo func(func() error),
	htlcs map[circuitKey]*inFlightHtlc) *peerController {

	peerCfg, ok := p.limits.PerPeer[peer]
	if !ok {
		peerCfg = p.limits.Default
	}

	cfg := &peerControllerCfg{
		logger:    p.log,
		limit:     peerCfg,
		burstSize: p.burstSize,
		htlcs:     htlcs,
		lnd:       p.client,
		pubKey:    peer,
		now:       time.Now,
		htlcCompleted: func(ctx context.Context, htlc *HtlcInfo) error {
			// If the add time of a htlc is zero, it was resumed after a LND
			// restart. We don't store these htlcs because they have
			// incomplete information (missing add time and amounts).
			if htlc.addTime.IsZero() {
				// Log through the injected logger, the same one
				// that is handed to the controller above, rather
				// than through a package-level logger.
				p.log.Debugf("Not storing incomplete htlc resumed after "+
					"restart: %v (%v) -> %v (%v)",
					htlc.incomingCircuit.channel,
					htlc.incomingCircuit.htlc,
					htlc.outgoingCircuit.channel,
					htlc.outgoingCircuit.htlc)

				return nil
			}

			return p.db.RecordHtlcResolution(ctx, htlc)
		},
	}
	ctrl := newPeerController(cfg)

	startGo(func() error {
		return ctrl.run(ctx)
	})

	p.peerCtrls[peer] = ctrl

	return ctrl
}
// runEventLoop runs the event loop inside its own errgroup and waits for it.
//
// The same group is passed into the event loop, which uses it to start the
// goroutines of the peer controllers. Because everything shares one group, an
// error in the event loop or in any goroutine it spawned cancels the shared
// context and is returned from here.
func (p *process) runEventLoop(ctx context.Context) error {
	loopGroup, loopCtx := errgroup.WithContext(ctx)

	loopGroup.Go(func() error {
		return p.eventLoop(loopCtx, loopGroup)
	})

	return loopGroup.Wait()
}
// eventLoop is the central loop of the process. It first creates a peer
// controller for every peer that has pending incoming htlcs and then
// dispatches events until ctx is done or an error occurs:
//
//   - intercepted htlcs are passed to the controller of the incoming peer,
//   - resolved htlcs are passed to the controller of the incoming peer,
//     supplemented with the outgoing peer when it can be determined,
//   - limit updates are applied to p.limits and to the affected controllers,
//   - rate counter requests are answered with the state of all controllers,
//   - newly reported peers get a controller if they don't have one yet.
//
// Goroutines for new peer controllers are started through group. Any error
// terminates the loop and is returned.
func (p *process) eventLoop(ctx context.Context, group *errgroup.Group) error {
	// Retrieve all pending htlcs from lnd.
	htlcsPerPeer, err := p.client.getPendingIncomingHtlcs(ctx, nil)
	if err != nil {
		return err
	}

	// Initialize peer controllers with currently pending htlcs.
	for peer, htlcs := range htlcsPerPeer {
		p.createPeerController(ctx, peer, group.Go, htlcs)
	}

	for {
		select {
		case event := <-p.interceptChan:
			chanInfo, err := p.getChanInfo(event.channel)
			if err != nil {
				return err
			}

			ctrl := p.getPeerController(ctx, chanInfo.peer, group.Go)

			peerEvent := peerInterceptEvent{
				interceptEvent: event,
				peerInitiated:  !chanInfo.initiator,
			}
			if err := ctrl.process(ctx, peerEvent); err != nil {
				return err
			}

		case event := <-p.resolveChan:
			chanInfo, err := p.getChanInfo(
				event.incomingCircuitKey.channel,
			)
			if err != nil {
				return err
			}

			ctrl := p.getPeerController(ctx, chanInfo.peer, group.Go)

			// Lookup the outgoing peer to supplement the information on the
			// resolved event. Here we handle a channel lookup error
			// differently to the incoming channel, because it's possible
			// we were forwarded a HTLC with a bogus outgoing channel. If
			// this is the case, LND would have failed the HTLC back even if
			// we let it through. We catch and log that error, rather than
			// exiting like we do with incoming channels (where we reasonably
			// expect to find the channel). We still enforce channel lookup
			// for successful HTLCs, because then we know that the channel
			// does exist and should be found.
			var outgoingPeer *route.Vertex
			chanInfo, err = p.getChanInfo(
				event.outgoingCircuitKey.channel,
			)
			switch {
			case errors.Is(err, errChannelNotFound) && !event.settled:
				// Use the injected logger, consistent with the rest
				// of the process.
				p.log.Debugf("Channel not found for failed htlc: %v",
					event.outgoingCircuitKey.channel)

			case err != nil:
				return err

			default:
				outgoingPeer = &chanInfo.peer
			}

			if err := ctrl.resolved(ctx, peerResolvedEvent{
				resolvedEvent: event,
				outgoingPeer:  outgoingPeer,
			}); err != nil {
				return err
			}

			if p.resolvedCallback != nil {
				p.resolvedCallback()
			}

		case update := <-p.updateLimitChan:
			switch {
			// Update sets default limit.
			case update.peer == nil:
				p.limits.Default = *update.limit

				// Update all controllers that have no specific limit.
				for node, ctrl := range p.peerCtrls {
					_, ok := p.limits.PerPeer[node]
					if ok {
						continue
					}

					err := ctrl.updateLimit(ctx, *update.limit)
					if err != nil {
						return err
					}
				}

			// Update sets specific limit.
			case update.limit != nil:
				p.limits.PerPeer[*update.peer] = *update.limit

				// Update specific controller if it exists.
				ctrl, ok := p.peerCtrls[*update.peer]
				if ok {
					err := ctrl.updateLimit(ctx, *update.limit)
					if err != nil {
						return err
					}
				}

			// Update clears limit.
			case update.limit == nil:
				delete(p.limits.PerPeer, *update.peer)

				// Apply default limit to peer controller.
				ctrl, ok := p.peerCtrls[*update.peer]
				if ok {
					err := ctrl.updateLimit(ctx, p.limits.Default)
					if err != nil {
						return err
					}
				}
			}

		case req := <-p.rateCountersRequestChan:
			allCounts := make(
				map[route.Vertex]*peerState, len(p.peerCtrls),
			)
			for node, ctrl := range p.peerCtrls {
				state, err := ctrl.state(ctx)
				if err != nil {
					return err
				}

				allCounts[node] = state
			}

			// Delivering the reply may block until the requester is
			// ready to receive. Keep watching ctx so that a requester
			// that is slow or gone cannot hold up shutdown.
			select {
			case req.counters <- &rateCounters{
				counters: allCounts,
			}:

			case <-ctx.Done():
				return ctx.Err()
			}

		case <-ctx.Done():
			return ctx.Err()

		// A new or existing peer has been reported.
		case newPeer := <-p.newPeerChan:
			p.log.Infow("New peer notification received", "peer", newPeer)

			// Try to get the existing peer controller. If it doesn't exist, it
			// will be created. This causes the peer to be reported over grpc.
			_ = p.getPeerController(ctx, newPeer, group.Go)
		}
	}
}
// getRateCounters asks the event loop for the current state of all peer
// controllers and returns it keyed by peer. If ctx is done before the request
// is accepted or before the reply arrives, ctx.Err() is returned.
func (p *process) getRateCounters(ctx context.Context) (
	map[route.Vertex]*peerState, error) {

	// The reply channel is buffered by one so that the event loop can always
	// deliver its answer without blocking. With an unbuffered channel, a
	// caller whose ctx is cancelled after the request was accepted would stop
	// listening and leave the event loop stuck on the send forever.
	replyChan := make(chan *rateCounters, 1)

	select {
	case p.rateCountersRequestChan <- rateCountersRequest{
		counters: replyChan,
	}:

	case <-ctx.Done():
		return nil, ctx.Err()
	}

	select {
	case reply := <-replyChan:
		return reply.counters, nil

	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// processHtlcEvents forwards every event received from the htlc event stream
// to the event loop via p.resolveChan. It returns the stream's error when
// receiving fails and ctx.Err() when ctx is done while waiting to hand over an
// event.
func (p *process) processHtlcEvents(ctx context.Context,
	stream htlcEventsClient) error {

	for {
		resolved, err := stream.recv()
		if err != nil {
			return err
		}

		// Hand the event over, unless we are asked to stop first.
		select {
		case <-ctx.Done():
			return ctx.Err()

		case p.resolveChan <- *resolved:
		}
	}
}
// processInterceptor forwards every htlc received from the interceptor stream
// to the event loop via p.interceptChan. Each forwarded event carries a resume
// callback that sends the decision for that htlc back over the same stream.
// It returns the stream's error when receiving fails and ctx.Err() when ctx is
// done while waiting to hand over an event.
func (p *process) processInterceptor(ctx context.Context,
	interceptor htlcInterceptorClient) error {

	for {
		intercepted, err := interceptor.recv()
		if err != nil {
			return err
		}

		key := intercepted.circuitKey

		forward := interceptEvent{
			circuitKey:   key,
			incomingMsat: intercepted.incomingMsat,
			outgoingMsat: intercepted.outgoingMsat,

			// Replies for exactly this htlc; key is captured per
			// iteration.
			resume: func(allow bool) error {
				return interceptor.send(&interceptResponse{
					key:    key,
					resume: allow,
				})
			},
		}

		select {
		case <-ctx.Done():
			return ctx.Err()

		case p.interceptChan <- forward:
		}
	}
}
// getChanInfo returns the channel information for the given channel id.
//
// The lookup goes through three stages and stops at the first hit: the local
// cache, lnd's list of open channels, and lnd's list of closed channels. Every
// list that is fetched is merged into the cache. If the channel is in none of
// them, an error wrapping errChannelNotFound is returned. Errors from lnd are
// returned unchanged.
func (p *process) getChanInfo(channel uint64) (*channel, error) {
	// Stage one: the cache.
	if cached, hit := p.chanMap[channel]; hit {
		return cached, nil
	}

	// Stage two: refresh the cache from the open channels and retry.
	open, err := p.client.listChannels()
	if err != nil {
		return nil, err
	}
	for id, info := range open {
		p.chanMap[id] = info
	}
	if found, hit := p.chanMap[channel]; hit {
		return found, nil
	}

	// Stage three: the channel is not open, so add the closed channels to
	// the cache and retry once more.
	closed, err := p.client.listClosedChannels()
	if err != nil {
		return nil, err
	}
	for id, info := range closed {
		p.chanMap[id] = info
	}
	if found, hit := p.chanMap[channel]; hit {
		return found, nil
	}

	// Nothing knows about this channel.
	return nil, fmt.Errorf("%w: %v", errChannelNotFound, channel)
}