fail early if incomplete
allow retry if incomplete
mmsqe committed Dec 12, 2022
1 parent ac15f12 commit 575a003
Showing 3 changed files with 42 additions and 19 deletions.
13 changes: 11 additions & 2 deletions client/decoder.go
@@ -1,13 +1,22 @@
 package client
 
 import (
+	"fmt"
+
 	"github.com/cosmos/cosmos-sdk/store/types"
 	sdk "github.com/cosmos/cosmos-sdk/types"
 	"github.com/gogo/protobuf/proto"
 )
 
 func DecodeData(data []byte) (pairs []types.StoreKVPair, err error) {
-	offset := 8
-	for offset < len(data) {
+	const prefixLen = 8
+	offset := prefixLen
+	dataSize := sdk.BigEndianToUint64(data[:offset])
+	size := len(data)
+	if int(dataSize)+prefixLen != size {
+		return nil, fmt.Errorf("incomplete file: %v vs %v", dataSize, size)
+	}
+	for offset < size {
 		size, n := proto.DecodeVarint(data[offset:])
 		offset += n
 		pair := new(types.StoreKVPair)
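DecodeData now reads the 8-byte big-endian size prefix up front and refuses to parse a file whose total length does not add up, instead of decoding garbage from a partially written file. A minimal standalone sketch of the same check, using encoding/binary in place of sdk.BigEndianToUint64 (names and the error format follow the diff; the short-buffer guard is an extra illustration, not part of the commit):

package main

import (
	"encoding/binary"
	"fmt"
)

const prefixLen = 8

// validate mirrors the new early check in DecodeData: the first 8 bytes hold
// the payload size as a big-endian uint64, and the file is complete only if
// prefix + payload adds up to the actual length on disk.
func validate(data []byte) error {
	if len(data) < prefixLen {
		// extra guard, not in the diff: avoid slicing a too-short buffer
		return fmt.Errorf("incomplete file: missing size prefix")
	}
	dataSize := binary.BigEndian.Uint64(data[:prefixLen])
	size := len(data)
	if int(dataSize)+prefixLen != size {
		return fmt.Errorf("incomplete file: %v vs %v", dataSize, size)
	}
	return nil
}

func main() {
	payload := []byte("kv-pairs...")
	buf := make([]byte, prefixLen, prefixLen+len(payload))
	binary.BigEndian.PutUint64(buf, uint64(len(payload)))
	buf = append(buf, payload...)

	fmt.Println(validate(buf))              // <nil>: sizes match
	fmt.Println(validate(buf[:len(buf)-1])) // error: file truncated mid-write
}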
24 changes: 13 additions & 11 deletions client/file/sync.go
@@ -96,12 +96,13 @@ func Sync(versionDB *tmdb.Store, remoteGrpcUrl, remoteUrl, remoteWsUrl, rootDir
 		case data := <-chData:
 			pairs, err := client.DecodeData(data.Data)
 			fmt.Printf("mm-pairs: %+v, %+v\n", len(pairs), err)
-			if err == nil {
-				if err = versionDB.PutAtVersion(int64(data.BlockNum), pairs); err != nil {
-					fmt.Println("mm-put-at-version-panic")
-					panic(err)
-				}
+			if err != nil {
+				fmt.Println("invalid decode")
+			} else if err = versionDB.PutAtVersion(int64(data.BlockNum), pairs); err != nil {
+				fmt.Println("mm-put-at-version-panic")
+				panic(err)
 			}
+			data.ChResult <- err
 		case err := <-chErr:
 			// fail read
 			fmt.Println("mm-fail-read-panic")
@@ -150,12 +151,14 @@ func Sync(versionDB *tmdb.Store, remoteGrpcUrl, remoteUrl, remoteWsUrl, rootDir
 			if err := os.WriteFile(file, data.Data, 0600); err != nil {
 				fmt.Println("mm-WriteFile-panic")
 				panic(err)
-			} else {
-				retry = 0
-				fmt.Println("mm-reset-retry")
-				if data.BlockNum > maxBlockNum {
-					streamer.SetMaxBlockNum(data.BlockNum)
-				}
 			}
+			retry = 0
+			fmt.Println("mm-reset-retry")
+			if data.BlockNum > maxBlockNum {
+				streamer.SetMaxBlockNum(data.BlockNum)
+			}
+			data.ChResult <- err
 		case err := <-chErr:
 			retry++
 			fmt.Println("mm-retry", retry)
@@ -200,7 +203,6 @@ func Sync(versionDB *tmdb.Store, remoteGrpcUrl, remoteUrl, remoteWsUrl, rootDir
 			blockNum := int(data.Header.Height)
 			fmt.Printf("mm-set-max-blk: %+v\n", blockNum)
 			synchronizer.SetMaxBlockNum(blockNum)
-			streamer.SetMaxBlockNum(blockNum)
 		}
 	}
 	panic(fmt.Sprintf("max retries %d reached", defaultMaxRetry))
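The sync loop no longer swallows a failed decode: it reports the outcome back over data.ChResult, so the watcher that produced the block can retry it. A rough sketch of that result-channel hand-off, with the real types and the versionDB write replaced by stand-ins:

package main

import (
	"errors"
	"fmt"
)

// blockData mirrors the shape used in the diff: a payload plus a channel the
// consumer uses to send the processing result back to the producer.
type blockData struct {
	BlockNum int
	Data     []byte
	ChResult chan<- error
}

func main() {
	chData := make(chan *blockData)

	// Consumer: decode/store the block and report the outcome on ChResult,
	// the way the sync loop now does after DecodeData / PutAtVersion.
	go func() {
		for data := range chData {
			var err error
			if len(data.Data) == 0 {
				err = errors.New("invalid decode") // e.g. an incomplete file
			}
			data.ChResult <- err
		}
	}()

	// Producer: send a block and wait for the consumer's answer, so a
	// failure can trigger a retry of the same block number.
	fetch := func(blockNum int, payload []byte) error {
		chResult := make(chan error)
		chData <- &blockData{BlockNum: blockNum, Data: payload, ChResult: chResult}
		return <-chResult
	}

	fmt.Println(fetch(1, []byte("ok"))) // <nil>
	fmt.Println(fetch(2, nil))          // invalid decode -> caller may retry
}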
24 changes: 18 additions & 6 deletions client/file/watcher.go
@@ -8,7 +8,6 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
)

@@ -56,18 +55,20 @@ func (d *httpFileDownloader) GetData(path string) ([]byte, error) {
 type BlockData struct {
 	BlockNum int
 	Data     []byte
+	ChResult chan<- error
 }
 
 type BlockFileWatcher struct {
 	concurrency   int
-	maxBlockNum   int64
+	maxBlockNum   int
 	getFilePath   func(blockNum int) string
 	onBeforeFetch func(blockNum int) bool
 	downloader    fileDownloader
 	chData        chan *BlockData
 	chError       chan error
 	chDone        chan bool
 	startLock     *sync.Mutex
+	maxBlockLock  *sync.RWMutex
 }
 
 func NewBlockFileWatcher(
@@ -79,12 +80,13 @@
 ) *BlockFileWatcher {
 	w := &BlockFileWatcher{
 		concurrency:   concurrency,
-		maxBlockNum:   int64(maxBlockNum),
+		maxBlockNum:   maxBlockNum,
 		getFilePath:   getFilePath,
 		onBeforeFetch: onBeforeFetch,
 		chData:        make(chan *BlockData),
 		chError:       make(chan error),
 		startLock:     new(sync.Mutex),
+		maxBlockLock:  new(sync.RWMutex),
 	}
 	if isLocal {
 		w.downloader = new(localFileDownloader)
@@ -111,7 +113,12 @@ func (w *BlockFileWatcher) SubscribeError() <-chan error {
 }
 
 func (w *BlockFileWatcher) SetMaxBlockNum(num int) {
-	atomic.StoreInt64(&w.maxBlockNum, int64(num))
+	w.maxBlockLock.Lock()
+	defer w.maxBlockLock.Unlock()
+	// avoid dup job when set max to smaller one while locking
+	if num > w.maxBlockNum {
+		w.maxBlockNum = num
+	}
 }
 
 func (w *BlockFileWatcher) fetch(blockNum int) error {
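Replacing the atomic store with maxBlockLock makes the update conditional: a stale caller can no longer move the high-water mark backwards, which a plain atomic.StoreInt64 allowed. A reduced sketch of that guard (only the field and method names are taken from the diff, the rest is simplified):

package main

import (
	"fmt"
	"sync"
)

// maxTracker keeps a monotonically increasing high-water mark, the way
// BlockFileWatcher now guards maxBlockNum with maxBlockLock.
type maxTracker struct {
	mu  sync.RWMutex
	max int
}

func (t *maxTracker) SetMaxBlockNum(num int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	// only move forward; an unconditional store could go backwards
	if num > t.max {
		t.max = num
	}
}

func (t *maxTracker) MaxBlockNum() int {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.max
}

func main() {
	t := new(maxTracker)
	t.SetMaxBlockNum(10)
	t.SetMaxBlockNum(7)          // stale update, ignored
	fmt.Println(t.MaxBlockNum()) // 10
}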
@@ -131,11 +138,14 @@
 		}
 		return err
 	}
+
+	chResult := make(chan error)
 	w.chData <- &BlockData{
 		BlockNum: blockNum,
 		Data:     data,
+		ChResult: chResult,
 	}
-	return nil
+	return <-chResult
 }
 
 func (w *BlockFileWatcher) Start(
@@ -158,7 +168,9 @@
 			default:
 				wg := new(sync.WaitGroup)
 				currentBlockNum := blockNum
-				maxBlockNum := int(atomic.LoadInt64(&w.maxBlockNum))
+				w.maxBlockLock.RLock()
+				maxBlockNum := w.maxBlockNum
+				w.maxBlockLock.RUnlock()
 				concurrency := w.concurrency
 				if diff := maxBlockNum - currentBlockNum; diff < concurrency {
 					if diff <= 0 {
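Taken together, fetch now blocks on ChResult until the consumer has decoded and stored the block, and the sync loop counts failures and resets the counter on success, so an incomplete file leads to a bounded retry rather than a corrupted store. A condensed sketch of that retry policy (defaultMaxRetry exists in the real code but its value is not shown here, and fetchBlock is a stand-in for the fetch/decode/write round-trip):

package main

import (
	"errors"
	"fmt"
)

// defaultMaxRetry exists in the real code; 3 here is purely illustrative.
const defaultMaxRetry = 3

// fetchBlock stands in for the watcher's fetch plus the sync loop's
// decode/write: the first two attempts see an incomplete file, the third works.
func fetchBlock(attempt int) error {
	if attempt < 2 {
		return errors.New("incomplete file: 100 vs 60")
	}
	return nil
}

func main() {
	retry := 0
	for attempt := 0; ; attempt++ {
		if err := fetchBlock(attempt); err != nil {
			retry++
			fmt.Println("retry", retry, "after:", err)
			if retry >= defaultMaxRetry {
				panic(fmt.Sprintf("max retries %d reached", defaultMaxRetry))
			}
			continue
		}
		retry = 0 // reset on success, as the sync loop now does after a good write
		fmt.Println("block stored")
		break
	}
}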
