diff --git a/blockchain/blockdao/blob_store_test.go b/blockchain/blockdao/blob_store_test.go index b1731b111d..594dc13931 100644 --- a/blockchain/blockdao/blob_store_test.go +++ b/blockchain/blockdao/blob_store_test.go @@ -122,7 +122,7 @@ func TestBlobStore(t *testing.T) { for i := 0; i < cfg.BlockStoreBatchSize+7; i++ { blk, err := dao.GetBlockByHeight(1 + uint64(i)) r.NoError(err) - if i < cfg.BlockStoreBatchSize { + if i < 7 { // blocks written to disk has sidecar removed r.False(blk.HasBlob()) r.Equal(4, len(blk.Actions)) diff --git a/blockchain/filedao/filedao_v2_test.go b/blockchain/filedao/filedao_v2_test.go index 9b15b58717..2fde8658a7 100644 --- a/blockchain/filedao/filedao_v2_test.go +++ b/blockchain/filedao/filedao_v2_test.go @@ -372,7 +372,7 @@ func TestBlockWithSidecar(t *testing.T) { for i := 0; i < cfg.BlockStoreBatchSize+2; i++ { blk, err := fd.GetBlockByHeight(start + uint64(i)) r.NoError(err) - if i < cfg.BlockStoreBatchSize { + if i < 2 { // blocks written to disk has sidecar removed r.False(blk.HasBlob()) r.Equal(4, len(blk.Actions)) diff --git a/blockchain/filedao/filedao_v2_util.go b/blockchain/filedao/filedao_v2_util.go index 0c2d990a81..5785de452b 100644 --- a/blockchain/filedao/filedao_v2_util.go +++ b/blockchain/filedao/filedao_v2_util.go @@ -19,7 +19,7 @@ import ( ) func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) { - buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.deser) + buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.header.Start) blockStoreTip := fd.highestBlockOfStoreTip() for i := uint64(0); i < fd.header.BlockStoreSize; i++ { v, err := fd.kvStore.Get(_headerDataNs, byteutil.Uint64ToBytesBigEndian(i)) @@ -42,7 +42,7 @@ func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) { // populate to staging buffer, if the block is in latest round height := info.Block.Height() if height > blockStoreTip { - if _, err = buffer.Put(stagingKey(height, fd.header), info); err != nil { + if _, err = buffer.Put(height, info); err != nil { return nil, err } } else { @@ -87,12 +87,12 @@ func (fd *fileDAOv2) putBlock(blk *block.Block) error { } // add to staging buffer - index := stagingKey(blk.Height(), fd.header) - full, err := fd.blkBuffer.Put(index, blkInfo) + full, err := fd.blkBuffer.Put(blk.Height(), blkInfo) if err != nil { return err } if !full { + index := fd.blkBuffer.slot(blk.Height()) fd.batch.Put(_headerDataNs, byteutil.Uint64ToBytesBigEndian(index), blkBytes, "failed to put block") return nil } @@ -151,11 +151,6 @@ func blockStoreKey(height uint64, header *FileHeader) uint64 { return (height - header.Start) / header.BlockStoreSize } -// stagingKey is the position of block in the staging buffer -func stagingKey(height uint64, header *FileHeader) uint64 { - return (height - header.Start) % header.BlockStoreSize -} - // lowestBlockOfStoreTip is the lowest height of the tip of block storage // used in DeleteTipBlock(), once new tip height drops below this, the tip of block storage can be deleted func (fd *fileDAOv2) lowestBlockOfStoreTip() uint64 { @@ -178,12 +173,7 @@ func (fd *fileDAOv2) getBlock(height uint64) (*block.Block, error) { return nil, db.ErrNotExist } // check whether block in staging buffer or not - storeKey := blockStoreKey(height, fd.header) - if storeKey >= fd.blkStore.Size() { - blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header)) - if err != nil { - return nil, err - } + if blkStore := fd.getFromStagingBuffer(height); blkStore != nil { return blkStore.Block, nil } // read from storage DB @@ -199,12 +189,7 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) { return nil, db.ErrNotExist } // check whether block in staging buffer or not - storeKey := blockStoreKey(height, fd.header) - if storeKey >= fd.blkStore.Size() { - blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header)) - if err != nil { - return nil, err - } + if blkStore := fd.getFromStagingBuffer(height); blkStore != nil { return blkStore.Receipts, nil } // read from storage DB @@ -215,12 +200,23 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) { return fd.deser.ReceiptsFromBlockStoreProto(blockStore) } +func (fd *fileDAOv2) getFromStagingBuffer(height uint64) *block.Store { + if fd.loadTip().Height-height >= fd.header.BlockStoreSize { + return nil + } + blkStore := fd.blkBuffer.Get(height) + if blkStore == nil || blkStore.Block.Height() != height { + return nil + } + return blkStore +} + func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error) { // check whether blockStore in read cache or not storeKey := blockStoreKey(height, fd.header) if value, ok := fd.blkStorePbCache.Get(storeKey); ok { pbInfos := value.(*iotextypes.BlockStores) - return pbInfos.BlockStores[stagingKey(height, fd.header)], nil + return pbInfos.BlockStores[fd.blkBuffer.slot(height)], nil } // read from storage DB value, err := fd.blkStore.Get(storeKey) @@ -240,5 +236,5 @@ func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error } // add to read cache fd.blkStorePbCache.Add(storeKey, pbStores) - return pbStores.BlockStores[stagingKey(height, fd.header)], nil + return pbStores.BlockStores[fd.blkBuffer.slot(height)], nil } diff --git a/blockchain/filedao/staging_buffer.go b/blockchain/filedao/staging_buffer.go index a478480ddd..e8e56a8a7c 100644 --- a/blockchain/filedao/staging_buffer.go +++ b/blockchain/filedao/staging_buffer.go @@ -6,6 +6,8 @@ package filedao import ( + "sync" + "google.golang.org/protobuf/proto" "github.com/iotexproject/iotex-proto/golang/iotextypes" @@ -15,41 +17,53 @@ import ( type ( stagingBuffer struct { + lock sync.RWMutex size uint64 + start uint64 buffer []*block.Store - deser *block.Deserializer } ) -func newStagingBuffer(size uint64, deser *block.Deserializer) *stagingBuffer { +func newStagingBuffer(size, start uint64) *stagingBuffer { return &stagingBuffer{ size: size, + start: start, buffer: make([]*block.Store, size), - deser: deser, } } -func (s *stagingBuffer) Get(pos uint64) (*block.Store, error) { - if pos >= s.size { - return nil, ErrNotSupported +func (s *stagingBuffer) Get(height uint64) *block.Store { + if height < s.start { + return nil } - return s.buffer[pos], nil + s.lock.RLock() + defer s.lock.RUnlock() + return s.buffer[s.slot(height)] } -func (s *stagingBuffer) Put(pos uint64, blk *block.Store) (bool, error) { - if pos >= s.size { +func (s *stagingBuffer) Put(height uint64, blk *block.Store) (bool, error) { + if height < s.start { return false, ErrNotSupported } + pos := s.slot(height) + s.lock.Lock() + defer s.lock.Unlock() s.buffer[pos] = blk return pos == s.size-1, nil } +func (s *stagingBuffer) slot(height uint64) uint64 { + return (height - s.start) % s.size +} + func (s *stagingBuffer) Serialize() ([]byte, error) { blkStores := []*iotextypes.BlockStore{} // blob sidecar data are stored separately + s.lock.RLock() for _, v := range s.buffer { blkStores = append(blkStores, v.ToProtoWithoutSidecar()) } + s.lock.RUnlock() allBlks := &iotextypes.BlockStores{ BlockStores: blkStores, }