feat(service/header): implement write batching in the Store
Wondertan committed Feb 16, 2022
1 parent 02a7b4c commit 5c63c52
Showing 3 changed files with 73 additions and 31 deletions.
58 changes: 39 additions & 19 deletions service/header/store.go
@@ -17,6 +17,9 @@ var (
DefaultStoreCacheSize = 10240
// DefaultIndexCacheSize defines the amount of max entries allowed in the Height to Hash index cache.
DefaultIndexCacheSize = 2048
// DefaultWriteBatchSize defines the size of the batched header write.
// Headers are written in batches so as not to thrash the underlying Datastore with writes.
DefaultWriteBatchSize = 2048
)

// errStoppedStore is returned on operations over stopped store
@@ -45,6 +48,8 @@ type store struct {
writes chan []*ExtendedHeader
// signals writes being done
writesDn chan struct{}
// pending keeps headers pending to be written in a batch
pending []*ExtendedHeader
}

// NewStore constructs a Store over datastore.
@@ -81,14 +86,15 @@ func newStore(ds datastore.Batching) (*store, error) {
cache: cache,
heightIndex: index,
heightSub: newHeightSub(),
writes: make(chan []*ExtendedHeader, 8),
writes: make(chan []*ExtendedHeader, 16),
writesDn: make(chan struct{}),
pending: make([]*ExtendedHeader, 0, DefaultWriteBatchSize),
}, nil
}

func (s *store) Init(_ context.Context, initial *ExtendedHeader) error {
// trust the given header as the initial head
err := s.write(initial)
err := s.flush(initial)
if err != nil {
return err
}
@@ -108,7 +114,7 @@ func (s *store) Start(ctx context.Context) error {
s.heightSub.SetHeight(uint64(head.Height))
}

go s.writeLoop()
go s.flushLoop()
return nil
}

@@ -261,35 +267,49 @@ func (s *store) Append(ctx context.Context, headers ...*ExtendedHeader) error {
}
}

// writeLoop performs writing task to the underlying datastore in a separate routine
// flushLoop performs writing task to the underlying datastore in a separate routine
// This way writes are controlled and manageable from one place allowing
// (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
// (2) Batching header writes
// TODO(@Wondertan): Batch multiple writes together, but first the proper batch size should be investigated
func (s *store) writeLoop() {
func (s *store) flushLoop() {
defer close(s.writesDn)
for headers := range s.writes {
if headers == nil {
return
if headers != nil {
s.pending = append(s.pending, headers...)
}
// ensure we flush only if pending has grown enough, or we are stopping (headers == nil)
if len(s.pending) < DefaultWriteBatchSize && headers != nil {
continue
}

err := s.write(headers...)
err := s.flush(s.pending...)
if err != nil {
// TODO(@Wondertan): Should this be a fatal error case with os.Exit?
from, to := uint64(headers[0].Height), uint64(headers[len(headers)-1].Height)
log.Errorw("writing header batch", "from", from, "to", to)
}
// reset pending
s.pending = s.pending[:0]

if headers == nil {
// a signal to stop
return
}
}
}

// write stores the given headers on disk
func (s *store) write(headers ...*ExtendedHeader) error {
// collect all the headers in a batch to be written
// flush writes the given headers on disk
func (s *store) flush(headers ...*ExtendedHeader) (err error) {
if len(headers) == 0 {
return nil
}

batch, err := s.ds.Batch()
if err != nil {
return err
}

// collect all the headers in the batch to be written
for _, h := range headers {
b, err := h.MarshalBinary()
if err != nil {
@@ -302,25 +322,25 @@ func (s *store) write(headers ...*ExtendedHeader) error {
}
}

// write the batch on disk
err = batch.Commit()
// marshal and add to batch reference to the new head
b, err := headers[len(headers)-1].Hash().MarshalJSON()
if err != nil {
return err
}

// write height indexes for headers as well
err = s.heightIndex.Index(headers...)
err = batch.Put(headKey, b)
if err != nil {
return err
}

// marshal and store reference to the new head
b, err := headers[len(headers)-1].Hash().MarshalJSON()
// write height indexes for headers as well
err = s.heightIndex.IndexTo(batch, headers...)
if err != nil {
return err
}

return s.ds.Put(headKey, b)
// finally, commit the batch on disk
return batch.Commit()
}

// readHead loads the head from the disk.
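The store.go change above replaces per-Append disk writes with a pending buffer that is flushed in batches. Below is a minimal, self-contained sketch of that accumulate-and-flush pattern, using hypothetical names (header, persist, writeBatchSize); the real store buffers *ExtendedHeader values and flushes them to a datastore.Batch as shown in the diff.

package main

import "fmt"

const writeBatchSize = 4 // the real store uses DefaultWriteBatchSize = 2048

type header struct{ height int }

// flushLoop drains the writes channel, accumulating headers in pending and
// persisting them only once the buffer has grown to writeBatchSize, or when
// a nil slice (the stop signal) arrives.
func flushLoop(writes <-chan []*header, done chan<- struct{}, persist func([]*header) error) {
	defer close(done)
	pending := make([]*header, 0, writeBatchSize)
	for headers := range writes {
		if headers != nil {
			pending = append(pending, headers...)
		}
		// flush only if pending has grown enough, or we are stopping (headers == nil)
		if len(pending) < writeBatchSize && headers != nil {
			continue
		}
		if err := persist(pending); err != nil {
			fmt.Println("persisting header batch:", err)
		}
		pending = pending[:0]
		if headers == nil {
			return // stop signal
		}
	}
}

func main() {
	writes := make(chan []*header, 16)
	done := make(chan struct{})
	go flushLoop(writes, done, func(hs []*header) error {
		fmt.Println("flushing", len(hs), "headers")
		return nil
	})

	for i := 1; i <= 10; i++ {
		writes <- []*header{{height: i}}
	}
	writes <- nil // stop: whatever is still pending gets flushed
	<-done
}

With a batch size of 4 and ten single-header writes, the sketch flushes twice on the threshold and once more on stop, which mirrors how Stop can drain the store's pending headers before shutdown.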
11 changes: 3 additions & 8 deletions service/header/store_height_indexer.go
@@ -35,19 +35,14 @@ func (hi *heightIndexer) HashByHeight(h uint64) (tmbytes.HexBytes, error) {
return hi.ds.Get(heightKey(h))
}

// Index saves mapping between header Height and Hash.
func (hi *heightIndexer) Index(headers ...*ExtendedHeader) error {
batch, err := hi.ds.Batch()
if err != nil {
return err
}

// IndexTo saves mapping between header Height and Hash to the given batch.
func (hi *heightIndexer) IndexTo(batch datastore.Batch, headers ...*ExtendedHeader) error {
for _, h := range headers {
err := batch.Put(heightKey(uint64(h.Height)), h.Hash())
if err != nil {
return err
}
}

return batch.Commit()
return nil
}
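The IndexTo change above means the indexer no longer commits its own batch: flush stages header bytes, the head key, and the height-to-hash index entries on one batch and commits them together. A small sketch of staging several puts on a single go-datastore batch follows; the keys and values are hypothetical, and the Batch/Put/Commit signatures follow the go-datastore version used in this diff (newer releases also take a context.Context).

package main

import (
	"fmt"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/sync"
)

func main() {
	// an in-memory Batching datastore, the same wrapper the updated test uses
	ds := sync.MutexWrap(datastore.NewMapDatastore())

	batch, err := ds.Batch()
	if err != nil {
		panic(err)
	}

	// stage header bytes, the head pointer and a height->hash index entry together
	// (error handling elided in this sketch)
	_ = batch.Put(datastore.NewKey("/headers/hash1"), []byte("header-bytes"))
	_ = batch.Put(datastore.NewKey("/head"), []byte("hash1"))
	_ = batch.Put(datastore.NewKey("/index/1"), []byte("hash1"))

	// none of the keys are visible until Commit
	if _, err := ds.Get(datastore.NewKey("/head")); err == datastore.ErrNotFound {
		fmt.Println("head not written yet")
	}

	// the staged writes are applied together at Commit
	if err := batch.Commit(); err != nil {
		panic(err)
	}

	v, _ := ds.Get(datastore.NewKey("/head"))
	fmt.Println("head after commit:", string(v))
}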
35 changes: 31 additions & 4 deletions service/header/store_test.go
@@ -5,21 +5,26 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

tmrand "github.com/tendermint/tendermint/libs/rand"
)

func TestStore(t *testing.T) {
// Alter Cache sizes to read some values from datastore instead of only cache.
// DefaultStoreCacheSize, DefaultStoreCacheSize = 5, 5

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
t.Cleanup(cancel)

suite := NewTestSuite(t, 3)
store := NewTestStore(ctx, t, suite.Head())

ds := sync.MutexWrap(datastore.NewMapDatastore())
store, err := NewStoreWithHead(ds, suite.Head())
require.NoError(t, err)

err = store.Start(ctx)
require.NoError(t, err)

head, err := store.Head(ctx)
require.NoError(t, err)
@@ -55,4 +60,26 @@ func TestStore(t *testing.T) {
h, err := store.GetByHeight(ctx, 12)
require.NoError(t, err)
assert.NotNil(t, h)

err = store.Stop(ctx)
require.NoError(t, err)

// check that the store can be successfully started after previous stop
// with all data being flushed.
store, err = NewStore(ds)
require.NoError(t, err)

err = store.Start(ctx)
require.NoError(t, err)

head, err = store.Head(ctx)
require.NoError(t, err)
assert.Equal(t, suite.Head().Hash(), head.Hash())

out, err = store.GetRangeByHeight(ctx, 1, 13)
require.NoError(t, err)
assert.Len(t, out, 12)

err = store.Stop(ctx)
require.NoError(t, err)
}
