Skip to content

Commit

Permalink
Setup a payments manager with voucher cache (#17)
Browse files Browse the repository at this point in the history
* Add payments manager

* Use nitro node in payments manager

* Add a placeholder HTTP middleware

* Add voucher extraction and validation to the HTTP middleware

* Use pointer for Nitro instance

* Initialize quit channel

* Fix signer recovery from payment header

* Use strings for keys in voucher cache

* Move ChainOpts to ETH chain service

* Add comments and fix lint errors

* Fix static checks

* Removed commented out type
  • Loading branch information
prathamesh0 authored and neerajvijay1997 committed Oct 16, 2023
1 parent d54438d commit 1ec0ec7
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 1 deletion.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ require (
require (
github.com/BurntSushi/toml v1.3.2
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/libp2p/go-libp2p-kad-dht v0.24.2
github.com/tidwall/buntdb v1.2.10
github.com/urfave/cli/v2 v2.25.3
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63

)

Expand Down Expand Up @@ -143,7 +145,6 @@ require (
go.uber.org/fx v1.20.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sync v0.3.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuWpEqDnvIw251EVy4zlP8gWbsGj4BsUKCRpYs=
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
Expand Down
126 changes: 126 additions & 0 deletions paymentsmanager/http_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package paymentsmanager

import (
"bytes"
"encoding/json"
"errors"
"io"
"math/big"
"net/http"
"regexp"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/statechannels/go-nitro/crypto"
"golang.org/x/exp/slog"
)

const (
PAYMENT_HEADER_KEY = "x-payment"
PAYMENT_HEADER_REGEX = "vhash:(.*),vsig:(.*)"
)

var (
ErrHeaderMissing = errors.New("payment header x-payment not set")
ErrInvalidPaymentHeader = errors.New("invalid payment header format")
ErrUnableToRecoverSigner = errors.New("unable to recover the voucher signer")
)

// HTTPMiddleware: extracts and validates vouchers from RPC requests
func HTTPMiddleware(next http.Handler, validator VoucherValidator, queryRates map[string]*big.Int) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Validate voucher
r, err := extractAndValidateVoucher(r, validator, queryRates)
if err != nil {
if strings.Contains(err.Error(), ErrPayment) {
http.Error(w, err.Error(), http.StatusPaymentRequired)
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
}

return
}

// Let the request move ahead after voucher validation
next.ServeHTTP(w, r)
})
}

func extractAndValidateVoucher(r *http.Request, validator VoucherValidator, queryRates map[string]*big.Int) (*http.Request, error) {
// Determine RPC method from the request
isRpcCall, rpcMethod := isRpcCall(r)
if !isRpcCall {
return r, nil
}

// Determine the query cost
queryCost := queryRates[rpcMethod]
if queryCost == nil || queryCost.Cmp(big.NewInt(0)) == 0 {
slog.Info("Serving a free RPC request", "method", rpcMethod)
return r, nil
}

// Extract voucher details from the header
paymentHeader := r.Header.Get(PAYMENT_HEADER_KEY)
if paymentHeader == "" {
return r, ErrHeaderMissing
}

re := regexp.MustCompile(PAYMENT_HEADER_REGEX)
match := re.FindStringSubmatch(paymentHeader)

var vhash, vsig string
if match != nil {
vhash = match[1]
vsig = match[2]
} else {
return r, ErrInvalidPaymentHeader
}

// Determine signer from the voucher hash and signature
vhashBytes := common.Hex2Bytes(strings.TrimPrefix(vhash, "0x"))
signature := crypto.SplitSignature(common.Hex2Bytes(strings.TrimPrefix(vsig, "0x")))
signer, err := crypto.RecoverEthereumMessageSigner(vhashBytes, signature)
if err != nil {
return r, ErrUnableToRecoverSigner
}

// Remove the payment header from the request
r.Header.Del(PAYMENT_HEADER_KEY)

err = validator.ValidateVoucher(common.HexToHash(vhash), signer, queryCost)
if err != nil {
return r, err
}

slog.Info("Serving a paid RPC request", "method", rpcMethod, "cost", queryCost, "sender", signer.Hex())
return r, nil
}

