Skip to content

Commit

Permalink
feat(blob): improve gas estimation and track min gas price (celestiao…
Browse files Browse the repository at this point in the history
…rg#2511)

## Overview

This PR incorporates the more recent gas estimation method in
celestia-app. It also makes use of a new gRPC endpoint for consensus
nodes that reveals the nodes min gas price. This enables the blob module
to submit transactions with the correct gas price. If the node changes
that price, the blob module is able to parse the error and retry using
the new gas price.

Additionally, a stubbed CoreAccessor was introduced for when a core
endpoint is not provided in order to return a better/more readable
error, whereas previously, the CoreAccessor depended on the grpc dial to
silently fail during start and any call to the state module would return
a "cannot dial address" error.

It now looks like this: 

```
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": 1,
    "message": "node is running without state access"
  }
}
```

## Checklist

<!-- 
Please complete the checklist to ensure that the PR is ready to be
reviewed.

IMPORTANT:
PRs should be left in Draft until the below checklist is completed.
-->

- [ ] New and updated code has appropriate documentation
- [ ] New and updated code has new and/or updated testing
- [ ] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or
documentation updates
- [ ] Linked issues closed with keywords

---------

Co-authored-by: rene <41963722+renaynay@users.noreply.github.com>
  • Loading branch information
cmwaters and renaynay authored Aug 22, 2023
1 parent 10351be commit bb9b4d4
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 61 deletions.
17 changes: 0 additions & 17 deletions blob/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"

"github.com/celestiaorg/celestia-node/share"
Expand Down Expand Up @@ -86,22 +85,6 @@ func BlobsToShares(blobs ...*Blob) ([]share.Share, error) {
return shares.ToBytes(rawShares), nil
}

const (
perByteGasTolerance = 2
pfbGasFixedCost = 80000
)

// estimateGas estimates the gas required to pay for a set of blobs in a PFB.
func estimateGas(blobs ...*Blob) uint64 {
totalByteCount := 0
for _, blob := range blobs {
totalByteCount += len(blob.Data) + appconsts.NamespaceSize
}
variableGasAmount := (appconsts.DefaultGasPerBlobByte + perByteGasTolerance) * totalByteCount

return uint64(variableGasAmount + pfbGasFixedCost)
}

// constructAndVerifyBlob reconstruct a Blob from the passed shares and compares commitments.
func constructAndVerifyBlob(sh []share.Share, commitment Commitment) (*Blob, bool, error) {
blob, err := SharesToBlobs(sh)
Expand Down
11 changes: 3 additions & 8 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/cosmos/cosmos-sdk/types"
logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"

"github.com/celestiaorg/celestia-node/header"
Expand All @@ -36,7 +35,7 @@ type Service struct {
blobSumitter Submitter
// shareGetter retrieves the EDS to fetch all shares from the requested header.
shareGetter share.Getter
// headerGetter fetches header by the provided height
// headerGetter fetches header by the provided height
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error)
}

Expand All @@ -55,15 +54,11 @@ func NewService(
// Submit sends PFB transaction and reports the height in which it was included.
// Allows sending multiple Blobs atomically synchronously.
// Uses default wallet registered on the Node.
// Handles gas estimation and fee calculation.
func (s *Service) Submit(ctx context.Context, blobs []*Blob) (uint64, error) {
log.Debugw("submitting blobs", "amount", len(blobs))

var (
gasLimit = estimateGas(blobs...)
fee = int64(appconsts.DefaultMinGasPrice * float64(gasLimit))
)

resp, err := s.blobSumitter.SubmitPayForBlob(ctx, types.NewInt(fee), gasLimit, blobs)
resp, err := s.blobSumitter.SubmitPayForBlob(ctx, types.OneInt().Neg(), 0, blobs)
if err != nil {
return 0, err
}
Expand Down
146 changes: 129 additions & 17 deletions state/core_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

sdkErrors "cosmossdk.io/errors"
"github.com/cosmos/cosmos-sdk/api/tendermint/abci"
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
auth "github.com/cosmos/cosmos-sdk/x/auth/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -21,6 +25,8 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/celestiaorg/celestia-app/app"
apperrors "github.com/celestiaorg/celestia-app/app/errors"
"github.com/celestiaorg/celestia-app/pkg/appconsts"
appblob "github.com/celestiaorg/celestia-app/x/blob"
apptypes "github.com/celestiaorg/celestia-app/x/blob/types"
libhead "github.com/celestiaorg/go-header"
Expand All @@ -34,6 +40,8 @@ var (
ErrInvalidAmount = errors.New("state: amount must be greater than zero")
)

const maxRetries = 5

// CoreAccessor implements service over a gRPC connection
// with a celestia-core node.
type CoreAccessor struct {
Expand All @@ -54,8 +62,15 @@ type CoreAccessor struct {
rpcPort string
grpcPort string

// these fields are mutatable and thus need to be protected by a mutex
lock sync.Mutex
lastPayForBlob int64
payForBlobCount int64
// minGasPrice is the minimum gas price that the node will accept.
// NOTE: just because the first node accepts the transaction, does not mean it
// will find a proposer that does accept the transaction. Better would be
// to set a global min gas price that correct processes conform to.
minGasPrice float64
}

// NewCoreAccessor dials the given celestia-core endpoint and
Expand Down Expand Up @@ -112,6 +127,11 @@ func (ca *CoreAccessor) Start(ctx context.Context) error {
}
ca.rpcCli = cli

ca.minGasPrice, err = ca.queryMinimumGasPrice(ctx)
if err != nil {
return fmt.Errorf("querying minimum gas price: %w", err)
}

return nil
}

Expand Down Expand Up @@ -160,6 +180,9 @@ func (ca *CoreAccessor) constructSignedTx(
return ca.signer.EncodeTx(tx)
}

// SubmitPayForBlob builds, signs, and synchronously submits a MsgPayForBlob. It blocks until the transaction
// is committed and returns the TxReponse. If gasLim is set to 0, the method will automatically estimate the
// gas limit. If the fee is negative, the method will use the nodes min gas price multiplied by the gas limit.
func (ca *CoreAccessor) SubmitPayForBlob(
ctx context.Context,
fee Int,
Expand All @@ -178,25 +201,67 @@ func (ca *CoreAccessor) SubmitPayForBlob(
appblobs[i] = &b.Blob
}

response, err := appblob.SubmitPayForBlob(
ctx,
ca.signer,
ca.coreConn,
sdktx.BroadcastMode_BROADCAST_MODE_BLOCK,
appblobs,
apptypes.SetGasLimit(gasLim),
withFee(fee),
)
// metrics should only be counted on a successful PFD tx
if err == nil && response.Code == 0 {
ca.lastPayForBlob = time.Now().UnixMilli()
ca.payForBlobCount++
}
// we only estimate gas if the user wants us to (by setting the gasLim to 0). In the future we may want
// to make these arguments optional.
if gasLim == 0 {
blobSizes := make([]uint32, len(blobs))
for i, blob := range blobs {
blobSizes[i] = uint32(len(blob.Data))
}

if response != nil && response.Code != 0 {
err = errors.Join(err, sdkErrors.ABCIError(response.Codespace, response.Code, response.Logs.String()))
// TODO (@cmwaters): the default gas per byte and the default tx size cost per byte could be changed
// through governance. This section could be more robust by tracking these values and adjusting the
// gas limit accordingly (as is done for the gas price)
gasLim = apptypes.EstimateGas(blobSizes, appconsts.DefaultGasPerBlobByte, auth.DefaultTxSizeCostPerByte)
}

minGasPrice := ca.getMinGasPrice()

// set the fee for the user as the minimum gas price multiplied by the gas limit
estimatedFee := false
if fee.IsNegative() {
estimatedFee = true
fee = sdktypes.NewInt(int64(math.Ceil(minGasPrice * float64(gasLim))))
}

var lastErr error
for attempt := 0; attempt < maxRetries; attempt++ {
response, err := appblob.SubmitPayForBlob(
ctx,
ca.signer,
ca.coreConn,
sdktx.BroadcastMode_BROADCAST_MODE_BLOCK,
appblobs,
apptypes.SetGasLimit(gasLim),
withFee(fee),
)

// the node is capable of changing the min gas price at any time so we must be able to detect it and
// update our version accordingly
if apperrors.IsInsufficientMinGasPrice(err) && estimatedFee {
// The error message contains enough information to parse the new min gas price
minGasPrice, err = apperrors.ParseInsufficientMinGasPrice(err, minGasPrice, gasLim)
if err != nil {
return nil, fmt.Errorf("parsing insufficient min gas price error: %w", err)
}
ca.setMinGasPrice(minGasPrice)
lastErr = err
// update the fee to retry again
fee = sdktypes.NewInt(int64(math.Ceil(minGasPrice * float64(gasLim))))
continue
}

// metrics should only be counted on a successful PFD tx
if err == nil && response.Code == 0 {
ca.markSuccessfulPFB()
}

if response != nil && response.Code != 0 {
err = errors.Join(err, sdkErrors.ABCIError(response.Codespace, response.Code, response.Logs.String()))
}
return response, err
}
return response, err
return nil, fmt.Errorf("failed to submit blobs after %d attempts: %w", maxRetries, lastErr)
}

func (ca *CoreAccessor) AccountAddress(context.Context) (Address, error) {
Expand Down Expand Up @@ -460,6 +525,53 @@ func (ca *CoreAccessor) QueryRedelegations(
})
}

func (ca *CoreAccessor) LastPayForBlob() int64 {
ca.lock.Lock()
defer ca.lock.Unlock()
return ca.lastPayForBlob
}

func (ca *CoreAccessor) PayForBlobCount() int64 {
ca.lock.Lock()
defer ca.lock.Unlock()
return ca.payForBlobCount
}

func (ca *CoreAccessor) markSuccessfulPFB() {
ca.lock.Lock()
defer ca.lock.Unlock()
ca.lastPayForBlob = time.Now().UnixMilli()
ca.payForBlobCount++
}

func (ca *CoreAccessor) setMinGasPrice(minGasPrice float64) {
ca.lock.Lock()
defer ca.lock.Unlock()
ca.minGasPrice = minGasPrice
}

func (ca *CoreAccessor) getMinGasPrice() float64 {
ca.lock.Lock()
defer ca.lock.Unlock()
return ca.minGasPrice
}

// QueryMinimumGasPrice returns the minimum gas price required by the node.
func (ca *CoreAccessor) queryMinimumGasPrice(
ctx context.Context,
) (float64, error) {
rsp, err := nodeservice.NewServiceClient(ca.coreConn).Config(ctx, &nodeservice.ConfigRequest{})
if err != nil {
return 0, err
}

coins, err := sdktypes.ParseDecCoins(rsp.MinimumGasPrice)
if err != nil {
return 0, err
}
return coins.AmountOf(app.BondDenom).MustFloat64(), nil
}

func (ca *CoreAccessor) IsStopped(context.Context) bool {
return ca.ctx.Err() != nil
}
Expand Down
97 changes: 80 additions & 17 deletions state/core_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,95 @@ package state

import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

"cosmossdk.io/math"
sdktypes "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/test/util/testnode"
blobtypes "github.com/celestiaorg/celestia-app/x/blob/types"

"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/share"
)

func TestLifecycle(t *testing.T) {
ca := NewCoreAccessor(nil, nil, "", "", "")
func TestSubmitPayForBlob(t *testing.T) {
accounts := []string{"jimy", "rob"}
tmCfg := testnode.DefaultTendermintConfig()
tmCfg.Consensus.TimeoutCommit = time.Millisecond * 1
appConf := testnode.DefaultAppConfig()
appConf.API.Enable = true
appConf.MinGasPrices = fmt.Sprintf("0.1%s", app.BondDenom)

config := testnode.DefaultConfig().WithTendermintConfig(tmCfg).WithAppConfig(appConf).WithAccounts(accounts)
cctx, rpcAddr, grpcAddr := testnode.NewNetwork(t, config)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

signer := blobtypes.NewKeyringSigner(cctx.Keyring, accounts[0], cctx.ChainID)
ca := NewCoreAccessor(signer, nil, "127.0.0.1", extractPort(rpcAddr), extractPort(grpcAddr))
// start the accessor
err := ca.Start(ctx)
require.NoError(t, err)
// ensure accessor isn't stopped
require.False(t, ca.IsStopped(ctx))
// cancel the top level context (this should not affect the lifecycle of the
// accessor as it should manage its own internal context)
cancel()
// ensure accessor was unaffected by top-level context cancellation
require.False(t, ca.IsStopped(ctx))
// stop the accessor
stopCtx, stopCancel := context.WithCancel(context.Background())
t.Cleanup(stopCancel)
err = ca.Stop(stopCtx)
t.Cleanup(func() {
_ = ca.Stop(ctx)
})

ns, err := share.NewBlobNamespaceV0([]byte("namespace"))
require.NoError(t, err)
blobbyTheBlob, err := blob.NewBlobV0(ns, []byte("data"))
require.NoError(t, err)
// ensure accessor is stopped
require.True(t, ca.IsStopped(ctx))
// ensure that stopping the accessor again does not return an error
err = ca.Stop(stopCtx)

minGas, err := ca.queryMinimumGasPrice(ctx)
require.NoError(t, err)
require.Equal(t, appconsts.DefaultMinGasPrice, minGas)

testcases := []struct {
name string
blobs []*blob.Blob
fee math.Int
gasLim uint64
expErr error
}{
{
name: "empty blobs",
blobs: []*blob.Blob{},
fee: sdktypes.ZeroInt(),
gasLim: 0,
expErr: errors.New("state: no blobs provided"),
},
{
name: "good blob with user provided gas and fees",
blobs: []*blob.Blob{blobbyTheBlob},
fee: sdktypes.NewInt(10_000), // roughly 0.12 utia per gas (should be good)
gasLim: blobtypes.DefaultEstimateGas([]uint32{uint32(len(blobbyTheBlob.Data))}),
expErr: nil,
},
// TODO: add more test cases. The problem right now is that the celestia-app doesn't
// correctly construct the node (doesn't pass the min gas price) hence the price on
// everything is zero and we can't actually test the correct behavior
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
resp, err := ca.SubmitPayForBlob(ctx, tc.fee, tc.gasLim, tc.blobs)
require.Equal(t, tc.expErr, err)
if err == nil {
require.EqualValues(t, 0, resp.Code)
}
})
}

}

func extractPort(addr string) string {
splitStr := strings.Split(addr, ":")
return splitStr[len(splitStr)-1]
}
Loading

0 comments on commit bb9b4d4

Please sign in to comment.