diff --git a/blocktypes.go b/blocktypes.go index c0f69b3..bf501c9 100644 --- a/blocktypes.go +++ b/blocktypes.go @@ -20,28 +20,28 @@ type incomingOneBlockFiles struct { } type incomingBlocksFile struct { - filename string // Base filename (%100 of block_num) - indexedBlocks map[uint64]bool - blocks chan *PreprocessedBlock + filename string // Base filename (%100 of block_num) + filteredBlocks map[uint64]bool + blocks chan *PreprocessedBlock } -func (i *incomingBlocksFile) ShouldProcessBlock(blockNum uint64) bool { - if i.indexedBlocks == nil { +func (i *incomingBlocksFile) PassesFilter(blockNum uint64) bool { + if i.filteredBlocks == nil { return true } - _, found := i.indexedBlocks[blockNum] + _, found := i.filteredBlocks[blockNum] return found } -func newIncomingBlocksFile(baseFileName string, blocksIndexed []uint64) *incomingBlocksFile { +func newIncomingBlocksFile(baseFileName string, filteredBlocks []uint64) *incomingBlocksFile { ibf := &incomingBlocksFile{ filename: baseFileName, blocks: make(chan *PreprocessedBlock, 0), } - if blocksIndexed != nil { - ibf.indexedBlocks = make(map[uint64]bool) - for _, blk := range blocksIndexed { - ibf.indexedBlocks[blk] = true + if filteredBlocks != nil { + ibf.filteredBlocks = make(map[uint64]bool) + for _, blk := range filteredBlocks { + ibf.filteredBlocks[blk] = true } } diff --git a/filesource.go b/filesource.go index 6da8eec..e35c2b0 100644 --- a/filesource.go +++ b/filesource.go @@ -373,7 +373,7 @@ func (s *FileSource) streamReader(blockReader BlockReader, prevLastBlockRead Blo continue } - if !incomingBlockFile.ShouldProcessBlock(blockNum) { + if !incomingBlockFile.PassesFilter(blockNum) { continue } @@ -418,9 +418,9 @@ func (s *FileSource) preprocess(block *Block, out chan *PreprocessedBlock) { obj: obj, cursor: &Cursor{ Step: StepNewIrreversible, - Block: block, - LIB: block, - HeadBlock: block, + Block: block.AsRef(), + LIB: block.AsRef(), + HeadBlock: block.AsRef(), }} zlog.Debug("block pre processed", zap.Stringer("block_ref", block)) diff --git a/forkable/forkable.go b/forkable/forkable.go index c614f1b..1081b35 100644 --- a/forkable/forkable.go +++ b/forkable/forkable.go @@ -46,7 +46,7 @@ type Forkable struct { lastLongestChain []*Block } -func (p *Forkable) BlocksFromIrreversibleNum(num uint64) (out []*bstream.PreprocessedBlock) { +func (p *Forkable) BlocksFromNum(num uint64) (out []*bstream.PreprocessedBlock) { p.RLock() defer p.RUnlock() @@ -65,9 +65,6 @@ func (p *Forkable) BlocksFromIrreversibleNum(num uint64) (out []*bstream.Preproc } libNum := p.forkDB.libRef.Num() - if num > libNum { - return nil - } var seenBlock bool for i := range seg { @@ -92,6 +89,10 @@ func (p *Forkable) BlocksFromIrreversibleNum(num uint64) (out []*bstream.Preproc return out } +func (p *Forkable) Linkable(blk *bstream.Block) bool { + return !bstream.IsEmpty(p.forkDB.BlockInCurrentChain(blk, blk.LibNum)) +} + func blockIn(id string, array []*Block) bool { for _, b := range array { if id == b.BlockID { @@ -273,8 +274,8 @@ func (p *Forkable) targetChainBlock(blk *bstream.Block) bstream.BlockRef { return blk } -func (p *Forkable) matchFilter(filter bstream.StepType) bool { - return p.filterSteps&filter != 0 +func (p *Forkable) matchFilter(step bstream.StepType) bool { + return p.filterSteps&step != 0 } func (p *Forkable) computeNewLongestChain(ppBlk *ForkableBlock) []*Block { diff --git a/forkable/forkable_test.go b/forkable/forkable_test.go index f5e9058..5af589c 100644 --- a/forkable/forkable_test.go +++ b/forkable/forkable_test.go @@ -1611,12 +1611,25 @@ func TestForkable_BlocksFromIrreversibleNum(t *testing.T) { requestBlock: 5, }, { - name: "no source cause not irreversible yet", + name: "source within reversible segment", forkdbBlocks: []*bstream.Block{ bstream.TestBlockWithLIBNum("00000003", "00000002", 2), bstream.TestBlockWithLIBNum("00000004", "00000003", 3), bstream.TestBlockWithLIBNum("00000005", "00000004", 3), }, + expectBlocks: []expectedBlock{ + { + bstream.TestBlockWithLIBNum("00000004", "00000003", 3), + bstream.StepNew, + 3, + }, + { + bstream.TestBlockWithLIBNum("00000005", "00000004", 3), + bstream.StepNew, + 3, + }, + }, + requestBlock: 4, }, } @@ -1628,7 +1641,7 @@ func TestForkable_BlocksFromIrreversibleNum(t *testing.T) { require.NoError(t, frkb.ProcessBlock(blk, nil)) } - out := frkb.BlocksFromIrreversibleNum(test.requestBlock) + out := frkb.BlocksFromNum(test.requestBlock) var seenBlocks []expectedBlock for _, blk := range out { seenBlocks = append(seenBlocks, expectedBlock{blk.Block, blk.Obj.(*ForkableObject).Step(), blk.Obj.(*ForkableObject).Cursor().LIB.Num()}) diff --git a/forkable/options.go b/forkable/options.go index a14a4c0..1a696c8 100644 --- a/forkable/options.go +++ b/forkable/options.go @@ -21,12 +21,6 @@ import ( type Option func(f *Forkable) -func WithForkDB(in *ForkDB) Option { - return func(f *Forkable) { - f.forkDB = in - } -} - func WithLogger(logger *zap.Logger) Option { return func(f *Forkable) { f.logger = logger diff --git a/hub/hub.go b/hub/hub.go index b3dbf61..efa736e 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -30,11 +30,11 @@ type ForkableHub struct { *shutter.Shutter sync.Mutex - forkdb *forkable.ForkDB forkable *forkable.Forkable keepFinalBlocks int + optionalHandler bstream.Handler subscribers []*Subscription sourceChannelSize int @@ -44,22 +44,25 @@ type ForkableHub struct { oneBlocksSourceFactory bstream.SourceFromNumFactory } -func NewForkableHub(liveSourceFactory bstream.SourceFactory, oneBlocksSourceFactory bstream.SourceFromNumFactory, keepFinalBlocks int) *ForkableHub { +func NewForkableHub(liveSourceFactory bstream.SourceFactory, oneBlocksSourceFactory bstream.SourceFromNumFactory, keepFinalBlocks int, extraForkableOptions ...forkable.Option) *ForkableHub { hub := &ForkableHub{ Shutter: shutter.New(), liveSourceFactory: liveSourceFactory, oneBlocksSourceFactory: oneBlocksSourceFactory, keepFinalBlocks: keepFinalBlocks, sourceChannelSize: 100, // number of blocks that can add up before the subscriber processes them - forkdb: forkable.NewForkDB(), Ready: make(chan struct{}), } hub.forkable = forkable.New(hub, - forkable.WithForkDB(hub.forkdb), forkable.HoldBlocksUntilLIB(), forkable.WithKeptFinalBlocks(keepFinalBlocks), ) + + for _, opt := range extraForkableOptions { + opt(hub.forkable) + } + return hub } @@ -113,7 +116,7 @@ func (h *ForkableHub) SourceFromBlockNum(num uint64, handler bstream.Handler) bs h.Lock() defer h.Unlock() - blocks := h.forkable.BlocksFromIrreversibleNum(num) + blocks := h.forkable.BlocksFromNum(num) if blocks != nil { return h.subscribe(handler, blocks) } @@ -152,7 +155,7 @@ func (h *ForkableHub) bootstrap(blk *bstream.Block) error { return err } - if h.forkdb.BlockInCurrentChain(blk, blk.LibNum) == bstream.BlockRefEmpty { + if !h.forkable.Linkable(blk) { zlog.Warn("cannot initialize forkDB on a final block from available one-block-files. Will keep retrying on every block before we become ready") return nil } diff --git a/hub/hub_test.go b/hub/hub_test.go index e104d2d..138a8ff 100644 --- a/hub/hub_test.go +++ b/hub/hub_test.go @@ -22,7 +22,6 @@ func TestForkableHub_Bootstrap(t *testing.T) { expectStartNum uint64 expectReady bool expectReadyAfter bool - expectHeadRef bstream.BlockRef expectBlocksInCurrentChain []uint64 }{ { @@ -41,8 +40,7 @@ func TestForkableHub_Bootstrap(t *testing.T) { expectStartNum: 0, expectReady: true, expectReadyAfter: true, - expectHeadRef: bstream.NewBlockRefFromID("00000009"), - expectBlocksInCurrentChain: []uint64{4, 5, 8, 9}, + expectBlocksInCurrentChain: []uint64{4, 5, 8, 9, 10}, }, { name: "LIB met on second live block", @@ -64,7 +62,6 @@ func TestForkableHub_Bootstrap(t *testing.T) { expectStartNum: 0, expectReady: false, expectReadyAfter: true, - expectHeadRef: bstream.NewBlockRefFromID("0000000a"), expectBlocksInCurrentChain: []uint64{5, 8, 9, 10}, }, { @@ -114,7 +111,6 @@ func TestForkableHub_Bootstrap(t *testing.T) { expectStartNum: 0, expectReady: false, expectReadyAfter: true, - expectHeadRef: bstream.NewBlockRefFromID("00000009"), expectBlocksInCurrentChain: []uint64{3, 4, 6, 7, 8, 9}, }, } @@ -152,11 +148,11 @@ func TestForkableHub_Bootstrap(t *testing.T) { } assert.Equal(t, test.expectReadyAfter, fh.ready) - if test.expectHeadRef != nil || test.expectBlocksInCurrentChain != nil { + if test.expectBlocksInCurrentChain != nil { var seenBlockNums []uint64 - chain, _ := fh.forkdb.CompleteSegment(test.expectHeadRef) + chain := fh.forkable.BlocksFromNum(test.expectBlocksInCurrentChain[0]) for _, blk := range chain { - seenBlockNums = append(seenBlockNums, blk.BlockNum) + seenBlockNums = append(seenBlockNums, blk.Num()) } assert.Equal(t, test.expectBlocksInCurrentChain, seenBlockNums) } @@ -272,15 +268,6 @@ func TestForkableHub_SourceFromBlockNum(t *testing.T) { }, requestBlock: 5, }, - { - name: "no source cause not irreversible yet", - forkdbBlocks: []*bstream.Block{ - bstream.TestBlockWithLIBNum("00000003", "00000002", 2), - bstream.TestBlockWithLIBNum("00000004", "00000003", 3), - bstream.TestBlockWithLIBNum("00000005", "00000004", 3), - }, - requestBlock: 4, - }, } for _, test := range tests { @@ -288,10 +275,8 @@ func TestForkableHub_SourceFromBlockNum(t *testing.T) { fh := &ForkableHub{ Shutter: shutter.New(), - forkdb: forkable.NewForkDB(), } fh.forkable = forkable.New(fh, - forkable.WithForkDB(fh.forkdb), forkable.HoldBlocksUntilLIB(), forkable.WithKeptFinalBlocks(100), ) @@ -490,10 +475,8 @@ func TestForkableHub_SourceFromCursor(t *testing.T) { t.Run(test.name, func(t *testing.T) { fh := &ForkableHub{ Shutter: shutter.New(), - forkdb: forkable.NewForkDB(), } fh.forkable = forkable.New(fh, - forkable.WithForkDB(fh.forkdb), forkable.HoldBlocksUntilLIB(), forkable.WithKeptFinalBlocks(100), ) diff --git a/joiningsource.go b/joiningsource.go index 9dfe900..84fbd53 100644 --- a/joiningsource.go +++ b/joiningsource.go @@ -25,33 +25,34 @@ import ( var stopSourceOnJoin = errors.New("stopping source on join") -// JoiningSource joins an irreversible-only source (left) to a fork-aware source, close to HEAD (right) -// 1) it tries to get the source from RightSourceFactory (using startblock or cursor) -// 2) if it can't, it will ask the LeftSourceFactory for a source of those blocks. -// 3) when it receives blocks from LeftSource, it looks at RightSource +// JoiningSource joins an irreversible-only source (file) to a fork-aware source close to HEAD (live) +// 1) it tries to get the source from LiveSourceFactory (using startblock or cursor) +// 2) if it can't, it will ask the FileSourceFactory for a source of those blocks. +// 3) when it receives blocks from Filesource, it looks at LiveSource // the JoiningSource will instantiate and run an 'initialSource' until it can bridge the gap type JoiningSource struct { *shutter.Shutter - leftSourceFactory ForkableSourceFactory - rightSourceFactory ForkableSourceFactory + fileSourceFactory ForkableSourceFactory + liveSourceFactory ForkableSourceFactory - lowestRightSourceBlockNum uint64 - rightSource Source - sourcesLock sync.Mutex + lowestLiveBlockNum uint64 + liveSource Source + sourcesLock sync.Mutex handler Handler lastBlockProcessed *Block - irreversibleStartBlockNum uint64 // overriden by cursor if it exists - cursor *Cursor + startBlockNum uint64 // overriden by cursor if it exists + cursor *Cursor logger *zap.Logger } -func NewJoiningSource(leftSourceFactory, - rightSourceFactory ForkableSourceFactory, +func NewJoiningSource( + fileSourceFactory, + liveSourceFactory ForkableSourceFactory, h Handler, startBlockNum uint64, cursor *Cursor, @@ -59,13 +60,13 @@ func NewJoiningSource(leftSourceFactory, logger.Info("creating new joining source", zap.Stringer("cursor", cursor), zap.Uint64("start_block_num", startBlockNum)) s := &JoiningSource{ - Shutter: shutter.New(), - leftSourceFactory: leftSourceFactory, - rightSourceFactory: rightSourceFactory, - handler: h, - irreversibleStartBlockNum: startBlockNum, - cursor: cursor, - logger: logger, + Shutter: shutter.New(), + fileSourceFactory: fileSourceFactory, + liveSourceFactory: liveSourceFactory, + handler: h, + startBlockNum: startBlockNum, + cursor: cursor, + logger: logger, } return s @@ -77,64 +78,58 @@ func (s *JoiningSource) Run() { func (s *JoiningSource) run() error { - // if rightSource works, no need for leftSource or wrapped handler - if src := tryGetSource(s.handler, - s.rightSourceFactory, - s.irreversibleStartBlockNum, - s.cursor); src != nil { - s.rightSource = src + // if liveSource works, no need for fileSource or wrapped handler + if src := s.tryGetSource(s.handler, s.liveSourceFactory); src != nil { + s.liveSource = src - s.OnTerminating(s.rightSource.Shutdown) - s.rightSource.Run() - return s.rightSource.Err() + s.OnTerminating(s.liveSource.Shutdown) + s.liveSource.Run() + return s.liveSource.Err() } - if lowestBlockGetter, ok := s.rightSourceFactory.(LowSourceLimitGetter); ok { - s.lowestRightSourceBlockNum = lowestBlockGetter.LowestBlockNum() + if lowestBlockGetter, ok := s.liveSourceFactory.(LowSourceLimitGetter); ok { + s.lowestLiveBlockNum = lowestBlockGetter.LowestBlockNum() } - leftSrc := tryGetSource(HandlerFunc(s.leftSourceHandler), - s.leftSourceFactory, - s.irreversibleStartBlockNum, - s.cursor) + fileSrc := s.tryGetSource(HandlerFunc(s.fileSourceHandler), s.fileSourceFactory) - if leftSrc == nil { + if fileSrc == nil { return fmt.Errorf("cannot run joining_source: start_block %d (cursor %s) not found", - s.irreversibleStartBlockNum, + s.startBlockNum, s.cursor.String()) } - s.OnTerminating(leftSrc.Shutdown) - leftSrc.Run() + s.OnTerminating(fileSrc.Shutdown) + fileSrc.Run() - if s.rightSource == nil { // got stopped before joining - return leftSrc.Err() + if s.liveSource == nil { // got stopped before joining + return fileSrc.Err() } - s.OnTerminating(s.rightSource.Shutdown) - s.rightSource.Run() - return s.rightSource.Err() + s.OnTerminating(s.liveSource.Shutdown) + s.liveSource.Run() + return s.liveSource.Err() } -func tryGetSource(handler Handler, factory ForkableSourceFactory, irrBlockNum uint64, cursor *Cursor) Source { - if cursor != nil { - return factory.SourceFromCursor(cursor, handler) +func (s *JoiningSource) tryGetSource(handler Handler, factory ForkableSourceFactory) Source { + if s.cursor != nil { + return factory.SourceFromCursor(s.cursor, handler) } - return factory.SourceFromBlockNum(irrBlockNum, handler) + return factory.SourceFromBlockNum(s.startBlockNum, handler) } -func (s *JoiningSource) leftSourceHandler(blk *Block, obj interface{}) error { - if s.rightSource != nil { // we should be already shutdown anyway +func (s *JoiningSource) fileSourceHandler(blk *Block, obj interface{}) error { + if s.liveSource != nil { // we should be already shutdown anyway return nil } - if blk.Number >= s.lowestRightSourceBlockNum { - if src := s.rightSourceFactory.SourceFromBlockNum(blk.Number, s.handler); src != nil { - s.rightSource = src + if blk.Number >= s.lowestLiveBlockNum { + if src := s.liveSourceFactory.SourceFromBlockNum(blk.Number, s.handler); src != nil { + s.liveSource = src return stopSourceOnJoin } - if lowestBlockGetter, ok := s.rightSourceFactory.(LowSourceLimitGetter); ok { - s.lowestRightSourceBlockNum = lowestBlockGetter.LowestBlockNum() + if lowestBlockGetter, ok := s.liveSourceFactory.(LowSourceLimitGetter); ok { + s.lowestLiveBlockNum = lowestBlockGetter.LowestBlockNum() } } diff --git a/joiningsource_test.go b/joiningsource_test.go index 07a8a92..a1e0464 100644 --- a/joiningsource_test.go +++ b/joiningsource_test.go @@ -42,119 +42,119 @@ func TestJoiningSource_vanilla(t *testing.T) { joiningBlock := uint64(4) failingBlock := uint64(5) - leftSF := NewTestSourceFactory() - rightSF := NewTestSourceFactory() + fileSF := NewTestSourceFactory() + liveSF := NewTestSourceFactory() - var rightSrc *TestSource + var liveSrc *TestSource handler, out := testHandler(failingBlock) - rightSF.FromBlockNumFunc = func(num uint64, h Handler) Source { + liveSF.FromBlockNumFunc = func(num uint64, h Handler) Source { if num == joiningBlock { src := NewTestSource(h) - rightSrc = src + liveSrc = src return src } return nil } - joiningSource := NewJoiningSource(leftSF, rightSF, handler, 2, nil, zlog) + joiningSource := NewJoiningSource(fileSF, liveSF, handler, 2, nil, zlog) go joiningSource.Run() - leftSrc := <-leftSF.Created - <-leftSrc.running // test fixture ready to push blocks - assert.Equal(t, uint64(2), leftSrc.StartBlockNum) + fileSrc := <-fileSF.Created + <-fileSrc.running // test fixture ready to push blocks + assert.Equal(t, uint64(2), fileSrc.StartBlockNum) - require.NoError(t, leftSrc.Push(TestBlock("00000002a", "00000001a"), nil)) - require.NoError(t, leftSrc.Push(TestBlock("00000003a", "00000002a"), nil)) - require.EqualError(t, leftSrc.Push(TestBlock("00000004a", "00000003a"), nil), + require.NoError(t, fileSrc.Push(TestBlock("00000002a", "00000001a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000003a", "00000002a"), nil)) + require.EqualError(t, fileSrc.Push(TestBlock("00000004a", "00000003a"), nil), stopSourceOnJoin.Error()) - <-leftSrc.Terminated() // previous error causes termination + <-fileSrc.Terminated() // previous error causes termination assert.Equal(t, 2, len(out)) - require.NotNil(t, rightSrc, "we should have joined to right source") + require.NotNil(t, liveSrc, "we should have joined to live source") - require.NoError(t, rightSrc.Push(TestBlock("00000004a", "00000003a"), nil)) + require.NoError(t, liveSrc.Push(TestBlock("00000004a", "00000003a"), nil)) assert.Equal(t, 3, len(out)) - require.EqualError(t, rightSrc.Push(TestBlock("00000005a", "00000004a"), nil), + require.EqualError(t, liveSrc.Push(TestBlock("00000005a", "00000004a"), nil), errTestMock.Error()) - <-rightSrc.Terminated() + <-liveSrc.Terminated() <-joiningSource.Terminated() } -func TestJoiningSource_skip_left_source(t *testing.T) { +func TestJoiningSource_skip_file_source(t *testing.T) { var fileSF ForkableSourceFactory //not used - rightSF := NewTestSourceFactory() + liveSF := NewTestSourceFactory() handler, out := testHandler(0) - joiningSource := NewJoiningSource(fileSF, rightSF, handler, 2, nil, zlog) + joiningSource := NewJoiningSource(fileSF, liveSF, handler, 2, nil, zlog) go joiningSource.Run() - rightSrc := <-rightSF.Created - <-rightSrc.running + liveSrc := <-liveSF.Created + <-liveSrc.running - assert.Equal(t, uint64(2), rightSrc.StartBlockNum) + assert.Equal(t, uint64(2), liveSrc.StartBlockNum) - require.NoError(t, rightSrc.Push(TestBlock("00000002a", "00000001a"), nil)) - require.NoError(t, rightSrc.Push(TestBlock("00000003a", "00000002a"), nil)) + require.NoError(t, liveSrc.Push(TestBlock("00000002a", "00000001a"), nil)) + require.NoError(t, liveSrc.Push(TestBlock("00000003a", "00000002a"), nil)) assert.Len(t, out, 2) joiningSource.Shutdown(errTestMock) - <-rightSrc.Terminated() // previous error causes termination + <-liveSrc.Terminated() // previous error causes termination <-joiningSource.Terminated() // previous error causes termination } func TestJoiningSource_lowerLimitBackoff(t *testing.T) { - leftSF := NewTestSourceFactory() - rightSF := NewTestSourceFactory() + fileSF := NewTestSourceFactory() + liveSF := NewTestSourceFactory() - rightSF.LowestBlkNum = 8 + liveSF.LowestBlkNum = 8 joiningBlock := uint64(9) - var rightSrc *TestSource - rightSourceFactoryCalls := 0 + var liveSrc *TestSource + liveSourceFactoryCalls := 0 handler, out := testHandler(0) - rightSF.FromBlockNumFunc = func(num uint64, h Handler) Source { - rightSourceFactoryCalls++ + liveSF.FromBlockNumFunc = func(num uint64, h Handler) Source { + liveSourceFactoryCalls++ if num == joiningBlock { src := NewTestSource(h) - rightSrc = src + liveSrc = src return src } return nil } - joiningSource := NewJoiningSource(leftSF, rightSF, handler, 1, nil, zlog) + joiningSource := NewJoiningSource(fileSF, liveSF, handler, 1, nil, zlog) go joiningSource.Run() - leftSrc := <-leftSF.Created - <-leftSrc.running - assert.Equal(t, uint64(1), leftSrc.StartBlockNum) + fileSrc := <-fileSF.Created + <-fileSrc.running + assert.Equal(t, uint64(1), fileSrc.StartBlockNum) - require.NoError(t, leftSrc.Push(TestBlock("00000001a", "00000000a"), nil)) - require.NoError(t, leftSrc.Push(TestBlock("00000002a", "00000001a"), nil)) - require.NoError(t, leftSrc.Push(TestBlock("00000003a", "00000002a"), nil)) - require.NoError(t, leftSrc.Push(TestBlock("00000004a", "00000003a"), nil)) - require.NoError(t, leftSrc.Push(TestBlock("00000005a", "00000004a"), nil)) - require.NoError(t, leftSrc.Push(TestBlock("00000006a", "00000005a"), nil)) - require.NoError(t, leftSrc.Push(TestBlock("00000007a", "00000006a"), nil)) - require.NoError(t, leftSrc.Push(TestBlock("00000008a", "00000007a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000001a", "00000000a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000002a", "00000001a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000003a", "00000002a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000004a", "00000003a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000005a", "00000004a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000006a", "00000005a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000007a", "00000006a"), nil)) + require.NoError(t, fileSrc.Push(TestBlock("00000008a", "00000007a"), nil)) - require.EqualError(t, leftSrc.Push(TestBlock("00000009a", "00000008a"), nil), + require.EqualError(t, fileSrc.Push(TestBlock("00000009a", "00000008a"), nil), stopSourceOnJoin.Error()) - <-leftSrc.Terminated() // previous error causes termination + <-fileSrc.Terminated() // previous error causes termination assert.Equal(t, 8, len(out)) - require.NotNil(t, rightSrc, "we should have joined to right source") - require.NoError(t, rightSrc.Push(TestBlock("00000009a", "00000008a"), nil)) + require.NotNil(t, liveSrc, "we should have joined to live source") + require.NoError(t, liveSrc.Push(TestBlock("00000009a", "00000008a"), nil)) assert.Equal(t, 9, len(out)) - assert.Equal(t, 3, rightSourceFactoryCalls) + assert.Equal(t, 3, liveSourceFactoryCalls) }