From 4c7eaf9ef4e4b6df835fcd939d1325f86b2801b6 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 18 Nov 2022 17:42:09 +0800 Subject: [PATCH] address comment --- br/pkg/lightning/restore/get_pre_info.go | 36 ++---------------------- br/pkg/lightning/restore/restore.go | 35 ++++++++++++----------- 2 files changed, 20 insertions(+), 51 deletions(-) diff --git a/br/pkg/lightning/restore/get_pre_info.go b/br/pkg/lightning/restore/get_pre_info.go index f1075d43c042c..93927c6956809 100644 --- a/br/pkg/lightning/restore/get_pre_info.go +++ b/br/pkg/lightning/restore/get_pre_info.go @@ -444,23 +444,7 @@ func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context // ReadFirstNRowsByFileMeta reads the first N rows of an data file. // It implements the PreRestoreInfoGetter interface. func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) { - var ( - reader storage.ReadSeekCloser - err error - ) - switch { - case dataFileMeta.Type == mydump.SourceTypeParquet: - reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, dataFileMeta.Path, dataFileMeta.FileSize) - case dataFileMeta.Compression != mydump.CompressionNone: - var compressType storage.CompressType - compressType, err = mydump.ToStorageCompressType(dataFileMeta.Compression) - if err != nil { - break - } - reader, err = storage.WithCompression(p.srcStorage, compressType).Open(ctx, dataFileMeta.Path) - default: - reader, err = p.srcStorage.Open(ctx, dataFileMeta.Path) - } + reader, err := openReader(ctx, dataFileMeta, p.srcStorage) if err != nil { return nil, nil, errors.Trace(err) } @@ -598,23 +582,7 @@ func (p *PreRestoreInfoGetterImpl) sampleDataFromTable( return resultIndexRatio, isRowOrdered, nil } sampleFile := tableMeta.DataFiles[0].FileMeta - var ( - reader storage.ReadSeekCloser - err error - ) - switch { - case sampleFile.Type == mydump.SourceTypeParquet: - reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, sampleFile.Path, sampleFile.FileSize) - case sampleFile.Compression != mydump.CompressionNone: - var compressType storage.CompressType - compressType, err = mydump.ToStorageCompressType(sampleFile.Compression) - if err != nil { - break - } - reader, err = storage.WithCompression(p.srcStorage, compressType).Open(ctx, sampleFile.Path) - default: - reader, err = p.srcStorage.Open(ctx, sampleFile.Path) - } + reader, err := openReader(ctx, sampleFile, p.srcStorage) if err != nil { return 0.0, false, errors.Trace(err) } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 0a0e05b45ac5d..d7a4fd0d3cc8b 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2190,23 +2190,7 @@ func newChunkRestore( ) (*chunkRestore, error) { blockBufSize := int64(cfg.Mydumper.ReadBlockSize) - var ( - reader storage.ReadSeekCloser - compressType storage.CompressType - err error - ) - switch { - case chunk.FileMeta.Type == mydump.SourceTypeParquet: - reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize) - case chunk.FileMeta.Compression != mydump.CompressionNone: - compressType, err = mydump.ToStorageCompressType(chunk.FileMeta.Compression) - if err != nil { - break - } - reader, err = storage.WithCompression(store, compressType).Open(ctx, chunk.FileMeta.Path) - default: - reader, err = store.Open(ctx, chunk.FileMeta.Path) - } + reader, err := openReader(ctx, chunk.FileMeta, store) if err != nil { return nil, errors.Trace(err) } @@ -2771,3 +2755,20 @@ func (cr *chunkRestore) restore( } return errors.Trace(firstErr(encodeErr, deliverErr)) } + +func openReader(ctx context.Context, fileMeta mydump.SourceFileMeta, store storage.ExternalStorage) ( + reader storage.ReadSeekCloser, err error) { + switch { + case fileMeta.Type == mydump.SourceTypeParquet: + reader, err = mydump.OpenParquetReader(ctx, store, fileMeta.Path, fileMeta.FileSize) + case fileMeta.Compression != mydump.CompressionNone: + compressType, err2 := mydump.ToStorageCompressType(fileMeta.Compression) + if err2 != nil { + return nil, err2 + } + reader, err = storage.WithCompression(store, compressType).Open(ctx, fileMeta.Path) + default: + reader, err = store.Open(ctx, fileMeta.Path) + } + return +}