forked from statechannels/go-nitro
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Setup a payments manager with voucher cache (#17)
* Add payments manager * Use nitro node in payments manager * Add a placeholder HTTP middleware * Add voucher extraction and validation to the HTTP middleware * Use pointer for Nitro instance * Initialize quit channel * Fix signer recovery from payment header * Use strings for keys in voucher cache * Move ChainOpts to ETH chain service * Add comments and fix lint errors * Fix static checks * Removed commented out type
- Loading branch information
1 parent
c57319e
commit 5cda4b5
Showing
5 changed files
with
371 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package paymentsmanager | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"errors" | ||
"io" | ||
"math/big" | ||
"net/http" | ||
"regexp" | ||
"strings" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/statechannels/go-nitro/crypto" | ||
"golang.org/x/exp/slog" | ||
) | ||
|
||
const ( | ||
PAYMENT_HEADER_KEY = "x-payment" | ||
PAYMENT_HEADER_REGEX = "vhash:(.*),vsig:(.*)" | ||
) | ||
|
||
var ( | ||
ErrHeaderMissing = errors.New("payment header x-payment not set") | ||
ErrInvalidPaymentHeader = errors.New("invalid payment header format") | ||
ErrUnableToRecoverSigner = errors.New("unable to recover the voucher signer") | ||
) | ||
|
||
// HTTPMiddleware: extracts and validates vouchers from RPC requests | ||
func HTTPMiddleware(next http.Handler, validator VoucherValidator, queryRates map[string]*big.Int) http.Handler { | ||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
// Validate voucher | ||
r, err := extractAndValidateVoucher(r, validator, queryRates) | ||
if err != nil { | ||
if strings.Contains(err.Error(), ErrPayment) { | ||
http.Error(w, err.Error(), http.StatusPaymentRequired) | ||
} else { | ||
http.Error(w, err.Error(), http.StatusBadRequest) | ||
} | ||
|
||
return | ||
} | ||
|
||
// Let the request move ahead after voucher validation | ||
next.ServeHTTP(w, r) | ||
}) | ||
} | ||
|
||
func extractAndValidateVoucher(r *http.Request, validator VoucherValidator, queryRates map[string]*big.Int) (*http.Request, error) { | ||
// Determine RPC method from the request | ||
isRpcCall, rpcMethod := isRpcCall(r) | ||
if !isRpcCall { | ||
return r, nil | ||
} | ||
|
||
// Determine the query cost | ||
queryCost := queryRates[rpcMethod] | ||
if queryCost == nil || queryCost.Cmp(big.NewInt(0)) == 0 { | ||
slog.Info("Serving a free RPC request", "method", rpcMethod) | ||
return r, nil | ||
} | ||
|
||
// Extract voucher details from the header | ||
paymentHeader := r.Header.Get(PAYMENT_HEADER_KEY) | ||
if paymentHeader == "" { | ||
return r, ErrHeaderMissing | ||
} | ||
|
||
re := regexp.MustCompile(PAYMENT_HEADER_REGEX) | ||
match := re.FindStringSubmatch(paymentHeader) | ||
|
||
var vhash, vsig string | ||
if match != nil { | ||
vhash = match[1] | ||
vsig = match[2] | ||
} else { | ||
return r, ErrInvalidPaymentHeader | ||
} | ||
|
||
// Determine signer from the voucher hash and signature | ||
vhashBytes := common.Hex2Bytes(strings.TrimPrefix(vhash, "0x")) | ||
signature := crypto.SplitSignature(common.Hex2Bytes(strings.TrimPrefix(vsig, "0x"))) | ||
signer, err := crypto.RecoverEthereumMessageSigner(vhashBytes, signature) | ||
if err != nil { | ||
return r, ErrUnableToRecoverSigner | ||
} | ||
|
||
// Remove the payment header from the request | ||
r.Header.Del(PAYMENT_HEADER_KEY) | ||
|
||
err = validator.ValidateVoucher(common.HexToHash(vhash), signer, queryCost) | ||
if err != nil { | ||
return r, err | ||
} | ||
|
||
slog.Info("Serving a paid RPC request", "method", rpcMethod, "cost", queryCost, "sender", signer.Hex()) | ||
return r, nil | ||
} | ||
|
||
// Helper method to parse request and determine whether it's a RPC call | ||
// A request is a RPC call if: | ||
// - "Content-Type" header is set to "application/json" | ||
// - Request body has non-empty "jsonrpc" and "method" fields | ||
// | ||
// Also returns the parsed RPC method | ||
func isRpcCall(r *http.Request) (bool, string) { | ||
if r.Header.Get("Content-Type") != "application/json" { | ||
return false, "" | ||
} | ||
|
||
var ReqBody struct { | ||
JsonRpc string `json:"jsonrpc"` | ||
Method string `json:"method"` | ||
} | ||
bodyBytes, _ := io.ReadAll(r.Body) | ||
|
||
err := json.Unmarshal(bodyBytes, &ReqBody) | ||
if err != nil || ReqBody.JsonRpc == "" || ReqBody.Method == "" { | ||
return false, "" | ||
} | ||
|
||
// Reassign request body as io.ReadAll consumes it | ||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) | ||
|
||
return true, ReqBody.Method | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
package paymentsmanager | ||
|
||
import ( | ||
"math/big" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/hashicorp/golang-lru/v2/expirable" | ||
"github.com/statechannels/go-nitro/node" | ||
"github.com/statechannels/go-nitro/payments" | ||
"github.com/statechannels/go-nitro/types" | ||
"golang.org/x/exp/slog" | ||
) | ||
|
||
const ( | ||
DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000 | ||
DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins | ||
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000 | ||
DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins | ||
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000 | ||
DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL | ||
|
||
DEFAULT_VOUCHER_CHECK_INTERVAL = 2 | ||
DEFAULT_VOUCHER_CHECK_ATTEMPTS = 5 | ||
) | ||
|
||
type InFlightVoucher struct { | ||
voucher payments.Voucher | ||
amount *big.Int | ||
} | ||
|
||
// Struct representing the payments manager service | ||
type PaymentsManager struct { | ||
nitro *node.Node | ||
|
||
// In-memory LRU cache of vouchers received on payment channels | ||
// Map: payer -> voucher hash -> InFlightVoucher (voucher, delta amount) | ||
receivedVouchersCache *expirable.LRU[string, *expirable.LRU[string, InFlightVoucher]] | ||
|
||
// LRU map to keep track of amounts paid so far on payment channels | ||
// Map: channel id -> amount paid so far | ||
paidSoFarOnChannel *expirable.LRU[string, *big.Int] | ||
|
||
// Used to signal shutdown of the service | ||
quitChan chan bool | ||
} | ||
|
||
func NewPaymentsManager(nitro *node.Node) (PaymentsManager, error) { | ||
pm := PaymentsManager{nitro: nitro} | ||
|
||
pm.receivedVouchersCache = expirable.NewLRU[string, *expirable.LRU[string, InFlightVoucher]]( | ||
DEFAULT_LRU_CACHE_MAX_ACCOUNTS, | ||
nil, | ||
time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL, | ||
) | ||
|
||
pm.paidSoFarOnChannel = expirable.NewLRU[string, *big.Int]( | ||
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS, | ||
nil, | ||
time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL, | ||
) | ||
|
||
pm.quitChan = make(chan bool) | ||
|
||
// Load existing open payment channels with amount paid so far from the stored state | ||
err := pm.loadPaymentChannels() | ||
if err != nil { | ||
return PaymentsManager{}, err | ||
} | ||
|
||
return pm, nil | ||
} | ||
|
||
func (pm *PaymentsManager) Start(wg *sync.WaitGroup) { | ||
slog.Info("starting payments manager...") | ||
|
||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
pm.run() | ||
}() | ||
} | ||
|
||
func (pm *PaymentsManager) Stop() error { | ||
slog.Info("stopping payments manager...") | ||
close(pm.quitChan) | ||
return nil | ||
} | ||
|
||
func (pm *PaymentsManager) ValidateVoucher(voucherHash common.Hash, signerAddress common.Address, value *big.Int) (bool, bool) { | ||
// Check the payments map for required voucher | ||
var isPaymentReceived, isOfSufficientValue bool | ||
for i := 0; i < DEFAULT_VOUCHER_CHECK_ATTEMPTS; i++ { | ||
isPaymentReceived, isOfSufficientValue = pm.checkVoucherInCache(voucherHash, signerAddress, value) | ||
|
||
if isPaymentReceived { | ||
return true, isOfSufficientValue | ||
} | ||
|
||
// Retry after an interval if voucher not found | ||
slog.Info("Payment from %s not found, retrying after %d sec...", signerAddress, DEFAULT_VOUCHER_CHECK_INTERVAL) | ||
time.Sleep(DEFAULT_VOUCHER_CHECK_INTERVAL * time.Second) | ||
} | ||
|
||
return false, false | ||
} | ||
|
||
// Check for a given payment voucher in LRU cache map | ||
// Returns whether the voucher was found, whether it was of sufficient value | ||
func (pm *PaymentsManager) checkVoucherInCache(voucherHash common.Hash, signerAddress common.Address, minRequiredValue *big.Int) (bool, bool) { | ||
vouchersMap, ok := pm.receivedVouchersCache.Get(signerAddress.Hex()) | ||
if !ok { | ||
return false, false | ||
} | ||
|
||
receivedVoucher, ok := vouchersMap.Get(voucherHash.Hex()) | ||
if !ok { | ||
return false, false | ||
} | ||
|
||
if receivedVoucher.amount.Cmp(minRequiredValue) < 0 { | ||
return true, false | ||
} | ||
|
||
// Delete the voucher from map after consuming it | ||
vouchersMap.Remove(voucherHash.Hex()) | ||
return true, true | ||
} | ||
|
||
func (pm *PaymentsManager) run() { | ||
slog.Info("starting voucher subscription...") | ||
for { | ||
select { | ||
case voucher := <-pm.nitro.ReceivedVouchers(): | ||
payer, err := pm.getChannelCounterparty(voucher.ChannelId) | ||
if err != nil { | ||
// TODO: Handle | ||
panic(err) | ||
} | ||
|
||
paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId.String()) | ||
if !ok { | ||
paidSoFar = big.NewInt(0) | ||
} | ||
|
||
paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar) | ||
pm.paidSoFarOnChannel.Add(voucher.ChannelId.String(), voucher.Amount) | ||
slog.Info("Received a voucher", "payer", payer.String(), "amount", paymentAmount.String()) | ||
|
||
vouchersMap, ok := pm.receivedVouchersCache.Get(payer.Hex()) | ||
if !ok { | ||
vouchersMap = expirable.NewLRU[string, InFlightVoucher]( | ||
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT, | ||
nil, | ||
time.Second*DEFAULT_LRU_CACHE_VOUCHER_TTL, | ||
) | ||
|
||
pm.receivedVouchersCache.Add(payer.Hex(), vouchersMap) | ||
} | ||
|
||
voucherHash, err := voucher.Hash() | ||
if err != nil { | ||
// TODO: Handle | ||
panic(err) | ||
} | ||
|
||
vouchersMap.Add(voucherHash.Hex(), InFlightVoucher{voucher: voucher, amount: paymentAmount}) | ||
case <-pm.quitChan: | ||
slog.Info("stopping voucher subscription loop") | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (pm *PaymentsManager) getChannelCounterparty(channelId types.Destination) (common.Address, error) { | ||
paymentChannel, err := pm.nitro.GetPaymentChannel(channelId) | ||
if err != nil { | ||
return common.Address{}, err | ||
} | ||
|
||
return paymentChannel.Balance.Payer, nil | ||
} | ||
|
||
func (pm *PaymentsManager) loadPaymentChannels() error { | ||
// TODO: Implement | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package paymentsmanager | ||
|
||
import ( | ||
"fmt" | ||
"math/big" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
) | ||
|
||
var ( | ||
ErrPayment = "Payment error:" | ||
ErrPaymentNotReceived = fmt.Errorf("%s payment not received", ErrPayment) | ||
ErrAmountInsufficient = fmt.Errorf("%s amount insufficient", ErrPayment) | ||
) | ||
|
||
// Voucher validator interface to be satisfied by implementations | ||
// using in / out of process Nitro nodes | ||
type VoucherValidator interface { | ||
ValidateVoucher(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error | ||
} | ||
|
||
var _ VoucherValidator = &InProcessVoucherValidator{} | ||
|
||
// When go-nitro is running in-process | ||
type InProcessVoucherValidator struct { | ||
PaymentsManager | ||
} | ||
|
||
func (v InProcessVoucherValidator) ValidateVoucher(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error { | ||
isPaymentReceived, isOfSufficientValue := v.PaymentsManager.ValidateVoucher(voucherHash, signerAddress, value) | ||
|
||
if !isPaymentReceived { | ||
return ErrPaymentNotReceived | ||
} | ||
|
||
if !isOfSufficientValue { | ||
return ErrAmountInsufficient | ||
} | ||
|
||
return nil | ||
} | ||
|
||
var _ VoucherValidator = &RemoteVoucherValidator{} | ||
|
||
// When go-nitro is running remotely | ||
type RemoteVoucherValidator struct { | ||
// client rpc.RpcClientApi | ||
} | ||
|
||
func (r RemoteVoucherValidator) ValidateVoucher(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error { | ||
// TODO: Implement | ||
return nil | ||
} |