Skip to content

Commit

Permalink
[api] add cache for ReadState()
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Oct 5, 2021
1 parent 88d273d commit b59f347
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 16 deletions.
38 changes: 35 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type Server struct {
grpcServer *grpc.Server
hasActionIndex bool
electionCommittee committee.Committee
readCache *ReadCache
}

// NewServer creates a new server
Expand Down Expand Up @@ -157,6 +158,7 @@ func NewServer(
chainListener: NewChainListener(),
gs: gasstation.NewGasStation(chain, sf.SimulateExecution, dao, cfg.API),
electionCommittee: apiCfg.electionCommittee,
readCache: NewReadCache(30 * time.Minute),
}
if _, ok := cfg.Plugins[config.GatewayPlugin]; ok {
svr.hasActionIndex = true
Expand All @@ -169,6 +171,9 @@ func NewServer(
grpc_prometheus.Register(svr.grpcServer)
reflection.Register(svr.grpcServer)

if err := svr.chainListener.AddResponder(svr.readCache); err != nil {
return nil, err
}
return svr, nil
}

Expand Down Expand Up @@ -456,6 +461,14 @@ func (api *Server) ReadContract(ctx context.Context, in *iotexapi.ReadContractRe
if err := sc.LoadProto(in.Execution); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if d, ok := api.readCache.Get(sc.Contract(), string(sc.Data())); ok {
res := iotexapi.ReadContractResponse{}
if err := proto.Unmarshal(d, &res); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &res, nil
}

if in.CallerAddress == action.EmptyAddress {
in.CallerAddress = address.ZeroAddress
}
Expand Down Expand Up @@ -487,10 +500,13 @@ func (api *Server) ReadContract(ctx context.Context, in *iotexapi.ReadContractRe
}
// ReadContract() is read-only, if no error returned, we consider it a success
receipt.Status = uint64(iotextypes.ReceiptStatus_Success)
return &iotexapi.ReadContractResponse{
res := iotexapi.ReadContractResponse{
Data: hex.EncodeToString(retval),
Receipt: receipt.ConvertToReceiptPb(),
}, nil
}
d, _ := proto.Marshal(&res)
api.readCache.Put(sc.Contract(), string(sc.Data()), d)
return &res, nil
}

// ReadState reads state on blockchain
Expand Down Expand Up @@ -969,6 +985,18 @@ func (api *Server) Stop() error {
}

func (api *Server) readState(ctx context.Context, p protocol.Protocol, height string, methodName []byte, arguments ...[]byte) ([]byte, uint64, error) {
key := height + string(methodName)
for _, v := range arguments {
key += string(v)
}
if d, ok := api.readCache.Get(p.Name(), key); ok {
var h uint64
if height != "" {
h, _ = strconv.ParseUint(height, 0, 64)
}
return d, h, nil
}

// TODO: need to complete the context
tipHeight := api.bc.TipHeight()
ctx = protocol.WithBlockCtx(ctx, protocol.BlockCtx{
Expand Down Expand Up @@ -999,7 +1027,11 @@ func (api *Server) readState(ctx context.Context, p protocol.Protocol, height st
}

// TODO: need to distinguish user error and system error
return p.ReadState(ctx, api.sf, methodName, arguments...)
d, h, err := p.ReadState(ctx, api.sf, methodName, arguments...)
if err == nil {
api.readCache.Put(p.Name(), key, d)
}
return d, h, err
}

func (api *Server) getActionsFromIndex(totalActions, start, count uint64) (*iotexapi.GetActionsResponse, error) {
Expand Down
18 changes: 5 additions & 13 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/iotexproject/iotex-core/blockindex"
"github.com/iotexproject/iotex-core/config"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/gasstation"
"github.com/iotexproject/iotex-core/pkg/unit"
"github.com/iotexproject/iotex-core/pkg/version"
"github.com/iotexproject/iotex-core/state"
Expand Down Expand Up @@ -2078,6 +2077,7 @@ func TestServer_GetEpochMeta(t *testing.T) {
}).AnyTimes()
svr.bc = mbc
}
svr.readCache.Clear()
res, err := svr.GetEpochMeta(context.Background(), &iotexapi.GetEpochMetaRequest{EpochNumber: test.EpochNumber})
require.NoError(err)
require.Equal(test.epochData.Num, res.EpochData.Num)
Expand Down Expand Up @@ -2689,19 +2689,11 @@ func createServer(cfg config.Config, needActPool bool) (*Server, string, error)
}
}

