From e5fd5144c4b0244b85aee340b2085d33e8d6586d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 6 Dec 2024 16:35:02 +0800 Subject: [PATCH] lightning: sample once parquet file (#56205) (#57921) close pingcap/tidb#56104 --- br/pkg/lightning/importer/import.go | 13 +------- br/pkg/lightning/mydump/loader.go | 41 +++++++++++++++++++------- br/pkg/lightning/mydump/loader_test.go | 8 +++-- 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 514128109ff75..f6456fb678942 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -1770,19 +1770,8 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { allTasks = append(allTasks, task{tr: tr, cp: cp}) if len(cp.Engines) == 0 { - for i, fi := range tableMeta.DataFiles { + for _, fi := range tableMeta.DataFiles { totalDataSizeToRestore += fi.FileMeta.FileSize - if fi.FileMeta.Type == mydump.SourceTypeParquet { - numberRows, err := mydump.ReadParquetFileRowCountByFile(ctx, rc.store, fi.FileMeta) - if err != nil { - return errors.Trace(err) - } - if m, ok := metric.FromContext(ctx); ok { - m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(numberRows)) - } - fi.FileMeta.Rows = numberRows - tableMeta.DataFiles[i] = fi - } } } else { for _, eng := range cp.Engines { diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go index e13b2d90a277c..de144a7d19aa5 100644 --- a/br/pkg/lightning/mydump/loader.go +++ b/br/pkg/lightning/mydump/loader.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/storage" regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router" filter "github.com/pingcap/tidb/pkg/util/table-filter" @@ -201,6 +202,8 @@ type mdLoaderSetup struct { dbIndexMap map[string]int tableIndexMap map[filter.Table]int setupCfg *MDLoaderSetupConfig + + sampledParquetRowSizes map[string]float64 } // NewMyDumpLoader constructs a MyDumper loader that scanns the data source and constructs a set of metadatas. @@ -277,6 +280,8 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, dbIndexMap: make(map[string]int), tableIndexMap: make(map[filter.Table]int), setupCfg: mdLoaderSetupCfg, + + sampledParquetRowSizes: make(map[string]float64), } if err := setup.setup(ctx); err != nil { @@ -489,12 +494,29 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size } s.tableDatas = append(s.tableDatas, info) case SourceTypeParquet: - parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore()) - if err2 != nil { - logger.Error("fail to sample parquet data size", zap.String("category", "loader"), - zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2)) - } else { - info.FileMeta.RealSize = parquestDataSize + tableName := info.TableName.String() + if s.sampledParquetRowSizes[tableName] == 0 { + s.sampledParquetRowSizes[tableName], err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore()) + if err != nil { + logger.Error("fail to sample parquet row size", zap.String("category", "loader"), + zap.String("schema", res.Schema), zap.String("table", res.Name), + zap.Stringer("type", res.Type), zap.Error(err)) + return errors.Trace(err) + } + } + if s.sampledParquetRowSizes[tableName] != 0 { + totalRowCount, err := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta) + if err != nil { + logger.Error("fail to get file total row count", zap.String("category", "loader"), + zap.String("schema", res.Schema), zap.String("table", res.Name), + zap.Stringer("type", res.Type), zap.Error(err)) + return errors.Trace(err) + } + info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName]) + info.FileMeta.Rows = totalRowCount + if m, ok := metric.FromContext(ctx); ok { + m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(totalRowCount)) + } } s.tableDatas = append(s.tableDatas, info) } @@ -780,8 +802,8 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store return float64(tot) / float64(pos), nil } -// SampleParquetDataSize samples the data size of the parquet file. -func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) { +// SampleParquetRowSize samples row size of the parquet file. +func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) { totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta) if totalRowCount == 0 || err != nil { return 0, err @@ -820,6 +842,5 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s break } } - size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize)) - return size, nil + return float64(rowSize) / float64(rowCount), nil } diff --git a/br/pkg/lightning/mydump/loader_test.go b/br/pkg/lightning/mydump/loader_test.go index 7e8d71ca95449..b3def155ddd31 100644 --- a/br/pkg/lightning/mydump/loader_test.go +++ b/br/pkg/lightning/mydump/loader_test.go @@ -1159,12 +1159,16 @@ func testSampleParquetDataSize(t *testing.T, count int) { err = store.WriteFile(ctx, fileName, bf.Bytes()) require.NoError(t, err) - size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{ + rowSize, err := md.SampleParquetRowSize(ctx, md.SourceFileMeta{ Path: fileName, }, store) require.NoError(t, err) + rowCount, err := md.ReadParquetFileRowCountByFile(ctx, store, md.SourceFileMeta{ + Path: fileName, + }) + require.NoError(t, err) // expected error within 10%, so delta = totalRowSize / 10 - require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10) + require.InDelta(t, totalRowSize, int64(rowSize*float64(rowCount)), float64(totalRowSize)/10) } func TestSampleParquetDataSize(t *testing.T) { t.Run("count=1000", func(t *testing.T) { testSampleParquetDataSize(t, 1000) })