Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work on db replay #2284

Merged
merged 1 commit into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion byzcoin/bcadmin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ and return success:

```bash
bcadmin db catchup cached.db _bcID_ _url_
bcadmin db replay cached.db _bcID_ --continue
bcadmin db replay cached.db _bcID_
```

The `_bcID_` has to be replaced by the hexadecimal representation of the
Expand Down
119 changes: 73 additions & 46 deletions byzcoin/bcadmin/cmd_db.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package main

import (
"encoding/binary"
"encoding/hex"
"errors"
"flag"
"fmt"
"go.dedis.ch/cothority/v3/byzcoin/trie"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -90,12 +92,10 @@ func dbReplay(c *cli.Context) error {
log.Info("Preparing db")
start := *fb.bcID
err = fb.boltDB.Update(func(tx *bbolt.Tx) error {
if !fb.flagReplayCont {
if tx.Bucket(fb.bucketName) != nil {
err := tx.DeleteBucket(fb.bucketName)
if err != nil {
return err
}
if tx.Bucket(fb.bucketName) != nil {
err := tx.DeleteBucket(fb.bucketName)
if err != nil {
return err
}
}
_, err := tx.CreateBucketIfNotExists(fb.bucketName)
Expand All @@ -108,46 +108,51 @@ func dbReplay(c *cli.Context) error {
return xerrors.Errorf("couldn't add bucket: %+v", err)
}

if !fb.flagReplayCont {
_, err = fb.service.ReplayStateDB(fb.boltDB, fb.bucketName, fb.genesis)
if err != nil {
return xerrors.Errorf("couldn't create stateDB: %+v", err)
}
} else {
index, err := fb.service.ReplayStateDB(fb.boltDB, fb.bucketName, nil)
if err != nil {
return xerrors.Errorf("couldn't replay blocks: %+v", err)
}

log.Info("Searching for block with index", index+1)
sb := fb.db.GetByID(start)
for sb != nil && sb.Index < index+1 {
if len(sb.ForwardLink) == 0 {
break
}
sb = fb.db.GetByID(sb.ForwardLink[0].To)
start = sb.Hash
}
if sb.Index <= index {
log.Info("No new blocks available")
return nil
}
}
log.Lvl2("Copying blocks to skipchain's DB")
fb.skipchain.GetDB().DB = fb.db.DB

log.Info("Replaying blocks")
_, err = fb.service.ReplayStateContLog(start,
&sumFetcher{summarizeBlocks: c.Int("summarize"), bff: fb.blockFetcher})
rso := byzcoin.ReplayStateOptions{
MaxBlocks: c.Int("blocks"),
VerifyFLSig: c.Bool("verifyFLSig"),
}
if c.Bool("continue") {
rso.StartingTrie = fb.trieDB
}
st, err := fb.service.ReplayState(start, newSumFetcher(c), rso)
if err != nil {
return xerrors.Errorf("couldn't replay blocks: %+v", err)
}
log.Info("Successfully checked and replayed all blocks.")
if c.Bool("write") {
log.Info("Writing new stateTrie to DB")
err := fb.boltDB.Update(func(tx *bbolt.Tx) error {
if tx.Bucket(fb.trieBucketName) != nil {
err := tx.DeleteBucket(fb.trieBucketName)
if err != nil {
return fmt.Errorf("while deleting bucket: %v", err)
}
}
bucket, err := tx.CreateBucket(fb.trieBucketName)
if err != nil {
return fmt.Errorf("while creating bucket: %v", err)
}
return st.View(func(b trie.Bucket) error {
return b.ForEach(func(k, v []byte) error {
return bucket.Put(k, v)
})
})
})
if err != nil {
return fmt.Errorf("couldn't update bucket: %v", err)
}
}

return nil
}

type sumFetcher struct {
summarizeBlocks int
bff byzcoin.BlockFetcherFunc
totalTXs int
accepted int
seenBlocks int
Expand All @@ -156,11 +161,12 @@ type sumFetcher struct {
maxTPS float64
maxBlockSize int
totalBlockSize int
verifyFLSig bool
maxBlocks int
}

func (sf sumFetcher) BlockFetcherFunc(sid skipchain.SkipBlockID) (*skipchain.
SkipBlock, error) {
return sf.bff(sid)
func newSumFetcher(c *cli.Context) *sumFetcher {
return &sumFetcher{summarizeBlocks: c.Int("summarize")}
}

func (sf sumFetcher) LogNewBlock(sb *skipchain.SkipBlock) {
Expand Down Expand Up @@ -370,6 +376,28 @@ func dbRemove(c *cli.Context) error {
return fmt.Errorf("couldn't convert %s to int: %v",
c.Args().Get(2), err)
}
if blocks >= latest.Index {
return errors.New("cannot delete up the genesis block or earlier")
}
}

// Checking the removal of blocks will not lead to an unrecoverable state
// of the node.
tr := trie.NewDiskDB(fb.boltDB, fb.trieBucketName)
err = tr.View(func(b trie.Bucket) error {
buf := b.Get([]byte("trieIndexKey"))
if buf == nil {
return errors.New("couldn't get index key")
}
if latest.Index-blocks <= int(binary.LittleEndian.Uint32(buf)) {
return errors.New("need to keep blocks up to the trie-state." +
"\nUse `bcadmin db replay --write` to replay the db to a" +
" previous state.")
}
return nil
})
if err != nil {
return err
}

for i := 0; i < blocks; i++ {
Expand Down Expand Up @@ -509,6 +537,7 @@ func dbCheck(c *cli.Context) error {
type fetchBlocks struct {
cl *skipchain.Client
service *byzcoin.Service
skipchain *skipchain.Service
bcID *skipchain.SkipBlockID
local *onet.LocalTest
roster *onet.Roster
Expand All @@ -520,8 +549,8 @@ type fetchBlocks struct {
db *skipchain.SkipBlockDB
bucketName []byte
flagCatchupBatch int
flagReplayBlocks int
flagReplayCont bool
trieDB trie.DB
trieBucketName []byte
}

func newFetchBlocks(c *cli.Context) (*fetchBlocks,
Expand All @@ -535,13 +564,12 @@ func newFetchBlocks(c *cli.Context) (*fetchBlocks,
local: onet.NewLocalTest(cothority.Suite),
bucketName: []byte("replayStateBucket"),
flagCatchupBatch: c.Int("batch"),
flagReplayBlocks: c.Int("blocks"),
flagReplayCont: c.Bool("continue"),
}

var err error
servers := fb.local.GenServers(1)
fb.service = servers[0].Service(byzcoin.ServiceName).(*byzcoin.Service)
fb.skipchain = servers[0].Service(skipchain.ServiceName).(*skipchain.Service)

log.Info("Opening database", c.Args().First())
fb.db, fb.boltDB, err = fb.openDB(c.Args().First())
Expand All @@ -562,6 +590,10 @@ func newFetchBlocks(c *cli.Context) (*fetchBlocks,
return nil, xerrors.Errorf("couldn't check bcID: %+v", err)
}
}

fb.trieBucketName = []byte(fmt.Sprintf("ByzCoin_%x", *fb.bcID))
fb.trieDB = trie.NewDiskDB(fb.boltDB, fb.trieBucketName)

return fb, nil
}

Expand Down Expand Up @@ -630,11 +662,6 @@ func (fb *fetchBlocks) addURL(url string) error {
}

func (fb *fetchBlocks) blockFetcher(sib skipchain.SkipBlockID) (*skipchain.SkipBlock, error) {
fb.flagReplayBlocks--
if fb.flagReplayBlocks == 0 {
log.Info("reached end of task")
return nil, nil
}
sb := fb.db.GetByID(sib)
if sb == nil {
return nil, nil
Expand Down
18 changes: 14 additions & 4 deletions byzcoin/bcadmin/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,19 +656,29 @@ var cmds = cli.Commands{
Usage: "Replay a chain and check the global state is consistent",
Action: dbReplay,
Flags: []cli.Flag{
cli.BoolFlag{
Name: "continue, cont",
Usage: "continue an aborted replay",
},
cli.IntFlag{
Name: "blocks",
Usage: "how many blocks to apply",
Value: -1,
},
cli.IntFlag{
Name: "summarize, sum",
Usage: "summarize this many blocks in output",
Value: 1,
},
cli.BoolFlag{
Name: "verifyFLSig",
Usage: "turns on forward-link signature verification",
},
cli.BoolFlag{
Name: "continue",
Usage: "take the existing trie and replay from the" +
" latest block",
},
cli.BoolFlag{
Name: "write",
Usage: "write the new trie to the db",
},
},
},
{
Expand Down
16 changes: 8 additions & 8 deletions byzcoin/bcadmin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Options:
# -b re-builds bcadmin package

DBG_TEST=2
DBG_TEST=1
DBG_SRV=1
DBG_BCADMIN=2

Expand Down Expand Up @@ -84,15 +84,15 @@ testDbReplay(){
testOK runBA db catchup conode.db $bcID http://localhost:2003
testGrep "Replaying block at index 0" runBA db replay conode.db $bcID

# replay with more than 1 block
testOK runBA mint $bc $key $keyPub 1000
testOK runBA mint $bc $key $keyPub 1000

# replay with more than 1 block
runBA db catchup conode.db $bcID http://localhost:2003
testNGrep "Replaying block at index 0" runBA db replay conode.db $bcID --cont
testReGrep "Replaying block at index 1"
testGrep "Replaying block at index 0" runBA db replay conode.db $bcID
testOK runBA db replay conode.db $bcID --cont
testGrep "index 0" runBA db catchup conode.db $bcID --write --blocks 1
testReNGrep "index 1"
testNGrep "index 0" runBA db catchup conode.db $bcID --write --continue
testReGrep "index 1"
}

testDbMerge(){
Expand Down Expand Up @@ -322,7 +322,7 @@ testAddDarc(){
testGrep "${ID:5:${#ID}-0}" runBA darc show --darc "$ID"

# checks the --shortPrint option
OUTRES=$(runBA darc add --shortPrint)
OUTRES=$(runBA0 darc add --shortPrint)
matchOK "$OUTRES" "darc:[0-9a-f]{64}
\[ed25519:[0-9a-f]{64}\]"
}
Expand Down Expand Up @@ -478,7 +478,7 @@ runBA(){
}

runBA0(){
dbgRun ./bcadmin -c config/ --debug 0 "$@"
./bcadmin -c config/ --debug 0 "$@"
}

testQR() {
Expand Down
7 changes: 7 additions & 0 deletions byzcoin/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ func (c *contractConfig) Invoke(rst ReadOnlyStateTrie, inst Instruction, coins [
if err != nil {
return nil, nil, xerrors.Errorf("decoding: %v", err)
}

_, err := rst.(ReadOnlySkipChain).GetBlockByIndex(rst.GetIndex())
if err != nil {
return nil, nil,
fmt.Errorf("couldn't get latest skipblock: %v", err)
}

if rst.GetVersion() < VersionViewchange {
// If everything is correctly signed, then we trust it, no need
// to do additional verification.
Expand Down
2 changes: 1 addition & 1 deletion byzcoin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2173,7 +2173,7 @@ func (s *Service) createStateChanges(sst *stagingStateTrie, scID skipchain.SkipB
if err != nil {
tx.Accepted = false
txOut = append(txOut, tx)
log.Error(s.ServerIdentity(), err)
log.Warn(s.ServerIdentity(), err)
} else {
// We would like to be able to check if this txn is so big it could never fit into a block,
// and if so, drop it. But we can't with the current API of createStateChanges.
Expand Down
Loading