Skip to content

Commit

Permalink
This PR does the following for 'bcadmin db replay':
Browse files Browse the repository at this point in the history
- clean up the different backward-compatible but used nowhere extensions of `StateReplay`
- add all blocks to the skipchain.db, so that contracts using 'ReadOnlySkipchain' work
- by default doesn't verify the forward link signature of blocks, as this is very costly,
and can be supposed to be correct if its in 'cached.db'
- use a memory state trie instead of a disk state trie when replaying -> much faster
- can discard, write, resume blocks

This makes replaying the current chain having 50k blocks possible in about 30' on a fast machine.
  • Loading branch information
ineiti committed May 21, 2020
1 parent d097fd5 commit 13469e1
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 309 deletions.
2 changes: 1 addition & 1 deletion byzcoin/bcadmin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ and return success:

```bash
bcadmin db catchup cached.db _bcID_ _url_
bcadmin db replay cached.db _bcID_ --continue
bcadmin db replay cached.db _bcID_
```

The `_bcID_` has to be replaced by the hexadecimal representation of the
Expand Down
94 changes: 49 additions & 45 deletions byzcoin/bcadmin/cmd_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"go.dedis.ch/cothority/v3/byzcoin/trie"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -90,12 +91,10 @@ func dbReplay(c *cli.Context) error {
log.Info("Preparing db")
start := *fb.bcID
err = fb.boltDB.Update(func(tx *bbolt.Tx) error {
if !fb.flagReplayCont {
if tx.Bucket(fb.bucketName) != nil {
err := tx.DeleteBucket(fb.bucketName)
if err != nil {
return err
}
if tx.Bucket(fb.bucketName) != nil {
err := tx.DeleteBucket(fb.bucketName)
if err != nil {
return err
}
}
_, err := tx.CreateBucketIfNotExists(fb.bucketName)
Expand All @@ -108,46 +107,57 @@ func dbReplay(c *cli.Context) error {
return xerrors.Errorf("couldn't add bucket: %+v", err)
}

if !fb.flagReplayCont {
_, err = fb.service.ReplayStateDB(fb.boltDB, fb.bucketName, fb.genesis)
if err != nil {
return xerrors.Errorf("couldn't create stateDB: %+v", err)
}
} else {
index, err := fb.service.ReplayStateDB(fb.boltDB, fb.bucketName, nil)
if err != nil {
return xerrors.Errorf("couldn't replay blocks: %+v", err)
}
log.Lvl2("Copying blocks to skipchain's DB")
fb.skipchain.GetDB().DB = fb.db.DB

log.Info("Searching for block with index", index+1)
sb := fb.db.GetByID(start)
for sb != nil && sb.Index < index+1 {
if len(sb.ForwardLink) == 0 {
break
}
sb = fb.db.GetByID(sb.ForwardLink[0].To)
start = sb.Hash
}
if sb.Index <= index {
log.Info("No new blocks available")
return nil
log.Info("Replaying blocks")
var dbSt trie.DB
var dbID skipchain.SkipBlockID
rso := byzcoin.ReplayStateOptions{
MaxBlocks: c.Int("blocks"),
VerifyFLSig: c.Bool("verifyFLSig"),
}
if c.Bool("continue") || c.Bool("write") {
dbID = []byte(fmt.Sprintf("ByzCoin_%x", *fb.bcID))
dbSt = trie.NewDiskDB(fb.boltDB, dbID)
if c.Bool("continue") {
rso.StartingTrie = dbSt
}
}

log.Info("Replaying blocks")
_, err = fb.service.ReplayStateContLog(start,
&sumFetcher{summarizeBlocks: c.Int("summarize"), bff: fb.blockFetcher})
st, err := fb.service.ReplayState(start, newSumFetcher(c), rso)
if err != nil {
return xerrors.Errorf("couldn't replay blocks: %+v", err)
}
log.Info("Successfully checked and replayed all blocks.")
if c.Bool("write") {
log.Info("Writing new stateTrie to DB")
err := fb.boltDB.Update(func(tx *bbolt.Tx) error {
if tx.Bucket(dbID) != nil {
err := tx.DeleteBucket(dbID)
if err != nil {
return fmt.Errorf("while deleting bucket: %v", err)
}
}
bucket, err := tx.CreateBucket(dbID)
if err != nil {
return fmt.Errorf("while creating bucket: %v", err)
}
return st.View(func(b trie.Bucket) error {
return b.ForEach(func(k, v []byte) error {
return bucket.Put(k, v)
})
})
})
if err != nil {
return fmt.Errorf("couldn't update bucket: %v", err)
}
}

return nil
}

type sumFetcher struct {
summarizeBlocks int
bff byzcoin.BlockFetcherFunc
totalTXs int
accepted int
seenBlocks int
Expand All @@ -156,11 +166,12 @@ type sumFetcher struct {
maxTPS float64
maxBlockSize int
totalBlockSize int
verifyFLSig bool
maxBlocks int
}

func (sf sumFetcher) BlockFetcherFunc(sid skipchain.SkipBlockID) (*skipchain.
SkipBlock, error) {
return sf.bff(sid)
func newSumFetcher(c *cli.Context) *sumFetcher {
return &sumFetcher{summarizeBlocks: c.Int("summarize")}
}

func (sf sumFetcher) LogNewBlock(sb *skipchain.SkipBlock) {
Expand Down Expand Up @@ -509,6 +520,7 @@ func dbCheck(c *cli.Context) error {
type fetchBlocks struct {
cl *skipchain.Client
service *byzcoin.Service
skipchain *skipchain.Service
bcID *skipchain.SkipBlockID
local *onet.LocalTest
roster *onet.Roster
Expand All @@ -520,8 +532,6 @@ type fetchBlocks struct {
db *skipchain.SkipBlockDB
bucketName []byte
flagCatchupBatch int
flagReplayBlocks int
flagReplayCont bool
}

func newFetchBlocks(c *cli.Context) (*fetchBlocks,
Expand All @@ -535,13 +545,12 @@ func newFetchBlocks(c *cli.Context) (*fetchBlocks,
local: onet.NewLocalTest(cothority.Suite),
bucketName: []byte("replayStateBucket"),
flagCatchupBatch: c.Int("batch"),
flagReplayBlocks: c.Int("blocks"),
flagReplayCont: c.Bool("continue"),
}

var err error
servers := fb.local.GenServers(1)
fb.service = servers[0].Service(byzcoin.ServiceName).(*byzcoin.Service)
fb.skipchain = servers[0].Service(skipchain.ServiceName).(*skipchain.Service)

log.Info("Opening database", c.Args().First())
fb.db, fb.boltDB, err = fb.openDB(c.Args().First())
Expand Down Expand Up @@ -630,11 +639,6 @@ func (fb *fetchBlocks) addURL(url string) error {
}

func (fb *fetchBlocks) blockFetcher(sib skipchain.SkipBlockID) (*skipchain.SkipBlock, error) {
fb.flagReplayBlocks--
if fb.flagReplayBlocks == 0 {
log.Info("reached end of task")
return nil, nil
}
sb := fb.db.GetByID(sib)
if sb == nil {
return nil, nil
Expand Down
18 changes: 14 additions & 4 deletions byzcoin/bcadmin/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,19 +656,29 @@ var cmds = cli.Commands{
Usage: "Replay a chain and check the global state is consistent",
Action: dbReplay,
Flags: []cli.Flag{
cli.BoolFlag{
Name: "continue, cont",
Usage: "continue an aborted replay",
},
cli.IntFlag{
Name: "blocks",
Usage: "how many blocks to apply",
Value: -1,
},
cli.IntFlag{
Name: "summarize, sum",
Usage: "summarize this many blocks in output",
Value: 1,
},
cli.BoolFlag{
Name: "verifyFLSig",
Usage: "turns on forward-link signature verification",
},
cli.BoolFlag{
Name: "continue",
Usage: "take the existing trie and replay from the" +
" latest block",
},
cli.BoolFlag{
Name: "write",
Usage: "write the new trie to the db",
},
},
},
{
Expand Down
16 changes: 8 additions & 8 deletions byzcoin/bcadmin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Options:
# -b re-builds bcadmin package

DBG_TEST=2
DBG_TEST=1
DBG_SRV=1
DBG_BCADMIN=2

Expand Down Expand Up @@ -84,15 +84,15 @@ testDbReplay(){
testOK runBA db catchup conode.db $bcID http://localhost:2003
testGrep "Replaying block at index 0" runBA db replay conode.db $bcID

# replay with more than 1 block
testOK runBA mint $bc $key $keyPub 1000
testOK runBA mint $bc $key $keyPub 1000

# replay with more than 1 block
runBA db catchup conode.db $bcID http://localhost:2003
testNGrep "Replaying block at index 0" runBA db replay conode.db $bcID --cont
testReGrep "Replaying block at index 1"
testGrep "Replaying block at index 0" runBA db replay conode.db $bcID
testOK runBA db replay conode.db $bcID --cont
testGrep "index 0" runBA db catchup conode.db $bcID --write --blocks 1
testReNGrep "index 1"
testNGrep "index 0" runBA db catchup conode.db $bcID --write --continue
testReGrep "index 1"
}

testDbMerge(){
Expand Down Expand Up @@ -322,7 +322,7 @@ testAddDarc(){
testGrep "${ID:5:${#ID}-0}" runBA darc show --darc "$ID"

# checks the --shortPrint option
OUTRES=$(runBA darc add --shortPrint)
OUTRES=$(runBA0 darc add --shortPrint)
matchOK "$OUTRES" "darc:[0-9a-f]{64}
\[ed25519:[0-9a-f]{64}\]"
}
Expand Down Expand Up @@ -478,7 +478,7 @@ runBA(){
}

runBA0(){
dbgRun ./bcadmin -c config/ --debug 0 "$@"
./bcadmin -c config/ --debug 0 "$@"
}

testQR() {
Expand Down
7 changes: 7 additions & 0 deletions byzcoin/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ func (c *contractConfig) Invoke(rst ReadOnlyStateTrie, inst Instruction, coins [
if err != nil {
return nil, nil, xerrors.Errorf("decoding: %v", err)
}

_, err := rst.(ReadOnlySkipChain).GetBlockByIndex(rst.GetIndex())
if err != nil {
return nil, nil,
fmt.Errorf("couldn't get latest skipblock: %v", err)
}

if rst.GetVersion() < VersionViewchange {
// If everything is correctly signed, then we trust it, no need
// to do additional verification.
Expand Down
2 changes: 1 addition & 1 deletion byzcoin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2173,7 +2173,7 @@ func (s *Service) createStateChanges(sst *stagingStateTrie, scID skipchain.SkipB
if err != nil {
tx.Accepted = false
txOut = append(txOut, tx)
log.Error(s.ServerIdentity(), err)
log.Warn(s.ServerIdentity(), err)
} else {
// We would like to be able to check if this txn is so big it could never fit into a block,
// and if so, drop it. But we can't with the current API of createStateChanges.
Expand Down
Loading

0 comments on commit 13469e1

Please sign in to comment.