Skip to content

Commit

Permalink
feat(BEDS-536): add cache to raw store (#966)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tangui-Bitfly committed Oct 17, 2024
1 parent 425c073 commit 1fdf888
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 86 deletions.
68 changes: 68 additions & 0 deletions backend/pkg/commons/db2/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package db2

import (
"encoding/json"
"fmt"
"sync"
"time"
)

var ttl = 200 * time.Millisecond

type MinimalBlock struct {
Result struct {
Hash string `json:"hash"`
} `json:"result"`
}

type CachedRawStore struct {
db RawStoreReader
// sync.Map with manual delete have better perf than freecache because we can handle this way a ttl < 1s
cache sync.Map
}

func WithCache(reader RawStoreReader) *CachedRawStore {
return &CachedRawStore{
db: reader,
}
}

func (c *CachedRawStore) ReadBlockByNumber(chainID uint64, number int64) (*FullBlockRawData, error) {
key := blockKey(chainID, number)
v, ok := c.cache.Load(key)
if ok {
return v.(*FullBlockRawData), nil
}

block, err := c.db.ReadBlockByNumber(chainID, number)
if block != nil {
c.cache.Store(key, block)

// retrieve the block hash for caching purpose
var mini MinimalBlock
if err := json.Unmarshal(block.Block, &mini); err != nil {
return nil, fmt.Errorf("cannot unmarshal block: %w", err)
}
c.cache.Store(mini.Result.Hash, number)
go func() {
time.Sleep(ttl)
c.cache.Delete(key)
c.cache.Delete(mini.Result.Hash)
}()
}
return block, err
}

func (c *CachedRawStore) ReadBlockByHash(chainID uint64, hash string) (*FullBlockRawData, error) {
v, ok := c.cache.Load(hash)
if !ok {
return c.db.ReadBlockByHash(chainID, hash)
}

v, ok = c.cache.Load(blockKey(chainID, v.(int64)))
if !ok {
return c.db.ReadBlockByHash(chainID, hash)
}

return v.(*FullBlockRawData), nil
}
71 changes: 22 additions & 49 deletions backend/pkg/commons/db2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,18 @@ import (
"io"
"math/big"
"net/http"
"sync"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/gobitfly/beaconchain/pkg/commons/db2/store"
)

var ttl = 2 * time.Second

var ErrNotFoundInCache = fmt.Errorf("cannot find hash in cache")
var ErrMethodNotSupported = fmt.Errorf("methode not supported")

type EthClient interface {
ethereum.ChainReader
ethereum.ContractCaller
bind.ContractBackend
ethereum.ChainStateReader
type RawStoreReader interface {
ReadBlockByNumber(chainID uint64, number int64) (*FullBlockRawData, error)
ReadBlockByHash(chainID uint64, hash string) (*FullBlockRawData, error)
}

type WithFallback struct {
Expand Down Expand Up @@ -62,22 +54,14 @@ func (r WithFallback) RoundTrip(request *http.Request) (*http.Response, error) {
}

type BigTableEthRaw struct {
db RawStore

db RawStoreReader
chainID uint64

// cache to store link between block hash and number
// ethclient.Client.BlockByNumber retrieves the uncles by hash
// so we need a way to access it simply
// we also could use postgres db
hashToNumber sync.Map
}

func NewBigTableEthRaw(db RawStore, chainID uint64) *BigTableEthRaw {
func NewBigTableEthRaw(db RawStoreReader, chainID uint64) *BigTableEthRaw {
return &BigTableEthRaw{
db: db,
chainID: chainID,
hashToNumber: sync.Map{},
db: db,
chainID: chainID,
}
}

Expand Down Expand Up @@ -158,16 +142,11 @@ func (r *BigTableEthRaw) handle(ctx context.Context, message *jsonrpcMessage) (*
}

case "eth_getUncleByBlockHashAndIndex":
number, exist := r.hashToNumber.Load(args[0].(string))
if !exist {
return nil, ErrNotFoundInCache
}

index, err := hexutil.DecodeBig(args[1].(string))
if err != nil {
return nil, err
}
respBody, err = r.UncleByBlockNumberAndIndex(ctx, number.(*big.Int), index.Int64())
respBody, err = r.UncleByBlockHashAndIndex(ctx, args[0].(string), index.Int64())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -198,48 +177,42 @@ func makeBody(isSingle bool, messages []*jsonrpcMessage) (io.ReadCloser, error)
return io.NopCloser(bytes.NewReader(b)), nil
}

type MinimalBlock struct {
Result struct {
Hash string `json:"hash"`
} `json:"result"`
}

func (r *BigTableEthRaw) BlockByNumber(ctx context.Context, number *big.Int) ([]byte, error) {
block, err := r.db.ReadBlock(r.chainID, number.Int64())
block, err := r.db.ReadBlockByNumber(r.chainID, number.Int64())
if err != nil {
return nil, err
}
// retrieve the block hash for caching purpose
var mini MinimalBlock
if err := json.Unmarshal(block.Block, &mini); err != nil {
return nil, err
}
r.hashToNumber.Store(mini.Result.Hash, number)
go func(hash string) {
time.Sleep(ttl)
r.hashToNumber.Delete(hash)
}(mini.Result.Hash)
return block.Block, nil
}

func (r *BigTableEthRaw) BlockReceipts(ctx context.Context, number *big.Int) ([]byte, error) {
block, err := r.db.ReadBlock(r.chainID, number.Int64())
block, err := r.db.ReadBlockByNumber(r.chainID, number.Int64())
if err != nil {
return nil, err
}
return block.Receipts, nil
}

func (r *BigTableEthRaw) TraceBlockByNumber(ctx context.Context, number *big.Int) ([]byte, error) {
block, err := r.db.ReadBlock(r.chainID, number.Int64())
block, err := r.db.ReadBlockByNumber(r.chainID, number.Int64())
if err != nil {
return nil, err
}
return block.Traces, nil
}

func (r *BigTableEthRaw) UncleByBlockNumberAndIndex(ctx context.Context, number *big.Int, index int64) ([]byte, error) {
block, err := r.db.ReadBlock(r.chainID, number.Int64())
block, err := r.db.ReadBlockByNumber(r.chainID, number.Int64())
if err != nil {
return nil, err
}
var uncles []*jsonrpcMessage
_ = json.Unmarshal(block.Uncles, &uncles)
return json.Marshal(uncles[index])
}

func (r *BigTableEthRaw) UncleByBlockHashAndIndex(ctx context.Context, hash string, index int64) ([]byte, error) {
block, err := r.db.ReadBlockByHash(r.chainID, hash)
if err != nil {
return nil, err
}
Expand Down
48 changes: 20 additions & 28 deletions backend/pkg/commons/db2/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ package db2

import (
"context"
"math/big"
"net/http"
"os"
"testing"

"github.com/ethereum/go-ethereum/common"

Check failure on line 5 in backend/pkg/commons/db2/client_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"math/big"
"net/http"
"os"
"testing"

"github.com/gobitfly/beaconchain/pkg/commons/db2/store"
"github.com/gobitfly/beaconchain/pkg/commons/db2/storetest"
)

const (
blockTestNumber int64 = 6008149
chainID uint64 = 1
chainID uint64 = 1
)

func TestBigTableClientRealCondition(t *testing.T) {
Expand Down Expand Up @@ -87,31 +85,21 @@ func TestBigTableClientRealCondition(t *testing.T) {

func benchmarkBlockRetrieval(b *testing.B, ethClient *ethclient.Client, rpcClient *rpc.Client) {
b.ResetTimer()

for j := 0; j < b.N; j++ {
block, err := ethClient.BlockByNumber(context.Background(), big.NewInt(blockTestNumber))
blockTestNumber := int64(20978000 + b.N)
_, err := ethClient.BlockByNumber(context.Background(), big.NewInt(blockTestNumber))
if err != nil {
b.Fatalf("BlockByNumber() error = %v", err)
}
if got, want := block.Number().Int64(), blockTestNumber; got != want {
b.Errorf("got %v, want %v", got, want)
}

receipts, err := ethClient.BlockReceipts(context.Background(), rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(blockTestNumber)))
if err != nil {
if _, err := ethClient.BlockReceipts(context.Background(), rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(blockTestNumber))); err != nil {
b.Fatalf("BlockReceipts() error = %v", err)
}
if len(block.Transactions()) != 0 && len(receipts) == 0 {
b.Errorf("receipts should not be empty")
}

var traces []GethTraceCallResultWrapper
if err := rpcClient.Call(&traces, "debug_traceBlockByNumber", hexutil.EncodeBig(block.Number()), gethTracerArg); err != nil {
if err := rpcClient.Call(&traces, "debug_traceBlockByNumber", hexutil.EncodeBig(big.NewInt(blockTestNumber)), gethTracerArg); err != nil {
b.Fatalf("debug_traceBlockByNumber() error = %v", err)
}
if len(block.Transactions()) != 0 && len(traces) == 0 {
b.Errorf("traces should not be empty")
}
}
}

Expand All @@ -126,9 +114,7 @@ func BenchmarkErigonNode(b *testing.B) {
b.Fatal(err)
}

ethClient := ethclient.NewClient(rpcClient)

benchmarkBlockRetrieval(b, ethClient, rpcClient)
benchmarkBlockRetrieval(b, ethclient.NewClient(rpcClient), rpcClient)
}

func BenchmarkRawBigTable(b *testing.B) {
Expand All @@ -143,18 +129,24 @@ func BenchmarkRawBigTable(b *testing.B) {
b.Fatal(err)
}

rawStore := NewRawStore(store.Wrap(bt, BlocRawTable, ""))
rawStore := WithCache(NewRawStore(store.Wrap(bt, BlocRawTable, "")))
rpcClient, err := rpc.DialOptions(context.Background(), "http://foo.bar", rpc.WithHTTPClient(&http.Client{
Transport: NewBigTableEthRaw(rawStore, chainID),
}))
if err != nil {
b.Fatal(err)
}

ethClient := ethclient.NewClient(rpcClient)

benchmarkBlockRetrieval(b, ethClient, rpcClient)
benchmarkBlockRetrieval(b, ethclient.NewClient(rpcClient), rpcClient)
}

func BenchmarkAll(b *testing.B) {
b.Run("BenchmarkErigonNode", func(b *testing.B) {
BenchmarkErigonNode(b)
})
b.Run("BenchmarkRawBigTable", func(b *testing.B) {
BenchmarkRawBigTable(b)
})
}

func TestBigTableClient(t *testing.T) {
Expand Down
27 changes: 19 additions & 8 deletions backend/pkg/commons/db2/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package db2

import (
"fmt"

Check failure on line 4 in backend/pkg/commons/db2/raw.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)

"github.com/gobitfly/beaconchain/pkg/commons/db2/store"
"github.com/gobitfly/beaconchain/pkg/commons/hexutil"
"github.com/gobitfly/beaconchain/pkg/commons/log"
Expand Down Expand Up @@ -81,7 +80,16 @@ func (db RawStore) AddBlocks(blocks []FullBlockRawData) error {
return db.store.BulkAdd(itemsByKey)
}

func (db RawStore) ReadBlock(chainID uint64, number int64) (*FullBlockRawData, error) {
func (db RawStore) ReadBlockByNumber(chainID uint64, number int64) (*FullBlockRawData, error) {
return db.readBlock(chainID, number)
}

func (db RawStore) ReadBlockByHash(chainID uint64, hash string) (*FullBlockRawData, error) {
// todo use sql db to retrieve hash
return nil, fmt.Errorf("ReadBlockByHash not implemented")
}

func (db RawStore) readBlock(chainID uint64, number int64) (*FullBlockRawData, error) {
key := blockKey(chainID, number)
data, err := db.store.GetRow(key)
if err != nil {
Expand All @@ -104,12 +112,15 @@ func (db RawStore) ReadBlock(chainID uint64, number int64) (*FullBlockRawData, e
return nil, fmt.Errorf("cannot decompress block %d: %w", number, err)
}
return &FullBlockRawData{
ChainID: chainID,
BlockNumber: number,
Block: block,
Receipts: receipts,
Traces: traces,
Uncles: uncles,
ChainID: chainID,
BlockNumber: number,
BlockHash: nil,
BlockUnclesCount: 0,
BlockTxs: nil,
Block: block,
Receipts: receipts,
Traces: traces,
Uncles: uncles,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/commons/db2/raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestRaw(t *testing.T) {
t.Fatal(err)
}

res, err := db.ReadBlock(block.ChainID, block.BlockNumber)
res, err := db.ReadBlockByNumber(block.ChainID, block.BlockNumber)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 1fdf888

Please sign in to comment.