Skip to content

Commit

Permalink
lightning: a way to estimate parquet file size (pingcap#46984) (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 28, 2023
1 parent 5c1e065 commit 72e5323
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 2 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ go_test(
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_xitongsys_parquet_go//parquet",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//local",
"@org_uber_go_goleak//:goleak",
Expand Down
61 changes: 59 additions & 2 deletions br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ import (
)

// sampleCompressedFileSize represents how many bytes need to be sampled for compressed files
const sampleCompressedFileSize = 4 * 1024
const (
// sampleCompressedFileSize represents how many bytes need to be sampled for compressed files
sampleCompressedFileSize = 4 * 1024
// maxSampleParquetDataSize caps how many bytes of decoded row data are read
// when sampling a parquet file to estimate its real (uncompressed) size.
maxSampleParquetDataSize = 8 * 1024
// maxSampleParquetRowCount caps how many rows are read when sampling a
// parquet file; sampling stops at whichever limit is hit first.
maxSampleParquetRowCount = 500
)

// MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
type MDDatabaseMeta struct {
Expand Down Expand Up @@ -484,7 +488,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
s.tableSchemas = append(s.tableSchemas, info)
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
case SourceTypeSQL, SourceTypeCSV:
if info.FileMeta.Compression != CompressionNone {
compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore())
if err2 != nil {
Expand All @@ -495,6 +499,15 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
}
}
s.tableDatas = append(s.tableDatas, info)
case SourceTypeParquet:
parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore())
if err2 != nil {
logger.Error("fail to sample parquet data size", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2))
} else {
info.FileMeta.RealSize = parquestDataSize
}
s.tableDatas = append(s.tableDatas, info)
}

logger.Debug("file route result", zap.String("schema", res.Schema),
Expand Down Expand Up @@ -768,3 +781,47 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store
}
return float64(tot) / float64(pos), nil
}

// SampleParquetDataSize estimates the real (uncompressed) data size of a
// parquet file. Parquet files are encoded and compressed, so the on-disk size
// badly underestimates the in-memory row data. We read up to
// maxSampleParquetRowCount rows or maxSampleParquetDataSize bytes (whichever
// is exceeded first), compute the sampled row size, and extrapolate it to the
// file's total row count.
func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) {
	totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta)
	if err != nil {
		return 0, err
	}
	// An empty file has nothing to sample; return early so the extrapolation
	// below never divides by a zero sample count.
	if totalRowCount == 0 {
		return 0, nil
	}

	reader, err := store.Open(ctx, fileMeta.Path)
	if err != nil {
		return 0, err
	}
	parser, err := NewParquetParser(ctx, store, reader, fileMeta.Path)
	if err != nil {
		// The parser did not take ownership of the reader, close it ourselves.
		//nolint: errcheck
		reader.Close()
		return 0, err
	}
	// parser.Close also closes the underlying reader.
	//nolint: errcheck
	defer parser.Close()

	var (
		rowSize  int64 // accumulated byte size of the sampled rows
		rowCount int64 // number of rows sampled so far
	)
	for {
		err = parser.ReadRow()
		if err != nil {
			if errors.Cause(err) == io.EOF {
				break
			}
			return 0, err
		}
		lastRow := parser.LastRow()
		rowCount++
		rowSize += int64(lastRow.Length)
		parser.RecycleRow(lastRow)
		if rowSize > maxSampleParquetDataSize || rowCount > maxSampleParquetRowCount {
			break
		}
	}
	if rowCount == 0 {
		// Metadata reported rows but none could be read; report an unknown
		// (zero) size instead of computing NaN from a zero division.
		return 0, nil
	}
	// Scale the average sampled row size up to the whole file.
	size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize))
	return size, nil
}
63 changes: 63 additions & 0 deletions br/pkg/lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"compress/gzip"
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
Expand All @@ -32,6 +34,8 @@ import (
router "github.com/pingcap/tidb/util/table-router"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/writer"
)

type testMydumpLoaderSuite struct {
Expand Down Expand Up @@ -1103,3 +1107,62 @@ func TestSampleFileCompressRatio(t *testing.T) {
require.NoError(t, err)
require.InDelta(t, ratio, 5000.0/float64(bf.Len()), 1e-5)
}

// TestSampleParquetDataSize writes a parquet file of 1000 random rows and
// checks that SampleParquetDataSize estimates the raw row-data size to within
// 10% of the true total.
func TestSampleParquetDataSize(t *testing.T) {
	s := newTestMydumpLoaderSuite(t)
	store, err := storage.NewLocalStorage(s.sourceDir)
	require.NoError(t, err)

	type row struct {
		ID    int64  `parquet:"name=id, type=INT64"`
		Key   string `parquet:"name=key, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
		Value string `parquet:"name=value, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	buf := bytes.NewBuffer(make([]byte, 0, 40*1024))
	pw, err := writer.NewParquetWriterFromWriter(buf, new(row), 4)
	require.NoError(t, err)
	pw.RowGroupSize = 128 * 1024 * 1024 //128M
	pw.PageSize = 8 * 1024              //8K
	pw.CompressionType = parquet.CompressionCodec_SNAPPY

	// Log the seed so a failing run can be reproduced.
	seed := time.Now().Unix()
	t.Logf("seed: %d", seed)
	rand.Seed(seed)

	totalRowSize := 0
	for i := 0; i < 1000; i++ {
		key := make([]byte, rand.Intn(20)+1)
		_, err = rand.Read(key)
		require.NoError(t, err)
		value := make([]byte, rand.Intn(20)+1)
		_, err = rand.Read(value)
		require.NoError(t, err)

		// 8 bytes for the int64 ID plus the raw key/value bytes.
		totalRowSize += len(key) + len(value) + 8
		require.NoError(t, pw.Write(row{
			ID:    int64(i),
			Key:   string(key),
			Value: string(value),
		}))
	}
	require.NoError(t, pw.WriteStop())

	fileName := "test_1.t1.parquet"
	require.NoError(t, store.WriteFile(ctx, fileName, buf.Bytes()))

	size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{
		Path: fileName,
	}, store)
	require.NoError(t, err)
	// expected error within 10%, so delta = totalRowSize / 10
	require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10)
}

0 comments on commit 72e5323

Please sign in to comment.