// Helper method to parse request and determine whether it's a RPC call
// A request is a RPC call if:
// - "Content-Type" header is set to "application/json"
// - Request body has non-empty "jsonrpc" and "method" fields
//
// Also returns the parsed RPC method
func isRpcCall(r *http.Request) (bool, string) {
if r.Header.Get("Content-Type") != "application/json" {
return false, ""
}

var ReqBody struct {
JsonRpc string `json:"jsonrpc"`
Method string `json:"method"`
}
bodyBytes, _ := io.ReadAll(r.Body)

err := json.Unmarshal(bodyBytes, &ReqBody)
if err != nil || ReqBody.JsonRpc == "" || ReqBody.Method == "" {
return false, ""
}

// Reassign request body as io.ReadAll consumes it
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

return true, ReqBody.Method
}
188 changes: 188 additions & 0 deletions paymentsmanager/payments_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package paymentsmanager

import (
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/statechannels/go-nitro/node"
"github.com/statechannels/go-nitro/payments"
"github.com/statechannels/go-nitro/types"
"golang.org/x/exp/slog"
)

const (
DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000
DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000
DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000
DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL

DEFAULT_VOUCHER_CHECK_INTERVAL = 2
DEFAULT_VOUCHER_CHECK_ATTEMPTS = 5
)

type InFlightVoucher struct {
voucher payments.Voucher
amount *big.Int
}

// Struct representing the payments manager service
type PaymentsManager struct {
nitro *node.Node

// In-memory LRU cache of vouchers received on payment channels
// Map: payer -> voucher hash -> InFlightVoucher (voucher, delta amount)
receivedVouchersCache *expirable.LRU[string, *expirable.LRU[string, InFlightVoucher]]

// LRU map to keep track of amounts paid so far on payment channels
// Map: channel id -> amount paid so far
paidSoFarOnChannel *expirable.LRU[string, *big.Int]

// Used to signal shutdown of the service
quitChan chan bool
}

func NewPaymentsManager(nitro *node.Node) (PaymentsManager, error) {
pm := PaymentsManager{nitro: nitro}

pm.receivedVouchersCache = expirable.NewLRU[string, *expirable.LRU[string, InFlightVoucher]](
DEFAULT_LRU_CACHE_MAX_ACCOUNTS,
nil,
time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL,
)

pm.paidSoFarOnChannel = expirable.NewLRU[string, *big.Int](
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS,
nil,
time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL,
)

pm.quitChan = make(chan bool)

// Load existing open payment channels with amount paid so far from the stored state
err := pm.loadPaymentChannels()
if err != nil {
return PaymentsManager{}, err
}

return pm, nil
}

func (pm *PaymentsManager) Start(wg *sync.WaitGroup) {
slog.Info("starting payments manager...")

wg.Add(1)
go func() {
defer wg.Done()
pm.run()
}()
}

func (pm *PaymentsManager) Stop() error {
slog.Info("stopping payments manager...")
close(pm.quitChan)
return nil
}

func (pm *PaymentsManager) ValidateVoucher(voucherHash common.Hash, signerAddress common.Address, value *big.Int) (bool, bool) {
// Check the payments map for required voucher
var isPaymentReceived, isOfSufficientValue bool
for i := 0; i < DEFAULT_VOUCHER_CHECK_ATTEMPTS; i++ {
isPaymentReceived, isOfSufficientValue = pm.checkVoucherInCache(voucherHash, signerAddress, value)

if isPaymentReceived {
return true, isOfSufficientValue
}

// Retry after an interval if voucher not found
slog.Info("Payment from %s not found, retrying after %d sec...", signerAddress, DEFAULT_VOUCHER_CHECK_INTERVAL)
time.Sleep(DEFAULT_VOUCHER_CHECK_INTERVAL * time.Second)
}

return false, false
}

