Skip to content

Commit

Permalink
remove some old oneblockfile stuff and startBlockResovler, replace by…
Browse files Browse the repository at this point in the history
… new
  • Loading branch information
sduchesneau committed Jun 23, 2022
1 parent 931b8ad commit 86ca353
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 392 deletions.
10 changes: 0 additions & 10 deletions filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ func NewFileSource(
preprocessorThreadCount: 1,
}

blockStoreUrl := blocksStore.BaseURL()
s.oneBlockFileMode = len(blockStoreUrl.Query()["oneblocks"]) > 0

for _, option := range options {
option(s)
}
Expand All @@ -142,13 +139,6 @@ func (s *FileSource) Run() {
}

func (s *FileSource) run() error {
if s.oneBlockFileMode {
return s.runOneBlockFile()
}
return s.runMergeFile()
}

func (s *FileSource) runMergeFile() error {

go s.launchSink()

Expand Down
87 changes: 87 additions & 0 deletions fork_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package bstream

import (
"fmt"
)

type resolver struct {
// FIXME add context ...
mergedBlockFilesGetter func(base uint64) (map[string]*OneBlockFile, error)
oneBlockFilesGetter func(upTo uint64) map[string]*OneBlockFile

// func (s *DStoreIO) FetchMergedOneBlockFiles(lowBlockNum uint64) ([]*OneBlockFile, error) {
// function to get the list... already have that somewhere in merger
}

func parseFilenames(in []string, upTo uint64) map[string]*OneBlockFile {
out := make(map[string]*OneBlockFile)
for _, f := range in {
obf, err := NewOneBlockFile(f)
if err != nil {
continue
}
if obf.Num > upTo {
continue
}
out[obf.ID] = obf
}
return out
}

func (r *resolver) download(file *OneBlockFile) BlockRef {
return NewBlockRef(file.ID, file.Num) //FIXME need to add the block file downloader
}

func (r *resolver) loadPreviousMergedBlocks(base uint64, blocks map[string]*OneBlockFile) error {
loadedBlocks, err := r.mergedBlockFilesGetter(base)
if err != nil {
return err
}
for k, v := range loadedBlocks {
blocks[k] = v
}
return nil
}

func (r *resolver) resolve(block BlockRef, lib BlockRef) (undoBlocks []BlockRef, continueAfter uint64, err error) {
base := block.Num() / 100 * 100
mergedBlocks, err := r.mergedBlockFilesGetter(base)
if err != nil {
return nil, 0, err
}
nextID := TruncateBlockID(block.ID())
oneBlocks := r.oneBlockFilesGetter(block.Num())

for {
if blk := mergedBlocks[nextID]; blk != nil {
continueAfter = blk.Num
break
}

forkedBlock := oneBlocks[nextID]

if forkedBlock == nil && base <= lib.Num() {
return nil, 0, fmt.Errorf("cannot resolve block %s: no oneBlockFile or merged block found with ID %s", block, nextID)
}

// also true when forkedBlock.Num < base && base <= lib.Num()
if forkedBlock.Num < lib.Num() {
return nil, 0, fmt.Errorf("cannot resolve block %s: forked chain goes beyond LIB, looking for ID %s (this should not happens)", block, nextID)
}

if forkedBlock == nil || forkedBlock.Num < base {
base -= 100
err := r.loadPreviousMergedBlocks(base, mergedBlocks)
if err != nil {
return nil, 0, fmt.Errorf("cannot resolve block %s (cannot load previous bundle (%d): %w)", block, base, err)
}
continue // retry with more mergedBlocks loaded
}

undoBlocks = append(undoBlocks, r.download(forkedBlock))
nextID = forkedBlock.PreviousID

}

return
}
23 changes: 19 additions & 4 deletions oneblockfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package bstream
import (
"context"
"fmt"
"io/ioutil"
"strconv"
"strings"
"time"

"github.com/streamingfast/dstore"
)

type OneBlockDownloaderFunc = func(ctx context.Context, oneBlockFile *OneBlockFile) (data []byte, err error)
Expand Down Expand Up @@ -103,11 +106,8 @@ func (f *OneBlockFile) LibNum() uint64 {
}

// ParseFilename parses file names formatted like:
// * 0000000100-20170701T122141.0-24a07267-e5914b39
// * 0000000101-20170701T122141.5-dbda3f44-24a07267-mindread1

// * 0000000101-20170701T122141.5-dbda3f44-24a07267-100-mindread1
// * 0000000101-20170701T122141.5-dbda3f44-24a07267-101-mindread2
// * 0000000101-20170701T122141.5-dbda3f44-24a07267-100-mindread2

func ParseFilename(filename string) (blockNum uint64, blockTime time.Time, blockIDSuffix string, previousBlockIDSuffix string, libNum *uint64, canonicalName string, err error) {
parts := strings.Split(filename, "-")
Expand Down Expand Up @@ -165,3 +165,18 @@ func BlockFileNameWithSuffix(block *Block, suffix string) string {

return fmt.Sprintf("%010d-%s-%s-%s-%d-%s", block.Num(), blockTimeString, blockID, previousID, block.LibNum, suffix)
}

func OneBlockDownloaderFromStore(blocksStore dstore.Store) OneBlockDownloaderFunc {
return func(ctx context.Context, obf *OneBlockFile) ([]byte, error) {
for filename := range obf.Filenames {
reader, err := blocksStore.OpenObject(ctx, filename)
if err != nil {
return nil, fmt.Errorf("fetching %s from block store: %w", filename, err)
}
defer reader.Close()

return ioutil.ReadAll(reader)
}
return nil, fmt.Errorf("no filename for this oneBlockFile")
}
}
179 changes: 0 additions & 179 deletions oneblockfilesource.go

This file was deleted.

16 changes: 0 additions & 16 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,6 @@ func (s *Stream) createSource(ctx context.Context) (bstream.Source, error) {
return jsf(s.cursor.Block.Num(), forkableHandler), nil
}

if s.tracker != nil {
irreversibleStartBlockNum, previousIrreversibleID, err := s.tracker.ResolveStartBlock(ctx, absoluteStartBlockNum)
if err != nil {
return nil, fmt.Errorf("failed to resolve start block: %w", err)
}
var irrRef bstream.BlockRef
if previousIrreversibleID != "" {
irrRef = bstream.NewBlockRef(previousIrreversibleID, irreversibleStartBlockNum)
}

forkableHandlerWrapper := s.forkableHandlerWrapper(nil, true, absoluteStartBlockNum)
forkableHandler := forkableHandlerWrapper(h, irrRef)
jsf := s.joiningSourceFactoryFromResolvedBlock(irreversibleStartBlockNum, previousIrreversibleID)
return jsf(absoluteStartBlockNum, forkableHandler), nil
}

// no cursor, no tracker, probably just block files on disk
forkableHandlerWrapper := s.forkableHandlerWrapper(nil, false, absoluteStartBlockNum)
forkableHandler := forkableHandlerWrapper(h, nil)
Expand Down
Loading

0 comments on commit 86ca353

Please sign in to comment.