svr := &Server{
bc: bc,
sf: sf,
dao: dao,
indexer: indexer,
bfIndexer: bfIndexer,
ap: ap,
cfg: cfg,
gs: gasstation.NewGasStation(bc, sf.SimulateExecution, dao, cfg.API),
registry: registry,
hasActionIndex: true,
svr, err := NewServer(cfg, bc, nil, sf, dao, indexer, bfIndexer, ap, registry)
if err != nil {
return nil, "", err
}

svr.hasActionIndex = true
return svr, bfIndexFile, nil
}

Expand Down
106 changes: 106 additions & 0 deletions api/read_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package api

import (
"sync"
"time"

"go.uber.org/zap"

"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/pkg/log"
)

type (
// Bin stores []byte
Bin struct {
lastUpdate time.Time
bin map[string][]byte
}

// ReadCache stores bins
ReadCache struct {
total, hit int
lock sync.RWMutex
timeout time.Duration
bins map[string]*Bin
}
)

// NewBin returns a new read cache
func NewBin() *Bin {
return &Bin{
lastUpdate: time.Now(),
bin: make(map[string][]byte),
}
}

func (b *Bin) expire(timeout time.Duration) bool {
return b.lastUpdate.Add(timeout).Before(time.Now())
}

func (b *Bin) get(key string) ([]byte, bool) {
s, ok := b.bin[key]
return s, ok
}

func (b *Bin) put(key string, value []byte) {
b.bin[key] = value
b.lastUpdate = time.Now()
}

// NewReadCache returns a new read cache
func NewReadCache(timeout time.Duration) *ReadCache {
return &ReadCache{
timeout: timeout,
bins: make(map[string]*Bin),
}
}

// Get reads according to key
func (rc *ReadCache) Get(ns, key string) ([]byte, bool) {
rc.lock.RLock()
defer rc.lock.RUnlock()

rc.total++
bin, ok := rc.bins[ns]
if !ok {
return nil, false
}
d, ok := bin.get(key)
if ok {
rc.hit++
log.L().Info("API cache hit", zap.Int("total", rc.total), zap.Int("hit", rc.hit))
}
return d, ok
}

// Put writes according to key
func (rc *ReadCache) Put(ns, key string, value []byte) {
rc.lock.Lock()
defer rc.lock.Unlock()

bin, ok := rc.bins[ns]
if !ok {
bin = NewBin()
rc.bins[ns] = bin
}
bin.put(key, value)
}

// Clear clears the cache
func (rc *ReadCache) Clear() {
rc.bins = nil
rc.bins = make(map[string]*Bin)
}

// Respond implements the Responder interface
func (rc *ReadCache) Respond(*block.Block) error {
// for now, we just invalidate the entire cache
rc.Clear()
return nil
}

// Exit implements the Responder interface
func (rc *ReadCache) Exit() {
rc.Clear()
}
43 changes: 43 additions & 0 deletions api/read_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package api

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestReadCache(t *testing.T) {
r := require.New(t)

c := NewReadCache(200 * time.Millisecond)
rcTests := []struct {
ns, k string
v []byte
}{
{"a", "1", []byte{1}},
{"a", "2", []byte{2}},
{"b", "1", []byte{1}},
{"b", "2", []byte{2}},
}
for _, v := range rcTests {
d, ok := c.Get(v.ns, v.k)
r.False(ok)
r.Nil(d)
c.Put(v.ns, v.k, v.v)
}

for _, v := range rcTests {
d, ok := c.Get(v.ns, v.k)
r.True(ok)
r.Equal(v.v, d)
}

c.Clear()
for _, v := range rcTests {
d, ok := c.Get(v.ns, v.k)
r.False(ok)
r.Nil(d)
c.Put(v.ns, v.k, v.v)
}
}

0 comments on commit b59f347

Please sign in to comment.