Skip to content

Commit

Permalink
fix a few things in hub, filesource and stream
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Jul 13, 2022
1 parent e4696dc commit 4fc4125
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 17 deletions.
6 changes: 3 additions & 3 deletions blocktypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func (i *incomingBlocksFile) ShouldProcessBlock(blockNum uint64) bool {

func newIncomingBlocksFile(baseFileName string, blocksIndexed []uint64) *incomingBlocksFile {
ibf := &incomingBlocksFile{
filename: baseFileName,
indexedBlocks: nil,
blocks: make(chan *PreprocessedBlock, 0),
filename: baseFileName,
blocks: make(chan *PreprocessedBlock, 0),
}
if blocksIndexed != nil {
ibf.indexedBlocks = make(map[uint64]bool)
for _, blk := range blocksIndexed {
ibf.indexedBlocks[blk] = true

Expand Down
21 changes: 10 additions & 11 deletions filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,22 +222,16 @@ func (s *FileSource) run() (err error) {
}
}

// if filteredBlocks == nil : act normally, sinon use it

///
// if nil, process everything...

s.logger.Debug("file stream looking for", zap.Uint64("base_block_num", baseBlockNum))
blocksStore := s.blocksStore // default
baseFilename := fmt.Sprintf("%010d", baseBlockNum)

exists, err := blocksStore.FileExists(ctx, baseFilename)
exists, err := s.blocksStore.FileExists(ctx, baseFilename)
if err != nil {
return fmt.Errorf("reading file existence: %w", err)
}

if !exists {
s.logger.Info("reading from blocks store: file does not (yet?) exist, retrying in", zap.String("filename", blocksStore.ObjectPath(baseFilename)), zap.String("base_filename", baseFilename), zap.Any("retry_delay", s.retryDelay))
s.logger.Info("reading from blocks store: file does not (yet?) exist, retrying in", zap.String("filename", s.blocksStore.ObjectPath(baseFilename)), zap.String("base_filename", baseFilename), zap.Any("retry_delay", s.retryDelay))
delay = s.retryDelay
continue
}
Expand All @@ -255,14 +249,15 @@ func (s *FileSource) run() (err error) {

go func() {
s.logger.Debug("launching processing of file", zap.String("base_filename", baseFilename))
if err := s.streamIncomingFile(newIncomingFile, blocksStore); err != nil {
if err := s.streamIncomingFile(newIncomingFile, s.blocksStore); err != nil {
s.Shutdown(fmt.Errorf("processing of file %q failed: %w", baseFilename, err))
}
}()

baseBlockNum += s.bundleSize
if s.stopBlockNum != 0 && baseBlockNum > s.stopBlockNum {
<-s.Terminating() // FIXME just waiting for termination by the caller
close(s.fileStream)
<-s.Terminating()
return nil
}
}
Expand Down Expand Up @@ -466,7 +461,11 @@ func (s *FileSource) launchSink() {
case <-s.Terminating():
zlog.Debug("terminating by launch sink")
return
case incomingFile := <-s.fileStream:
case incomingFile, ok := <-s.fileStream:
if !ok {
s.Shutdown(nil)
return
}
s.logger.Debug("feeding from incoming file", zap.String("filename", incomingFile.filename))

for preBlock := range incomingFile.blocks {
Expand Down
10 changes: 8 additions & 2 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func NewForkableHub(liveSourceFactory bstream.SourceFactory, oneBlocksSourceFact
}

func (h *ForkableHub) LowestBlockNum() uint64 {
if h.ready {
if h != nil && h.ready {
return h.forkable.LowestBlockNum()
}
return 0
}

func (h *ForkableHub) HeadNum() uint64 {
if h.ready {
if h != nil && h.ready {
return h.forkable.HeadNum()
}
return 0
Expand Down Expand Up @@ -107,6 +107,9 @@ func (h *ForkableHub) unsubscribe(removeSub *Subscription) {
}

func (h *ForkableHub) SourceFromBlockNum(num uint64, handler bstream.Handler) bstream.Source {
if h == nil {
return nil
}
h.Lock()
defer h.Unlock()

Expand All @@ -118,6 +121,9 @@ func (h *ForkableHub) SourceFromBlockNum(num uint64, handler bstream.Handler) bs
}

func (h *ForkableHub) SourceFromCursor(cursor *bstream.Cursor, handler bstream.Handler) bstream.Source {
if h == nil {
return nil
}
h.Lock()
defer h.Unlock()

Expand Down
4 changes: 3 additions & 1 deletion stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func New(
s.fileSourceFactory = bstream.NewFileSourceFactory(
mergedBlocksStore,
oneBlocksStore,
s.logger)
s.logger,
fileSourceOptions...,
)

return s
}
Expand Down

0 comments on commit 4fc4125

Please sign in to comment.