Skip to content

Commit

Permalink
renames, tweaks, fix hub sourceFromNum passed lib
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Jul 14, 2022
1 parent 4fc4125 commit e141dd6
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 162 deletions.
22 changes: 11 additions & 11 deletions blocktypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,28 @@ type incomingOneBlockFiles struct {
}

type incomingBlocksFile struct {
filename string // Base filename (%100 of block_num)
indexedBlocks map[uint64]bool
blocks chan *PreprocessedBlock
filename string // Base filename (%100 of block_num)
filteredBlocks map[uint64]bool
blocks chan *PreprocessedBlock
}

func (i *incomingBlocksFile) ShouldProcessBlock(blockNum uint64) bool {
if i.indexedBlocks == nil {
func (i *incomingBlocksFile) PassesFilter(blockNum uint64) bool {
if i.filteredBlocks == nil {
return true
}
_, found := i.indexedBlocks[blockNum]
_, found := i.filteredBlocks[blockNum]
return found
}

func newIncomingBlocksFile(baseFileName string, blocksIndexed []uint64) *incomingBlocksFile {
func newIncomingBlocksFile(baseFileName string, filteredBlocks []uint64) *incomingBlocksFile {
ibf := &incomingBlocksFile{
filename: baseFileName,
blocks: make(chan *PreprocessedBlock, 0),
}
if blocksIndexed != nil {
ibf.indexedBlocks = make(map[uint64]bool)
for _, blk := range blocksIndexed {
ibf.indexedBlocks[blk] = true
if filteredBlocks != nil {
ibf.filteredBlocks = make(map[uint64]bool)
for _, blk := range filteredBlocks {
ibf.filteredBlocks[blk] = true

}
}
Expand Down
8 changes: 4 additions & 4 deletions filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (s *FileSource) streamReader(blockReader BlockReader, prevLastBlockRead Blo
continue
}

if !incomingBlockFile.ShouldProcessBlock(blockNum) {
if !incomingBlockFile.PassesFilter(blockNum) {
continue
}

Expand Down Expand Up @@ -418,9 +418,9 @@ func (s *FileSource) preprocess(block *Block, out chan *PreprocessedBlock) {
obj: obj,
cursor: &Cursor{
Step: StepNewIrreversible,
Block: block,
LIB: block,
HeadBlock: block,
Block: block.AsRef(),
LIB: block.AsRef(),
HeadBlock: block.AsRef(),
}}

zlog.Debug("block pre processed", zap.Stringer("block_ref", block))
Expand Down
13 changes: 7 additions & 6 deletions forkable/forkable.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Forkable struct {
lastLongestChain []*Block
}

func (p *Forkable) BlocksFromIrreversibleNum(num uint64) (out []*bstream.PreprocessedBlock) {
func (p *Forkable) BlocksFromNum(num uint64) (out []*bstream.PreprocessedBlock) {
p.RLock()
defer p.RUnlock()

Expand All @@ -65,9 +65,6 @@ func (p *Forkable) BlocksFromIrreversibleNum(num uint64) (out []*bstream.Preproc
}

libNum := p.forkDB.libRef.Num()
if num > libNum {
return nil
}

var seenBlock bool
for i := range seg {
Expand All @@ -92,6 +89,10 @@ func (p *Forkable) BlocksFromIrreversibleNum(num uint64) (out []*bstream.Preproc
return out
}

func (p *Forkable) Linkable(blk *bstream.Block) bool {
return !bstream.IsEmpty(p.forkDB.BlockInCurrentChain(blk, blk.LibNum))
}

func blockIn(id string, array []*Block) bool {
for _, b := range array {
if id == b.BlockID {
Expand Down Expand Up @@ -273,8 +274,8 @@ func (p *Forkable) targetChainBlock(blk *bstream.Block) bstream.BlockRef {
return blk
}

func (p *Forkable) matchFilter(filter bstream.StepType) bool {
return p.filterSteps&filter != 0
func (p *Forkable) matchFilter(step bstream.StepType) bool {
return p.filterSteps&step != 0
}

func (p *Forkable) computeNewLongestChain(ppBlk *ForkableBlock) []*Block {
Expand Down
17 changes: 15 additions & 2 deletions forkable/forkable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1611,12 +1611,25 @@ func TestForkable_BlocksFromIrreversibleNum(t *testing.T) {
requestBlock: 5,
},
{
name: "no source cause not irreversible yet",
name: "source within reversible segment",
forkdbBlocks: []*bstream.Block{
bstream.TestBlockWithLIBNum("00000003", "00000002", 2),
bstream.TestBlockWithLIBNum("00000004", "00000003", 3),
bstream.TestBlockWithLIBNum("00000005", "00000004", 3),
},
expectBlocks: []expectedBlock{
{
bstream.TestBlockWithLIBNum("00000004", "00000003", 3),
bstream.StepNew,
3,
},
{
bstream.TestBlockWithLIBNum("00000005", "00000004", 3),
bstream.StepNew,
3,
},
},

requestBlock: 4,
},
}
Expand All @@ -1628,7 +1641,7 @@ func TestForkable_BlocksFromIrreversibleNum(t *testing.T) {
require.NoError(t, frkb.ProcessBlock(blk, nil))
}

out := frkb.BlocksFromIrreversibleNum(test.requestBlock)
out := frkb.BlocksFromNum(test.requestBlock)
var seenBlocks []expectedBlock
for _, blk := range out {
seenBlocks = append(seenBlocks, expectedBlock{blk.Block, blk.Obj.(*ForkableObject).Step(), blk.Obj.(*ForkableObject).Cursor().LIB.Num()})
Expand Down
6 changes: 0 additions & 6 deletions forkable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ import (

type Option func(f *Forkable)

func WithForkDB(in *ForkDB) Option {
return func(f *Forkable) {
f.forkDB = in
}
}

func WithLogger(logger *zap.Logger) Option {
return func(f *Forkable) {
f.logger = logger
Expand Down
15 changes: 9 additions & 6 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type ForkableHub struct {
*shutter.Shutter
sync.Mutex

forkdb *forkable.ForkDB
forkable *forkable.Forkable

keepFinalBlocks int

optionalHandler bstream.Handler
subscribers []*Subscription
sourceChannelSize int

Expand All @@ -44,22 +44,25 @@ type ForkableHub struct {
oneBlocksSourceFactory bstream.SourceFromNumFactory
}

func NewForkableHub(liveSourceFactory bstream.SourceFactory, oneBlocksSourceFactory bstream.SourceFromNumFactory, keepFinalBlocks int) *ForkableHub {
func NewForkableHub(liveSourceFactory bstream.SourceFactory, oneBlocksSourceFactory bstream.SourceFromNumFactory, keepFinalBlocks int, extraForkableOptions ...forkable.Option) *ForkableHub {
hub := &ForkableHub{
Shutter: shutter.New(),
liveSourceFactory: liveSourceFactory,
oneBlocksSourceFactory: oneBlocksSourceFactory,
keepFinalBlocks: keepFinalBlocks,
sourceChannelSize: 100, // number of blocks that can add up before the subscriber processes them
forkdb: forkable.NewForkDB(),
Ready: make(chan struct{}),
}

hub.forkable = forkable.New(hub,
forkable.WithForkDB(hub.forkdb),
forkable.HoldBlocksUntilLIB(),
forkable.WithKeptFinalBlocks(keepFinalBlocks),
)

for _, opt := range extraForkableOptions {
opt(hub.forkable)
}

return hub
}

Expand Down Expand Up @@ -113,7 +116,7 @@ func (h *ForkableHub) SourceFromBlockNum(num uint64, handler bstream.Handler) bs
h.Lock()
defer h.Unlock()

blocks := h.forkable.BlocksFromIrreversibleNum(num)
blocks := h.forkable.BlocksFromNum(num)
if blocks != nil {
return h.subscribe(handler, blocks)
}
Expand Down Expand Up @@ -152,7 +155,7 @@ func (h *ForkableHub) bootstrap(blk *bstream.Block) error {
return err
}

if h.forkdb.BlockInCurrentChain(blk, blk.LibNum) == bstream.BlockRefEmpty {
if !h.forkable.Linkable(blk) {
zlog.Warn("cannot initialize forkDB on a final block from available one-block-files. Will keep retrying on every block before we become ready")
return nil
}
Expand Down
25 changes: 4 additions & 21 deletions hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func TestForkableHub_Bootstrap(t *testing.T) {
expectStartNum uint64
expectReady bool
expectReadyAfter bool
expectHeadRef bstream.BlockRef
expectBlocksInCurrentChain []uint64
}{
{
Expand All @@ -41,8 +40,7 @@ func TestForkableHub_Bootstrap(t *testing.T) {
expectStartNum: 0,
expectReady: true,
expectReadyAfter: true,
expectHeadRef: bstream.NewBlockRefFromID("00000009"),
expectBlocksInCurrentChain: []uint64{4, 5, 8, 9},
expectBlocksInCurrentChain: []uint64{4, 5, 8, 9, 10},
},
{
name: "LIB met on second live block",
Expand All @@ -64,7 +62,6 @@ func TestForkableHub_Bootstrap(t *testing.T) {
expectStartNum: 0,
expectReady: false,
expectReadyAfter: true,
expectHeadRef: bstream.NewBlockRefFromID("0000000a"),
expectBlocksInCurrentChain: []uint64{5, 8, 9, 10},
},
{
Expand Down Expand Up @@ -114,7 +111,6 @@ func TestForkableHub_Bootstrap(t *testing.T) {
expectStartNum: 0,
expectReady: false,
expectReadyAfter: true,
expectHeadRef: bstream.NewBlockRefFromID("00000009"),
expectBlocksInCurrentChain: []uint64{3, 4, 6, 7, 8, 9},
},
}
Expand Down Expand Up @@ -152,11 +148,11 @@ func TestForkableHub_Bootstrap(t *testing.T) {
}
assert.Equal(t, test.expectReadyAfter, fh.ready)

if test.expectHeadRef != nil || test.expectBlocksInCurrentChain != nil {
if test.expectBlocksInCurrentChain != nil {
var seenBlockNums []uint64
chain, _ := fh.forkdb.CompleteSegment(test.expectHeadRef)
chain := fh.forkable.BlocksFromNum(test.expectBlocksInCurrentChain[0])
for _, blk := range chain {
seenBlockNums = append(seenBlockNums, blk.BlockNum)
seenBlockNums = append(seenBlockNums, blk.Num())
}
assert.Equal(t, test.expectBlocksInCurrentChain, seenBlockNums)
}
Expand Down Expand Up @@ -272,26 +268,15 @@ func TestForkableHub_SourceFromBlockNum(t *testing.T) {
},
requestBlock: 5,
},
{
name: "no source cause not irreversible yet",
forkdbBlocks: []*bstream.Block{
bstream.TestBlockWithLIBNum("00000003", "00000002", 2),
bstream.TestBlockWithLIBNum("00000004", "00000003", 3),
bstream.TestBlockWithLIBNum("00000005", "00000004", 3),
},
requestBlock: 4,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

fh := &ForkableHub{
Shutter: shutter.New(),
forkdb: forkable.NewForkDB(),
}
fh.forkable = forkable.New(fh,
forkable.WithForkDB(fh.forkdb),
forkable.HoldBlocksUntilLIB(),
forkable.WithKeptFinalBlocks(100),
)
Expand Down Expand Up @@ -490,10 +475,8 @@ func TestForkableHub_SourceFromCursor(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fh := &ForkableHub{
Shutter: shutter.New(),
forkdb: forkable.NewForkDB(),
}
fh.forkable = forkable.New(fh,
forkable.WithForkDB(fh.forkdb),
forkable.HoldBlocksUntilLIB(),
forkable.WithKeptFinalBlocks(100),
)
Expand Down
Loading

0 comments on commit e141dd6

Please sign in to comment.