From a4c177f1182b44d98e0589594d0fcf8c966387af Mon Sep 17 00:00:00 2001 From: im-adithya Date: Mon, 5 Aug 2024 22:20:34 +0530 Subject: [PATCH 1/5] chore: stop subscription goroutines if node is stopped --- lnclient/lnd/lnd.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lnclient/lnd/lnd.go b/lnclient/lnd/lnd.go index 41b6b9e4..3e99948f 100644 --- a/lnclient/lnd/lnd.go +++ b/lnclient/lnd/lnd.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "io" "math" "sort" "strconv" @@ -14,6 +15,8 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" decodepay "github.com/nbd-wtf/ln-decodepay" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/getAlby/hub/events" "github.com/getAlby/hub/lnclient" @@ -444,6 +447,12 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln default: payment, err := paymentStream.Recv() if err != nil { + if grpcErr, ok := status.FromError(err); ok { + if grpcErr.Code() == codes.Unknown { + logger.Logger.Error("LND node stopped or unreachable, exiting payment subscription") + return + } + } logger.Logger.WithError(err).Error("Failed to receive payment") continue } @@ -501,6 +510,10 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln default: invoice, err := invoiceStream.Recv() if err != nil { + if err == io.EOF { + logger.Logger.Error("LND node stopped or unreachable, exiting invoice subscription") + return + } logger.Logger.WithError(err).Error("Failed to receive invoice") continue } From b5caa97d2964fd9b94c65adcc6e5b358725ab98e Mon Sep 17 00:00:00 2001 From: im-adithya Date: Tue, 6 Aug 2024 15:45:11 +0530 Subject: [PATCH 2/5] chore: wait for 10 seconds before subscribing for txs --- lnclient/lnd/lnd.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/lnclient/lnd/lnd.go b/lnclient/lnd/lnd.go index 3e99948f..c8c5edb3 100644 --- a/lnclient/lnd/lnd.go +++ b/lnclient/lnd/lnd.go @@ -6,7 +6,6 @@ import ( "encoding/hex" "errors" "fmt" - "io" "math" "sort" "strconv" @@ -15,8 +14,6 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" decodepay "github.com/nbd-wtf/ln-decodepay" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/getAlby/hub/events" "github.com/getAlby/hub/lnclient" @@ -438,8 +435,10 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln }) if err != nil { logger.Logger.WithError(err).Error("Error subscribing to payments") + time.Sleep(10 * time.Second) continue } + paymentsLoop: for { select { case <-lndCtx.Done(): @@ -447,14 +446,8 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln default: payment, err := paymentStream.Recv() if err != nil { - if grpcErr, ok := status.FromError(err); ok { - if grpcErr.Code() == codes.Unknown { - logger.Logger.Error("LND node stopped or unreachable, exiting payment subscription") - return - } - } logger.Logger.WithError(err).Error("Failed to receive payment") - continue + break paymentsLoop } switch payment.Status { @@ -501,8 +494,10 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{}) if err != nil { logger.Logger.WithError(err).Error("Error subscribing to invoices") + time.Sleep(10 * time.Second) continue } + invoicesLoop: for { select { case <-lndCtx.Done(): @@ -510,12 +505,8 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln default: invoice, err := invoiceStream.Recv() if err != nil { - if err == io.EOF { - logger.Logger.Error("LND node stopped or unreachable, exiting invoice subscription") - return - } logger.Logger.WithError(err).Error("Failed to receive invoice") - continue + break invoicesLoop } if invoice.State != lnrpc.Invoice_SETTLED { continue From 11a119622a208e524fe2800639874d8b69aba122 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Tue, 6 Aug 2024 16:00:07 +0530 Subject: [PATCH 3/5] feat(lnd): store node info in memory --- lnclient/lnd/lnd.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/lnclient/lnd/lnd.go b/lnclient/lnd/lnd.go index c8c5edb3..69a12662 100644 --- a/lnclient/lnd/lnd.go +++ b/lnclient/lnd/lnd.go @@ -28,9 +28,9 @@ import ( ) type LNDService struct { - client *wrapper.LNDWrapper - pubkey string - cancel context.CancelFunc + client *wrapper.LNDWrapper + nodeInfo *lnclient.NodeInfo + cancel context.CancelFunc } func (svc *LNDService) GetBalance(ctx context.Context) (balance int64, err error) { @@ -95,7 +95,18 @@ func (svc *LNDService) ListTransactions(ctx context.Context, from, until, limit, } func (svc *LNDService) GetInfo(ctx context.Context) (info *lnclient.NodeInfo, err error) { - resp, err := svc.client.GetInfo(ctx, &lnrpc.GetInfoRequest{}) + if svc.nodeInfo == nil { + nodeInfo, err := fetchNodeInfo(ctx, svc.client) + if err != nil { + return nil, err + } + svc.nodeInfo = nodeInfo + } + return svc.nodeInfo, nil +} + +func fetchNodeInfo(ctx context.Context, client *wrapper.LNDWrapper) (*lnclient.NodeInfo, error) { + resp, err := client.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { return nil, err } @@ -144,7 +155,7 @@ func (svc *LNDService) ListChannels(ctx context.Context) ([]lnclient.Channel, er return nil, err } - nodeInfo, err := svc.GetInfo(ctx) + nodeInfo, err := svc.client.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { return nil, err } @@ -418,14 +429,14 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln logger.Logger.Errorf("Failed to create new LND client %v", err) return nil, err } - info, err := lndClient.GetInfo(ctx, &lnrpc.GetInfoRequest{}) + nodeInfo, err := fetchNodeInfo(ctx, lndClient) if err != nil { return nil, err } lndCtx, cancel := context.WithCancel(ctx) - lndService := &LNDService{client: lndClient, pubkey: info.IdentityPubkey, cancel: cancel} + lndService := &LNDService{client: lndClient, nodeInfo: nodeInfo, cancel: cancel} // Subscribe to payments go func() { @@ -525,7 +536,7 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln } }() - logger.Logger.Infof("Connected to LND - alias %s", info.Alias) + logger.Logger.Infof("Connected to LND - alias %s", nodeInfo.Alias) return lndService, nil } @@ -932,5 +943,5 @@ func (svc *LNDService) GetSupportedNIP47NotificationTypes() []string { } func (svc *LNDService) GetPubkey() string { - return svc.pubkey + return svc.nodeInfo.Pubkey } From ebee82aed90ff6d685ca944e81483e22032be126 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Wed, 7 Aug 2024 13:32:20 +0530 Subject: [PATCH 4/5] chore: improve tx subscription goroutine --- lnclient/lnd/lnd.go | 84 ++++++++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/lnclient/lnd/lnd.go b/lnclient/lnd/lnd.go index 69a12662..74d54eb1 100644 --- a/lnclient/lnd/lnd.go +++ b/lnclient/lnd/lnd.go @@ -95,13 +95,6 @@ func (svc *LNDService) ListTransactions(ctx context.Context, from, until, limit, } func (svc *LNDService) GetInfo(ctx context.Context) (info *lnclient.NodeInfo, err error) { - if svc.nodeInfo == nil { - nodeInfo, err := fetchNodeInfo(ctx, svc.client) - if err != nil { - return nil, err - } - svc.nodeInfo = nodeInfo - } return svc.nodeInfo, nil } @@ -441,24 +434,33 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln // Subscribe to payments go func() { for { - paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{ - NoInflightUpdates: true, - }) - if err != nil { - logger.Logger.WithError(err).Error("Error subscribing to payments") - time.Sleep(10 * time.Second) - continue - } - paymentsLoop: - for { - select { - case <-lndCtx.Done(): - return - default: + select { + case <-lndCtx.Done(): + return + default: + paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{ + NoInflightUpdates: true, + }) + if err != nil { + logger.Logger.WithError(err).Error("Error subscribing to payments") + select { + case <-lndCtx.Done(): + return + case <-time.After(10 * time.Second): + continue + } + } + paymentsLoop: + for { payment, err := paymentStream.Recv() if err != nil { logger.Logger.WithError(err).Error("Failed to receive payment") - break paymentsLoop + select { + case <-lndCtx.Done(): + return + case <-time.After(2 * time.Second): + break paymentsLoop + } } switch payment.Status { @@ -502,23 +504,33 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln // Subscribe to invoices go func() { for { - invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{}) - if err != nil { - logger.Logger.WithError(err).Error("Error subscribing to invoices") - time.Sleep(10 * time.Second) - continue - } - invoicesLoop: - for { - select { - case <-lndCtx.Done(): - return - default: + select { + case <-lndCtx.Done(): + return + default: + invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{}) + if err != nil { + logger.Logger.WithError(err).Error("Error subscribing to invoices") + select { + case <-lndCtx.Done(): + return + case <-time.After(10 * time.Second): + continue + } + } + invoicesLoop: + for { invoice, err := invoiceStream.Recv() if err != nil { - logger.Logger.WithError(err).Error("Failed to receive invoice") - break invoicesLoop + logger.Logger.WithError(err).Error("Failed to receive payment") + select { + case <-lndCtx.Done(): + return + case <-time.After(2 * time.Second): + break invoicesLoop + } } + if invoice.State != lnrpc.Invoice_SETTLED { continue } From 879f0c234f27218e6a1989cd907a1aad472f25a9 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Wed, 7 Aug 2024 13:40:29 +0530 Subject: [PATCH 5/5] chore: invoice typo --- lnclient/lnd/lnd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lnclient/lnd/lnd.go b/lnclient/lnd/lnd.go index 74d54eb1..aeb1ab40 100644 --- a/lnclient/lnd/lnd.go +++ b/lnclient/lnd/lnd.go @@ -522,7 +522,7 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln for { invoice, err := invoiceStream.Recv() if err != nil { - logger.Logger.WithError(err).Error("Failed to receive payment") + logger.Logger.WithError(err).Error("Failed to receive invoice") select { case <-lndCtx.Done(): return