// Check for a given payment voucher in LRU cache map
// Returns whether the voucher was found, whether it was of sufficient value
func (pm *PaymentsManager) checkVoucherInCache(voucherHash common.Hash, signerAddress common.Address, minRequiredValue *big.Int) (bool, bool) {
vouchersMap, ok := pm.receivedVouchersCache.Get(signerAddress.Hex())
if !ok {
return false, false
}

receivedVoucher, ok := vouchersMap.Get(voucherHash.Hex())
if !ok {
return false, false
}

if receivedVoucher.amount.Cmp(minRequiredValue) < 0 {
return true, false
}

// Delete the voucher from map after consuming it
vouchersMap.Remove(voucherHash.Hex())
return true, true
}

func (pm *PaymentsManager) run() {
slog.Info("starting voucher subscription...")
for {
select {
case voucher := <-pm.nitro.ReceivedVouchers():
payer, err := pm.getChannelCounterparty(voucher.ChannelId)
if err != nil {
// TODO: Handle
panic(err)
}

paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId.String())
if !ok {
paidSoFar = big.NewInt(0)
}

paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar)
pm.paidSoFarOnChannel.Add(voucher.ChannelId.String(), voucher.Amount)
slog.Info("Received a voucher", "payer", payer.String(), "amount", paymentAmount.String())

vouchersMap, ok := pm.receivedVouchersCache.Get(payer.Hex())
if !ok {
vouchersMap = expirable.NewLRU[string, InFlightVoucher](
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT,
nil,
time.Second*DEFAULT_LRU_CACHE_VOUCHER_TTL,
)

pm.receivedVouchersCache.Add(payer.Hex(), vouchersMap)
}

voucherHash, err := voucher.Hash()
if err != nil {
// TODO: Handle
panic(err)
}

vouchersMap.Add(voucherHash.Hex(), InFlightVoucher{voucher: voucher, amount: paymentAmount})
case <-pm.quitChan:
slog.Info("stopping voucher subscription loop")
return
}
}
}

func (pm *PaymentsManager) getChannelCounterparty(channelId types.Destination) (common.Address, error) {
paymentChannel, err := pm.nitro.GetPaymentChannel(channelId)
if err != nil {
return common.Address{}, err
}

return paymentChannel.Balance.Payer, nil
}

func (pm *PaymentsManager) loadPaymentChannels() error {
// TODO: Implement
return nil
}
53 changes: 53 additions & 0 deletions paymentsmanager/validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package paymentsmanager

import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
)

var (
ErrPayment = "Payment error:"
ErrPaymentNotReceived = fmt.Errorf("%s payment not received", ErrPayment)
ErrAmountInsufficient = fmt.Errorf("%s amount insufficient", ErrPayment)
)

// Voucher validator interface to be satisfied by implementations
// using in / out of process Nitro nodes
type VoucherValidator interface {
ValidateVoucher(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error
}

var _ VoucherValidator = &InProcessVoucherValidator{}

// When go-nitro is running in-process
type InProcessVoucherValidator struct {
PaymentsManager
}

func (v InProcessVoucherValidator) ValidateVoucher(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error {
isPaymentReceived, isOfSufficientValue := v.PaymentsManager.ValidateVoucher(voucherHash, signerAddress, value)

if !isPaymentReceived {
return ErrPaymentNotReceived
}

if !isOfSufficientValue {
return ErrAmountInsufficient
}

return nil
}

var _ VoucherValidator = &RemoteVoucherValidator{}

// When go-nitro is running remotely
type RemoteVoucherValidator struct {
// client rpc.RpcClientApi
}

func (r RemoteVoucherValidator) ValidateVoucher(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error {
// TODO: Implement
return nil
}

0 comments on commit 1ec0ec7

Please sign in to comment.