Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backup] Improved writing and reading of blockchain backup file #297

Merged
merged 3 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 119 additions & 68 deletions archive/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"io"
"os"
"runtime"
"time"

"github.com/dogechain-lab/dogechain/helper/common"
"github.com/dogechain-lab/dogechain/server/proto"
Expand All @@ -20,6 +22,11 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)

var (
// Error Info
ErrBlockRange = errors.New("block range error: from is greater than or equal to to")
)

// CreateBackup fetches blockchain data with the specific range via gRPC
// and save this data as binary archive to given path
func CreateBackup(
Expand All @@ -36,6 +43,37 @@ func CreateBackup(
resTo = 0
err = nil

signalCh := common.GetTerminationSignalCh()
ctx, cancelFn := context.WithCancel(context.Background())

defer cancelFn()

go func() {
<-signalCh
logger.Info("Caught termination signal, shutting down...")
cancelFn()
}()

clt := proto.NewSystemClient(conn)

var reqTo uint64

var reqToHash types.Hash

reqTo, reqToHash, err = determineTo(ctx, clt, to)
if err != nil {
return
}

if from <= reqTo {
logger.Info("Exporting blocks", "from", from, "to", reqTo)
} else {
err = ErrBlockRange

return
}

// open write file
// allow to overwrite the overwrites file only if it's explicitly set
fileFlag := os.O_WRONLY | os.O_CREATE | os.O_EXCL
if overwriteFile {
Expand Down Expand Up @@ -87,43 +125,12 @@ func CreateBackup(
writeBuf = fbuf
}

signalCh := common.GetTerminationSignalCh()
ctx, cancelFn := context.WithCancel(context.Background())

defer cancelFn()

go func() {
<-signalCh
logger.Info("Caught termination signal, shutting down...")
cancelFn()
}()

clt := proto.NewSystemClient(conn)

var reqTo uint64

var reqToHash types.Hash

reqTo, reqToHash, err = determineTo(ctx, clt, to)
if err != nil {
return
}

var stream proto.System_ExportClient

stream, err = clt.Export(ctx, &proto.ExportRequest{
From: from,
To: reqTo,
})
if err != nil {
return
}

// write metadata
if err = writeMetadata(writeBuf, logger, reqTo, reqToHash); err != nil {
return
}

resFrom, resTo, err = processExportStream(stream, logger, writeBuf, from, reqTo)
resFrom, resTo, err = processExport(ctx, clt, logger, writeBuf, from, reqTo)
if err != nil {
return
}
Expand Down Expand Up @@ -171,59 +178,103 @@ func writeMetadata(writer io.Writer, logger hclog.Logger, to uint64, toHash type
return err
}

func processExportStream(
stream proto.System_ExportClient,
func processExport(
ctx context.Context,
clt proto.SystemClient,
logger hclog.Logger,
writer io.Writer,
targetFrom, targetTo uint64,
) (uint64, uint64, error) {
var from, to, total uint64 = 0, 0, 0
var from, to, current, total uint64 = targetFrom, targetTo, targetFrom, 0

showProgress := func(event *proto.ExportEvent) {
num := event.To - event.From
total += num
expectedTo := targetTo

if targetTo == 0 {
expectedTo = event.Latest
}
if (targetTo - targetFrom + 1) == 0 {
return 0, 0, ErrBlockRange
}

expectedTotal := expectedTo - targetFrom
progress := 100 * (float64(event.To) - float64(targetFrom)) / float64(expectedTotal)
showProgress := func(block *types.Block) {
current = block.Header.Number
total += 1

progress := 100 * (float64(total) / float64(targetTo-targetFrom+1))
logger.Info(
fmt.Sprintf("%d blocks are written", num),
"total", total,
"from", targetFrom,
"to", expectedTo,
fmt.Sprintf("%d blocks are written", total),
"from", from,
"to", to,
"height", current,
"progress", fmt.Sprintf("%.2f%%", progress),
)
}

firstBlok := true
maxFetchBlockNum := runtime.NumCPU() * 2

for {
event, err := stream.Recv()
if errors.Is(io.EOF, err) || status.Code(err) == codes.Canceled {
return from, to, nil
}
// fetch blocks from channel
// block data channel: [from, to]
blockCh := make(chan *types.Block, maxFetchBlockNum)

if err != nil {
return from, to, err
}
go func(from uint64, to uint64) {
defer func() {
blockCh <- nil
}()

// tips: writer.Write() not necessarily write all data, use io.Copy() instead
if _, err := io.Copy(writer, bytes.NewBuffer(event.Data)); err != nil {
return from, to, err
}
for num := from; num <= to; num++ {
for {
select {
case <-ctx.Done():
return
default:
}

resp, err := clt.BlockByNumber(context.Background(), &proto.BlockByNumberRequest{Number: num})
if status.Code(err) == codes.Canceled {
return
}

if err != nil {
logger.Error("Failed to fetch block", "Number", num, "err", err)
time.Sleep(1 * time.Second)

continue
DarianShawn marked this conversation as resolved.
Show resolved Hide resolved
}

blk := &types.Block{}

err = blk.UnmarshalRLP(resp.Data)
if err != nil {
logger.Error("Failed to unmarshal block", "Number", num, "err", err)

if firstBlok {
from = event.From
firstBlok = false
return
}

if blk.Number() != num {
logger.Error("Fetch block number is wrong", "Number", num, "err", err)

return
}

blockCh <- blk

break
}
}
}(from, to)

to = event.To
// write blocks to writer
for {
select {
case <-ctx.Done():
return from, current, nil
case block := <-blockCh:
// block == nil means all blocks are fetched
if block == nil {
return from, current, nil
}

showProgress(event)
// tips: writer.Write() does not necessarily write all data, use io.Copy() instead
if _, err := io.Copy(writer, bytes.NewBuffer(block.MarshalRLP())); err != nil {
return from, current, err
}

showProgress(block)
}
}
}
Loading