Skip to content

Commit

Permalink
improve concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Dec 23, 2022
1 parent 1062836 commit 7596dc9
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions cmd/cronosd/cmd/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func DumpFileChangeSetCmd() *cobra.Command {
prefix := []byte(fmt.Sprintf("s/k:%s/", args[0]))
db = dbm.NewPrefixDB(db, prefix)

cacheSize := cast.ToInt(ctx.Viper.Get("iavl-cache-size"))
cacheSize := cast.ToInt(ctx.Viper.Get(server.FlagIAVLCacheSize))

startVersion, err := cmd.Flags().GetInt(flagStartVersion)
if err != nil {
Expand Down Expand Up @@ -99,16 +99,16 @@ func DumpFileChangeSetCmd() *cobra.Command {
tmpDir = filepath.Dir(output)
}

tree, err := iavl.NewMutableTree(db, cacheSize, true)
if err != nil {
return err
}
if endVersion == 0 {
tree, err := iavl.NewMutableTree(db, 0, true)
if err != nil {
return err
}
latestVersion, err := tree.LazyLoadVersion(0)
if err != nil {
return err
}
endVersion = int(latestVersion)
endVersion = int(latestVersion) + 1
}

works := splitWorkLoad(concurrency, Range{Start: startVersion, End: endVersion})
Expand All @@ -119,7 +119,9 @@ func DumpFileChangeSetCmd() *cobra.Command {
go func(i int) {
defer close(chs[i])
work := works[i]
tmpFile, err := dumpRangeBlocksWorker(tmpDir, tree.ImmutableTree, int64(work.Start), int64(work.End))

tree := iavl.NewImmutableTree(db, cacheSize, true)
tmpFile, err := dumpRangeBlocksWorker(tmpDir, tree, int64(work.Start), int64(work.End))
if err != nil {
fmt.Fprintf(os.Stderr, "worker failed: start: %d, end: %d, err: %e", work.Start, work.End, err)
return
Expand Down Expand Up @@ -177,7 +179,7 @@ func DumpSSTChangeSetCmd() *cobra.Command {
prefix := []byte(fmt.Sprintf("s/k:%s/", args[0]))
db = dbm.NewPrefixDB(db, prefix)

cacheSize := cast.ToInt(ctx.Viper.Get("iavl-cache-size"))
cacheSize := cast.ToInt(ctx.Viper.Get(server.FlagIAVLCacheSize))

startVersion, err := cmd.Flags().GetInt(flagStartVersion)
if err != nil {
Expand Down Expand Up @@ -266,7 +268,7 @@ func ConvertPlainToSSTCmd() *cobra.Command {
})
if err == io.ErrUnexpectedEOF {
// incomplete end of file, we'll output a warning and process the completed versions.
fmt.Fprintln(os.Stderr, "file incomplete, the completed versions are processed, the last completed file offset: %d\n", offset)
fmt.Fprintf(os.Stderr, "file incomplete, the completed versions are processed, the last completed file offset: %d\n", offset)
} else if err != nil {
return err
}
Expand Down Expand Up @@ -309,7 +311,7 @@ func PrintPlainFileCmd() *cobra.Command {
})
if err == io.ErrUnexpectedEOF {
// incomplete end of file, we'll output a warning and process the completed versions.
fmt.Fprintln(os.Stderr, "file incomplete, the completed versions are processed, the last completed file offset: %d\n", offset)
fmt.Fprintf(os.Stderr, "file incomplete, the completed versions are processed, the last completed file offset: %d\n", offset)
} else if err != nil {
return err
}
Expand Down

0 comments on commit 7596dc9

Please sign in to comment.