From 108bbc2d342fcc33c96d1e3232f576616957ebfe Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Tue, 7 Feb 2023 11:49:41 +0800 Subject: [PATCH 1/3] backup use channel pipline --- archive/backup.go | 188 +++++++++++++++++++++++++-------------- archive/backup_test.go | 187 +++++++++++++++++++------------------- command/backup/params.go | 1 + 3 files changed, 218 insertions(+), 158 deletions(-) diff --git a/archive/backup.go b/archive/backup.go index 2584dec5a0..367be25203 100644 --- a/archive/backup.go +++ b/archive/backup.go @@ -8,6 +8,8 @@ import ( "fmt" "io" "os" + "runtime" + "time" "github.com/dogechain-lab/dogechain/helper/common" "github.com/dogechain-lab/dogechain/server/proto" @@ -20,6 +22,11 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +var ( + // Error Info + ErrBlockRange = errors.New("block range error: from is greater than or equal to to") +) + // CreateBackup fetches blockchain data with the specific range via gRPC // and save this data as binary archive to given path func CreateBackup( @@ -36,6 +43,37 @@ func CreateBackup( resTo = 0 err = nil + signalCh := common.GetTerminationSignalCh() + ctx, cancelFn := context.WithCancel(context.Background()) + + defer cancelFn() + + go func() { + <-signalCh + logger.Info("Caught termination signal, shutting down...") + cancelFn() + }() + + clt := proto.NewSystemClient(conn) + + var reqTo uint64 + + var reqToHash types.Hash + + reqTo, reqToHash, err = determineTo(ctx, clt, to) + if err != nil { + return + } + + if from <= reqTo { + logger.Info("Exporting blocks", "from", from, "to", reqTo) + } else { + err = ErrBlockRange + + return + } + + // open write file // allow to overwrite the overwrites file only if it's explicitly set fileFlag := os.O_WRONLY | os.O_CREATE | os.O_EXCL if overwriteFile { @@ -87,43 +125,12 @@ func CreateBackup( writeBuf = fbuf } - signalCh := common.GetTerminationSignalCh() - ctx, cancelFn := context.WithCancel(context.Background()) - - defer cancelFn() - - go func() { - <-signalCh - logger.Info("Caught termination signal, shutting down...") - cancelFn() - }() - - clt := proto.NewSystemClient(conn) - - var reqTo uint64 - - var reqToHash types.Hash - - reqTo, reqToHash, err = determineTo(ctx, clt, to) - if err != nil { - return - } - - var stream proto.System_ExportClient - - stream, err = clt.Export(ctx, &proto.ExportRequest{ - From: from, - To: reqTo, - }) - if err != nil { - return - } - + // write metadata if err = writeMetadata(writeBuf, logger, reqTo, reqToHash); err != nil { return } - resFrom, resTo, err = processExportStream(stream, logger, writeBuf, from, reqTo) + resFrom, resTo, err = processExport(ctx, clt, logger, writeBuf, from, reqTo) if err != nil { return } @@ -171,59 +178,104 @@ func writeMetadata(writer io.Writer, logger hclog.Logger, to uint64, toHash type return err } -func processExportStream( - stream proto.System_ExportClient, +func processExport( + ctx context.Context, + clt proto.SystemClient, logger hclog.Logger, writer io.Writer, targetFrom, targetTo uint64, ) (uint64, uint64, error) { - var from, to, total uint64 = 0, 0, 0 + var from, to, current, total uint64 = targetFrom, targetTo, targetFrom, 0 - showProgress := func(event *proto.ExportEvent) { - num := event.To - event.From - total += num - expectedTo := targetTo - - if targetTo == 0 { - expectedTo = event.Latest - } + if (targetTo - targetFrom + 1) == 0 { + return 0, 0, ErrBlockRange + } - expectedTotal := expectedTo - targetFrom - progress := 100 * (float64(event.To) - float64(targetFrom)) / float64(expectedTotal) + showProgress := func(block *types.Block) { + current = block.Header.Number + total += 1 + progress := 100 * (float64(total) / float64(targetTo-targetFrom+1)) logger.Info( - fmt.Sprintf("%d blocks are written", num), - "total", total, - "from", targetFrom, - "to", expectedTo, + fmt.Sprintf("%d blocks are written", total), + "from", from, + "to", to, + "height", current, "progress", fmt.Sprintf("%.2f%%", progress), ) } - firstBlok := true + maxFetchBlockNum := runtime.NumCPU() * 2 - for { - event, err := stream.Recv() - if errors.Is(io.EOF, err) || status.Code(err) == codes.Canceled { - return from, to, nil - } + // fetch blocks from channel + // block data channel: [from, to] + blockCh := make(chan *types.Block, maxFetchBlockNum) - if err != nil { - return from, to, err - } + go func(from uint64, to uint64) { + defer func() { + blockCh <- nil + }() - // tips: writer.Write() not necessarily write all data, use io.Copy() instead - if _, err := io.Copy(writer, bytes.NewBuffer(event.Data)); err != nil { - return from, to, err - } + for num := from; num <= to; num++ { + for { + select { + case <-ctx.Done(): + return + default: + } + + resp, err := clt.BlockByNumber(context.Background(), &proto.BlockByNumberRequest{Number: num}) + if status.Code(err) == codes.Canceled { + return + } + + if err != nil { + logger.Error("Failed to fetch block", "Number", num, "err", err) + time.Sleep(1 * time.Second) + + continue + } + + blk := &types.Block{} + + err = blk.UnmarshalRLP(resp.Data) + if err != nil { + logger.Error("Failed to unmarshal block", "Number", num, "err", err) + time.Sleep(1 * time.Second) - if firstBlok { - from = event.From - firstBlok = false + continue + } + + if blk.Number() != num { + logger.Error("Fetch block number is wrong", "Number", num, "err", err) + + return + } + + blockCh <- blk + + break + } } + }(from, to) - to = event.To + // write blocks to writer + for { + select { + case <-ctx.Done(): + return from, current, nil + case block := <-blockCh: + // block == nil means all blocks are fetched + if block == nil { + return from, current, nil + } - showProgress(event) + // tips: writer.Write() not necessarily write all data, use io.Copy() instead + if _, err := io.Copy(writer, bytes.NewBuffer(block.MarshalRLP())); err != nil { + return from, current, err + } + + showProgress(block) + } } } diff --git a/archive/backup_test.go b/archive/backup_test.go index d875e67b36..219d30e95a 100644 --- a/archive/backup_test.go +++ b/archive/backup_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "errors" - "io" "testing" "github.com/dogechain-lab/dogechain/server/proto" @@ -15,28 +14,6 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -type recvData struct { - event *proto.ExportEvent - err error -} - -type mockSystemExportClient struct { - proto.System_ExportClient - recvs []recvData - cur int -} - -func (m *mockSystemExportClient) Recv() (*proto.ExportEvent, error) { - if m.cur >= len(m.recvs) { - return nil, io.EOF - } - - recv := m.recvs[m.cur] - m.cur++ - - return recv.event, recv.err -} - var ( genesis = &types.Block{ Header: &types.Header{ @@ -45,6 +22,7 @@ var ( }, } blocks = []*types.Block{ + genesis, { Header: &types.Header{ Number: 1, @@ -75,7 +53,7 @@ type systemClientMock struct { proto.SystemClient status *proto.ServerStatus errForStatus error - block *proto.BlockResponse + blocks []*types.Block errForBlock error } @@ -84,10 +62,20 @@ func (m *systemClientMock) GetStatus(context.Context, *emptypb.Empty, ...grpc.Ca } func (m *systemClientMock) BlockByNumber( - context.Context, - *proto.BlockByNumberRequest, ...grpc.CallOption, + _ctx context.Context, + req *proto.BlockByNumberRequest, + _opts ...grpc.CallOption, ) (*proto.BlockResponse, error) { - return m.block, m.errForBlock + // find block by request number + for _, b := range m.blocks { + if b.Header.Number == req.Number { + return &proto.BlockResponse{ + Data: b.MarshalRLP(), + }, m.errForBlock + } + } + + return nil, m.errForBlock } func Test_determineTo(t *testing.T) { @@ -116,9 +104,7 @@ func Test_determineTo(t *testing.T) { Number: 10, }, }, - block: &proto.BlockResponse{ - Data: blocks[1].MarshalRLP(), - }, + blocks: blocks, }, resTo: 2, resToHash: blocks[1].Hash(), @@ -135,6 +121,7 @@ func Test_determineTo(t *testing.T) { Hash: blocks[1].Hash().String(), }, }, + blocks: blocks, }, resTo: 1, resToHash: blocks[1].Hash(), @@ -167,87 +154,108 @@ func Test_determineTo(t *testing.T) { func Test_processExportStream(t *testing.T) { tests := []struct { - name string - mockSystemExportClient *mockSystemExportClient + name string + systemClientMock proto.SystemClient // result - from uint64 - to uint64 - err error + from uint64 + to uint64 + err error + recvs []*proto.BlockResponse }{ { - name: "should be succeed with event", - mockSystemExportClient: &mockSystemExportClient{ - recvs: []recvData{ - { - event: &proto.ExportEvent{ - From: 1, - To: 2, - Data: append(blocks[0].MarshalRLP(), blocks[1].MarshalRLP()...), - }, + name: "should be succeed with single block", + systemClientMock: &systemClientMock{ + status: &proto.ServerStatus{ + Current: &proto.ServerStatus_Block{ + // greater than targetTo + Number: int64(blocks[2].Number()), }, }, + blocks: blocks, }, - from: 1, - to: 2, + from: blocks[1].Number(), + to: blocks[1].Number(), err: nil, + recvs: []*proto.BlockResponse{ + { + Data: blocks[1].MarshalRLP(), + }, + }, }, { - name: "should succeed with multiple events", - mockSystemExportClient: &mockSystemExportClient{ - recvs: []recvData{ - { - event: &proto.ExportEvent{ - From: 1, - To: 2, - Data: append(blocks[0].MarshalRLP(), blocks[1].MarshalRLP()...), - }, - }, - { - event: &proto.ExportEvent{ - From: 3, - To: 3, - Data: blocks[2].MarshalRLP(), - }, + name: "should be succeed multiple blocks", + systemClientMock: &systemClientMock{ + status: &proto.ServerStatus{ + Current: &proto.ServerStatus_Block{ + // greater than targetTo + Number: int64(blocks[2].Number()), }, }, + blocks: blocks, }, - from: 1, - to: 3, + from: blocks[0].Number(), + to: blocks[2].Number(), err: nil, + recvs: []*proto.BlockResponse{ + { + Data: blocks[0].MarshalRLP(), + }, + { + Data: blocks[1].MarshalRLP(), + }, + { + Data: blocks[2].MarshalRLP(), + }, + }, }, { - name: "should fail when received error", - mockSystemExportClient: &mockSystemExportClient{ - recvs: []recvData{ - { - event: &proto.ExportEvent{ - From: 1, - To: 2, - Data: append(blocks[0].MarshalRLP(), blocks[1].MarshalRLP()...), - }, - }, - { - err: errors.New("failed to send"), + name: "should be succeed select range of 2-3 blocks", + systemClientMock: &systemClientMock{ + status: &proto.ServerStatus{ + Current: &proto.ServerStatus_Block{ + // greater than targetTo + Number: int64(blocks[2].Number()), }, - { - event: &proto.ExportEvent{ - From: 3, - To: 3, - Data: blocks[2].MarshalRLP(), - }, + }, + blocks: blocks, + }, + from: blocks[1].Number(), + to: blocks[2].Number(), + err: nil, + recvs: []*proto.BlockResponse{ + { + Data: blocks[1].MarshalRLP(), + }, + { + Data: blocks[2].MarshalRLP(), + }, + }, + }, + { + name: "should be failed with error range", + systemClientMock: &systemClientMock{ + status: &proto.ServerStatus{ + Current: &proto.ServerStatus_Block{ + // greater than targetTo + Number: int64(blocks[2].Number()), }, }, + blocks: blocks, }, - from: 0, - to: 0, - err: errors.New("failed to send"), + from: blocks[2].Number(), + to: blocks[1].Number(), + err: ErrBlockRange, + recvs: []*proto.BlockResponse{}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var buffer bytes.Buffer - from, to, err := processExportStream(tt.mockSystemExportClient, hclog.NewNullLogger(), &buffer, 0, 0) + var ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + from, to, err := processExport(ctx, tt.systemClientMock, hclog.NewNullLogger(), &buffer, tt.from, tt.to) assert.Equal(t, tt.err, err) if err != nil { @@ -259,11 +267,10 @@ func Test_processExportStream(t *testing.T) { // create expected data expectedData := make([]byte, 0) - for _, rv := range tt.mockSystemExportClient.recvs { - if rv.err != nil { - break + for _, rv := range tt.recvs { + if rv != nil { + expectedData = append(expectedData, rv.Data...) } - expectedData = append(expectedData, rv.event.Data...) } assert.Equal(t, expectedData, buffer.Bytes()) }) diff --git a/command/backup/params.go b/command/backup/params.go index b3dac4fa9e..9717d3ab6c 100644 --- a/command/backup/params.go +++ b/command/backup/params.go @@ -88,6 +88,7 @@ func (p *backupParams) createBackup(grpcAddress string) error { if err != nil { return err } + defer connection.Close() // resFrom and resTo represents the range of blocks that can be included in the file resFrom, resTo, err := archive.CreateBackup( From 6472f6cb47dd54c40dec6caa6cc246121f152202 Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Tue, 7 Feb 2023 12:03:43 +0800 Subject: [PATCH 2/3] fix test, quickly skip the block --- archive/backup_test.go | 12 +-- archive/restore.go | 154 +++++++++++++++++++++------------------ archive/restore_test.go | 92 +++-------------------- command/backup/params.go | 2 +- server/server.go | 2 +- 5 files changed, 104 insertions(+), 158 deletions(-) diff --git a/archive/backup_test.go b/archive/backup_test.go index 219d30e95a..8710b2b28d 100644 --- a/archive/backup_test.go +++ b/archive/backup_test.go @@ -96,7 +96,7 @@ func Test_determineTo(t *testing.T) { }{ { name: "should return expected 'to'", - targetTo: toPtr(2), + targetTo: toPtr(blocks[2].Number()), systemClientMock: &systemClientMock{ status: &proto.ServerStatus{ Current: &proto.ServerStatus_Block{ @@ -106,24 +106,24 @@ func Test_determineTo(t *testing.T) { }, blocks: blocks, }, - resTo: 2, - resToHash: blocks[1].Hash(), + resTo: blocks[2].Number(), + resToHash: blocks[2].Hash(), err: nil, }, { name: "should return latest if target to is greater than the latest in node", - targetTo: toPtr(2), + targetTo: toPtr(blocks[2].Number()), systemClientMock: &systemClientMock{ status: &proto.ServerStatus{ Current: &proto.ServerStatus_Block{ // less than targetTo - Number: 1, + Number: int64(blocks[1].Number()), Hash: blocks[1].Hash().String(), }, }, blocks: blocks, }, - resTo: 1, + resTo: blocks[1].Number(), resToHash: blocks[1].Hash(), err: nil, }, diff --git a/archive/restore.go b/archive/restore.go index f6ab777ba2..a4a96672bc 100644 --- a/archive/restore.go +++ b/archive/restore.go @@ -8,11 +8,11 @@ import ( "io" "math/big" "os" + "runtime" - "github.com/dogechain-lab/dogechain/blockchain" "github.com/dogechain-lab/dogechain/helper/common" - "github.com/dogechain-lab/dogechain/helper/progress" "github.com/dogechain-lab/dogechain/types" + "github.com/hashicorp/go-hclog" "github.com/klauspost/compress/zstd" ) @@ -23,8 +23,8 @@ const ( var zstdMagic = []byte{0x28, 0xb5, 0x2f, 0xfd} // zstd Magic number type blockchainInterface interface { - SubscribeEvents() blockchain.Subscription Genesis() types.Hash + GetHeaderNumber() (uint64, bool) GetBlockByNumber(uint64, bool) (*types.Block, bool) GetHashByNumber(uint64) types.Hash WriteBlock(block *types.Block, source string) error @@ -32,14 +32,14 @@ type blockchainInterface interface { } // RestoreChain reads blocks from the archive and write to the chain -func RestoreChain(chain blockchainInterface, filePath string, progression *progress.ProgressionWrapper) error { +func RestoreChain(log hclog.Logger, chain blockchainInterface, filePath string) error { fp, err := os.OpenFile(filePath, os.O_RDONLY, 0) if err != nil { return err } defer fp.Close() - fbuf := bufio.NewReaderSize(fp, 1*1024*1024) // 1MB buffer + fbuf := bufio.NewReaderSize(fp, 8*1024*1024) // 8MB buffer // check whether the file is compressed fileMagic, err := fbuf.Peek(len(zstdMagic)) @@ -56,6 +56,8 @@ func RestoreChain(chain blockchainInterface, filePath string, progression *progr } defer zstdReader.Close() + log.Info("archive is compressed with zstd") + readBuf = zstdReader } else { readBuf = fbuf @@ -63,11 +65,11 @@ func RestoreChain(chain blockchainInterface, filePath string, progression *progr blockStream := newBlockStream(readBuf) - return importBlocks(chain, blockStream, progression) + return importBlocks(log, chain, blockStream) } // import blocks scans all blocks from stream and write them to chain -func importBlocks(chain blockchainInterface, blockStream *blockStream, progression *progress.ProgressionWrapper) error { +func importBlocks(log hclog.Logger, chain blockchainInterface, blockStream *blockStream) error { shutdownCh := common.GetTerminationSignalCh() metadata, err := blockStream.getMetadata() @@ -85,95 +87,105 @@ func importBlocks(chain blockchainInterface, blockStream *blockStream, progressi return nil } - // skip existing blocks - firstBlock, err := consumeCommonBlocks(chain, blockStream, shutdownCh) - if err != nil { - return err - } - - if firstBlock == nil { - return nil - } - - // Create a blockchain subscription for the sync progression and start tracking - progression.StartProgression("", firstBlock.Number(), chain.SubscribeEvents()) - // Stop monitoring the sync progression upon exit - defer progression.StopProgression() + maxFetchBlockNum := runtime.NumCPU() * 2 - // Set the goal - progression.UpdateHighestProgression(metadata.Latest) + // create channel to next block stream + nextBlockCh := make(chan *types.Block, maxFetchBlockNum) - nextBlock := firstBlock + storeLatestBlkNumber, exist := chain.GetHeaderNumber() + if !exist { + storeLatestBlkNumber = 0 + } + // skip genesis block for { - if err := chain.VerifyFinalizedBlock(nextBlock); err != nil { + firstBlock, err := blockStream.nextBlock() + if err != nil { return err } - if err := chain.WriteBlock(nextBlock, WriteBlockSource); err != nil { - return err + if firstBlock == nil { + return nil } - progression.UpdateCurrentProgression(nextBlock.Number()) + if firstBlock.Number() > 0 && firstBlock.Number() > storeLatestBlkNumber { + nextBlockCh <- firstBlock - nextBlock, err = blockStream.nextBlock() - if err != nil { - return err - } - - if nextBlock == nil { break } - select { - case <-shutdownCh: - return nil - default: - } + log.Info("block exist, skip", "block", firstBlock.Number()) } - return nil -} + go func() { + for { + // check shutdown signal + select { + case <-shutdownCh: + return + default: + } + + nextBlock, err := blockStream.nextBlock() + if err != nil { + log.Error("failed to read block", "err", err) + } + + // end of stream + if nextBlock == nil { + nextBlockCh <- nil + + return + } + + nextBlockCh <- nextBlock + } + }() -// consumeCommonBlocks consumes blocks in blockstream to latest block in chain or different hash -// returns the first block to be written into chain -func consumeCommonBlocks( - chain blockchainInterface, - blockStream *blockStream, - shutdownCh <-chan os.Signal, -) (*types.Block, error) { for { - block, err := blockStream.nextBlock() - if err != nil { - return nil, err + nextBlock := <-nextBlockCh + + // end of stream + if nextBlock == nil { + break } - if block == nil { - return nil, nil + storageBlk, exist := chain.GetBlockByNumber(nextBlock.Number(), false) + + if exist && + storageBlk.Number() == nextBlock.Number() && + storageBlk.Hash() != nextBlock.Hash() { + return fmt.Errorf( + "block %d has different hash in storage (%s) and archive (%s)", + nextBlock.Number(), + storageBlk.Hash(), + nextBlock.Hash(), + ) } - if block.Number() == 0 { - if block.Hash() != chain.Genesis() { - return nil, fmt.Errorf( - "the hash of genesis block (%s) does not match blockchain genesis (%s)", - block.Hash(), - chain.Genesis(), - ) + // skip existing blocks + if !exist { + if err := chain.VerifyFinalizedBlock(nextBlock); err != nil { + return err } - continue - } + if err := chain.WriteBlock(nextBlock, WriteBlockSource); err != nil { + return err + } - if hash := chain.GetHashByNumber(block.Number()); hash != block.Hash() { - return block, nil + log.Info("block imported", "block", nextBlock.Number()) + } else { + log.Info("block exist, skip", "block", nextBlock.Number()) } select { case <-shutdownCh: - return nil, nil + return nil default: } } + + return nil } // blockStream parse RLP-encoded block from stream and consumed the used bytes @@ -247,7 +259,7 @@ func (b *blockStream) loadRLPArray() (uint64, error) { // loadRLPPrefix loads first byte of RLP encoded data from input func (b *blockStream) loadRLPPrefix() (byte, error) { buf := b.buffer[:1] - if _, err := b.input.Read(buf); err != nil { + if _, err := io.ReadFull(b.input, buf); err != nil { return 0, err } @@ -269,7 +281,11 @@ func (b *blockStream) loadPrefixSize(offset uint64, prefix byte) (uint64, uint64 b.reserveCap(offset + payloadSizeSize) payloadSizeBytes := b.buffer[offset : offset+payloadSizeSize] - n, err := b.input.Read(payloadSizeBytes) + + n, err := io.ReadFull(b.input, payloadSizeBytes) + if errors.Is(io.EOF, err) { + return 0, 0, io.EOF + } if err != nil { return 0, 0, err @@ -277,7 +293,7 @@ func (b *blockStream) loadPrefixSize(offset uint64, prefix byte) (uint64, uint64 if uint64(n) < payloadSizeSize { // couldn't load required amount of bytes - return 0, 0, io.EOF + return 0, 0, io.ErrUnexpectedEOF } payloadSize := new(big.Int).SetBytes(payloadSizeBytes).Int64() @@ -293,7 +309,7 @@ func (b *blockStream) loadPayload(offset uint64, size uint64) error { b.reserveCap(offset + size) buf := b.buffer[offset : offset+size] - if _, err := b.input.Read(buf); err != nil { + if _, err := io.ReadFull(b.input, buf); err != nil { return err } diff --git a/archive/restore_test.go b/archive/restore_test.go index 3fbbd6fb53..22a6ae1b8e 100644 --- a/archive/restore_test.go +++ b/archive/restore_test.go @@ -2,14 +2,11 @@ package archive import ( "bytes" - "fmt" "io" - "os" "testing" - "github.com/dogechain-lab/dogechain/blockchain" - "github.com/dogechain-lab/dogechain/helper/progress" "github.com/dogechain-lab/dogechain/types" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" ) @@ -29,6 +26,14 @@ func (m *mockChain) Genesis() types.Hash { return m.genesis.Hash() } +func (m *mockChain) GetHeaderNumber() (uint64, bool) { + if l := len(m.blocks); l != 0 { + return m.blocks[l-1].Number(), true + } + + return 0, false +} + func (m *mockChain) GetBlockByNumber(num uint64, full bool) (*types.Block, bool) { for _, b := range m.blocks { if b.Number() == num { @@ -58,10 +63,6 @@ func (m *mockChain) VerifyFinalizedBlock(block *types.Block) error { return nil } -func (m *mockChain) SubscribeEvents() blockchain.Subscription { - return blockchain.NewMockSubscription() -} - func getLatestBlockFromMockChain(m *mockChain) *types.Block { if l := len(m.blocks); l != 0 { return m.blocks[l-1] @@ -112,9 +113,8 @@ func Test_importBlocks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - progression := progress.NewProgressionWrapper(progress.ChainSyncRestore) blockStream := newTestBlockStream(tt.metadata, tt.archiveBlocks...) - err := importBlocks(tt.chain, blockStream, progression) + err := importBlocks(hclog.NewNullLogger(), tt.chain, blockStream) assert.Equal(t, tt.err, err) latestBlock := getLatestBlockFromMockChain(tt.chain) @@ -123,76 +123,6 @@ func Test_importBlocks(t *testing.T) { } } -func Test_consumeCommonBlocks(t *testing.T) { - newTestArchiveStream := func(blocks ...*types.Block) *blockStream { - var buf bytes.Buffer - for _, b := range blocks { - buf.Write(b.MarshalRLP()) - } - - return newBlockStream(&buf) - } - - tests := []struct { - name string - blockStream *blockStream - chain blockchainInterface - // result - block *types.Block - err error - }{ - { - name: "should consume common blocks", - blockStream: newTestArchiveStream(genesis, blocks[0], blocks[1], blocks[2]), - chain: &mockChain{ - genesis: genesis, - blocks: []*types.Block{blocks[0], blocks[1]}, - }, - block: blocks[2], - err: nil, - }, - { - name: "should consume all blocks", - blockStream: newTestArchiveStream(genesis, blocks[0], blocks[1]), - chain: &mockChain{ - genesis: genesis, - blocks: []*types.Block{blocks[0], blocks[1]}, - }, - block: nil, - err: nil, - }, - { - name: "should return error in case of genesis mismatch", - blockStream: newTestArchiveStream(genesis, blocks[0], blocks[1]), - chain: &mockChain{ - genesis: &types.Block{ - Header: &types.Header{ - Hash: types.StringToHash("wrong genesis"), - Number: 0, - }, - }, - blocks: []*types.Block{blocks[0], blocks[1]}, - }, - block: nil, - err: fmt.Errorf( - "the hash of genesis block (%s) does not match blockchain genesis (%s)", - genesis.Hash(), - types.StringToHash("wrong genesis"), - ), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - osSignal := make(<-chan os.Signal) - resultBlock, err := consumeCommonBlocks(tt.chain, tt.blockStream, osSignal) - - assert.Equal(t, tt.block, resultBlock) - assert.Equal(t, tt.err, err) - }) - } -} - func Test_parseBlock(t *testing.T) { tests := []struct { name string @@ -311,7 +241,7 @@ func Test_loadPrefixSize(t *testing.T) { prefix: 0xf9, // 2 bytes size: 0, consumed: 0, - err: io.EOF, + err: io.ErrUnexpectedEOF, }, } diff --git a/command/backup/params.go b/command/backup/params.go index 9717d3ab6c..75bf3b44e6 100644 --- a/command/backup/params.go +++ b/command/backup/params.go @@ -88,7 +88,7 @@ func (p *backupParams) createBackup(grpcAddress string) error { if err != nil { return err } - defer connection.Close() + defer conn.Close() // resFrom and resTo represents the range of blocks that can be included in the file resFrom, resTo, err := archive.CreateBackup( diff --git a/server/server.go b/server/server.go index 305d13ad61..0e8c22b986 100644 --- a/server/server.go +++ b/server/server.go @@ -354,7 +354,7 @@ func (s *Server) restoreChain() error { return nil } - if err := archive.RestoreChain(s.blockchain, *s.config.RestoreFile, s.restoreProgression); err != nil { + if err := archive.RestoreChain(s.logger, s.blockchain, *s.config.RestoreFile); err != nil { return err } From 9c975d696c80cea494052c11cc635f3919c3732c Mon Sep 17 00:00:00 2001 From: 0xcb9ff9 <0xcb9ff9@proton.me> Date: Mon, 13 Feb 2023 12:13:32 +0800 Subject: [PATCH 3/3] stop in unmarshaling error --- archive/backup.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/archive/backup.go b/archive/backup.go index 367be25203..192785749e 100644 --- a/archive/backup.go +++ b/archive/backup.go @@ -241,9 +241,8 @@ func processExport( err = blk.UnmarshalRLP(resp.Data) if err != nil { logger.Error("Failed to unmarshal block", "Number", num, "err", err) - time.Sleep(1 * time.Second) - continue + return } if blk.Number() != num { @@ -270,7 +269,7 @@ func processExport( return from, current, nil } - // tips: writer.Write() not necessarily write all data, use io.Copy() instead + // tips: writer.Write() does not necessarily write all data, use io.Copy() instead if _, err := io.Copy(writer, bytes.NewBuffer(block.MarshalRLP())); err != nil { return from, current, err }