diff --git a/api/api.go b/api/api.go index 6cc80d69c3..69c887d030 100644 --- a/api/api.go +++ b/api/api.go @@ -112,6 +112,7 @@ type Server struct { grpcServer *grpc.Server hasActionIndex bool electionCommittee committee.Committee + readCache *ReadCache } // NewServer creates a new server @@ -157,6 +158,7 @@ func NewServer( chainListener: NewChainListener(), gs: gasstation.NewGasStation(chain, sf.SimulateExecution, dao, cfg.API), electionCommittee: apiCfg.electionCommittee, + readCache: NewReadCache(30 * time.Minute), } if _, ok := cfg.Plugins[config.GatewayPlugin]; ok { svr.hasActionIndex = true @@ -169,6 +171,9 @@ func NewServer( grpc_prometheus.Register(svr.grpcServer) reflection.Register(svr.grpcServer) + if err := svr.chainListener.AddResponder(svr.readCache); err != nil { + return nil, err + } return svr, nil } @@ -456,6 +461,14 @@ func (api *Server) ReadContract(ctx context.Context, in *iotexapi.ReadContractRe if err := sc.LoadProto(in.Execution); err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } + if d, ok := api.readCache.Get(sc.Contract(), string(sc.Data())); ok { + res := iotexapi.ReadContractResponse{} + if err := proto.Unmarshal(d, &res); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &res, nil + } + if in.CallerAddress == action.EmptyAddress { in.CallerAddress = address.ZeroAddress } @@ -487,10 +500,13 @@ func (api *Server) ReadContract(ctx context.Context, in *iotexapi.ReadContractRe } // ReadContract() is read-only, if no error returned, we consider it a success receipt.Status = uint64(iotextypes.ReceiptStatus_Success) - return &iotexapi.ReadContractResponse{ + res := iotexapi.ReadContractResponse{ Data: hex.EncodeToString(retval), Receipt: receipt.ConvertToReceiptPb(), - }, nil + } + d, _ := proto.Marshal(&res) + api.readCache.Put(sc.Contract(), string(sc.Data()), d) + return &res, nil } // ReadState reads state on blockchain @@ -969,6 +985,18 @@ func (api *Server) Stop() error { } func (api *Server) readState(ctx context.Context, p protocol.Protocol, height string, methodName []byte, arguments ...[]byte) ([]byte, uint64, error) { + key := height + string(methodName) + for _, v := range arguments { + key += string(v) + } + if d, ok := api.readCache.Get(p.Name(), key); ok { + var h uint64 + if height != "" { + h, _ = strconv.ParseUint(height, 0, 64) + } + return d, h, nil + } + // TODO: need to complete the context tipHeight := api.bc.TipHeight() ctx = protocol.WithBlockCtx(ctx, protocol.BlockCtx{ @@ -999,7 +1027,11 @@ func (api *Server) readState(ctx context.Context, p protocol.Protocol, height st } // TODO: need to distinguish user error and system error - return p.ReadState(ctx, api.sf, methodName, arguments...) + d, h, err := p.ReadState(ctx, api.sf, methodName, arguments...) + if err == nil { + api.readCache.Put(p.Name(), key, d) + } + return d, h, err } func (api *Server) getActionsFromIndex(totalActions, start, count uint64) (*iotexapi.GetActionsResponse, error) { diff --git a/api/api_test.go b/api/api_test.go index dd02329cb1..5eaf8f34ca 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -47,7 +47,6 @@ import ( "github.com/iotexproject/iotex-core/blockindex" "github.com/iotexproject/iotex-core/config" "github.com/iotexproject/iotex-core/db" - "github.com/iotexproject/iotex-core/gasstation" "github.com/iotexproject/iotex-core/pkg/unit" "github.com/iotexproject/iotex-core/pkg/version" "github.com/iotexproject/iotex-core/state" @@ -2078,6 +2077,7 @@ func TestServer_GetEpochMeta(t *testing.T) { }).AnyTimes() svr.bc = mbc } + svr.readCache.Clear() res, err := svr.GetEpochMeta(context.Background(), &iotexapi.GetEpochMetaRequest{EpochNumber: test.EpochNumber}) require.NoError(err) require.Equal(test.epochData.Num, res.EpochData.Num) @@ -2689,19 +2689,11 @@ func createServer(cfg config.Config, needActPool bool) (*Server, string, error) } } - svr := &Server{ - bc: bc, - sf: sf, - dao: dao, - indexer: indexer, - bfIndexer: bfIndexer, - ap: ap, - cfg: cfg, - gs: gasstation.NewGasStation(bc, sf.SimulateExecution, dao, cfg.API), - registry: registry, - hasActionIndex: true, + svr, err := NewServer(cfg, bc, nil, sf, dao, indexer, bfIndexer, ap, registry) + if err != nil { + return nil, "", err } - + svr.hasActionIndex = true return svr, bfIndexFile, nil } diff --git a/api/read_cache.go b/api/read_cache.go new file mode 100644 index 0000000000..3242180bb4 --- /dev/null +++ b/api/read_cache.go @@ -0,0 +1,106 @@ +package api + +import ( + "sync" + "time" + + "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/pkg/log" +) + +type ( + // Bin stores []byte + Bin struct { + lastUpdate time.Time + bin map[string][]byte + } + + // ReadCache stores bins + ReadCache struct { + total, hit int + lock sync.RWMutex + timeout time.Duration + bins map[string]*Bin + } +) + +// NewBin returns a new read cache +func NewBin() *Bin { + return &Bin{ + lastUpdate: time.Now(), + bin: make(map[string][]byte), + } +} + +func (b *Bin) expire(timeout time.Duration) bool { + return b.lastUpdate.Add(timeout).Before(time.Now()) +} + +func (b *Bin) get(key string) ([]byte, bool) { + s, ok := b.bin[key] + return s, ok +} + +func (b *Bin) put(key string, value []byte) { + b.bin[key] = value + b.lastUpdate = time.Now() +} + +// NewReadCache returns a new read cache +func NewReadCache(timeout time.Duration) *ReadCache { + return &ReadCache{ + timeout: timeout, + bins: make(map[string]*Bin), + } +} + +// Get reads according to key +func (rc *ReadCache) Get(ns, key string) ([]byte, bool) { + rc.lock.RLock() + defer rc.lock.RUnlock() + + rc.total++ + bin, ok := rc.bins[ns] + if !ok { + return nil, false + } + d, ok := bin.get(key) + if ok { + rc.hit++ + log.L().Info("API cache hit", zap.Int("total", rc.total), zap.Int("hit", rc.hit)) + } + return d, ok +} + +// Put writes according to key +func (rc *ReadCache) Put(ns, key string, value []byte) { + rc.lock.Lock() + defer rc.lock.Unlock() + + bin, ok := rc.bins[ns] + if !ok { + bin = NewBin() + rc.bins[ns] = bin + } + bin.put(key, value) +} + +// Clear clears the cache +func (rc *ReadCache) Clear() { + rc.bins = nil + rc.bins = make(map[string]*Bin) +} + +// Respond implements the Responder interface +func (rc *ReadCache) Respond(*block.Block) error { + // for now, we just invalidate the entire cache + rc.Clear() + return nil +} + +// Exit implements the Responder interface +func (rc *ReadCache) Exit() { + rc.Clear() +} diff --git a/api/read_cache_test.go b/api/read_cache_test.go new file mode 100644 index 0000000000..00442d53d9 --- /dev/null +++ b/api/read_cache_test.go @@ -0,0 +1,43 @@ +package api + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestReadCache(t *testing.T) { + r := require.New(t) + + c := NewReadCache(200 * time.Millisecond) + rcTests := []struct { + ns, k string + v []byte + }{ + {"a", "1", []byte{1}}, + {"a", "2", []byte{2}}, + {"b", "1", []byte{1}}, + {"b", "2", []byte{2}}, + } + for _, v := range rcTests { + d, ok := c.Get(v.ns, v.k) + r.False(ok) + r.Nil(d) + c.Put(v.ns, v.k, v.v) + } + + for _, v := range rcTests { + d, ok := c.Get(v.ns, v.k) + r.True(ok) + r.Equal(v.v, d) + } + + c.Clear() + for _, v := range rcTests { + d, ok := c.Get(v.ns, v.k) + r.False(ok) + r.Nil(d) + c.Put(v.ns, v.k, v.v) + } +}