importinto: estimate real file size by sample on compressed file #46404

Merged: 6 commits, Sep 5, 2023
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/BUILD.bazel
@@ -33,6 +33,7 @@ go_library(
"//util/table-filter",
"//util/zeropool",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_spkg_bom//:bom",
"@com_github_xitongsys_parquet_go//parquet",
"@com_github_xitongsys_parquet_go//reader",
9 changes: 9 additions & 0 deletions br/pkg/lightning/mydump/loader.go
@@ -22,6 +22,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
@@ -731,6 +732,14 @@ func calculateFileBytes(ctx context.Context,

// SampleFileCompressRatio samples the compress ratio of the compressed file.
func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
failpoint.Inject("SampleFileCompressPercentage", func(val failpoint.Value) {
switch v := val.(type) {
case string:
failpoint.Return(1.0, errors.New(v))
case int:
failpoint.Return(float64(v)/100, nil)
}
})
if fileMeta.Compression == CompressionNone {
return 1, nil
}
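For context, a rough sketch of what sampling-based ratio estimation can look like, using the standard library's gzip reader and a hypothetical `sampleGzipRatio` helper (the real implementation reads through the `storage.ExternalStorage` abstraction, presumably via the `calculateFileBytes` helper named in the hunk header above):

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
)

// countingReader tracks how many compressed bytes are pulled from the
// underlying file while the gzip reader decompresses.
type countingReader struct {
	r io.Reader
	n int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	return n, err
}

// sampleGzipRatio (hypothetical) decompresses up to sampleSize bytes from
// the start of a gzip file and returns decompressed/compressed as the
// ratio estimate. Reader read-ahead inflates the compressed count a bit,
// which is tolerable for an estimate.
func sampleGzipRatio(path string, sampleSize int64) (float64, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	cr := &countingReader{r: f}
	zr, err := gzip.NewReader(cr)
	if err != nil {
		return 0, err
	}
	defer zr.Close()

	decompressed, err := io.CopyN(io.Discard, zr, sampleSize)
	if err != nil && err != io.EOF {
		return 0, err
	}
	if cr.n == 0 {
		return 1, nil
	}
	return float64(decompressed) / float64(cr.n), nil
}

func main() {
	ratio, err := sampleGzipRatio("data.csv.gz", 4<<20) // sample up to 4 MiB
	if err != nil {
		panic(err)
	}
	fmt.Printf("estimated compress ratio: %.2f\n", ratio)
}
```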
9 changes: 8 additions & 1 deletion br/pkg/storage/writer_test.go
@@ -8,6 +8,7 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"strings"
"testing"

@@ -181,17 +182,23 @@ func TestNewCompressReader(t *testing.T) {
require.NoError(t, w.Close())
compressedData := buf.Bytes()

// default cfg
// default cfg (decode asynchronously)
prevRoutineCnt := runtime.NumGoroutine()
r, err := newCompressReader(Zstd, DecompressConfig{}, bytes.NewReader(compressedData))
currRoutineCnt := runtime.NumGoroutine()
require.NoError(t, err)
require.Greater(t, currRoutineCnt, prevRoutineCnt)
allData, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, "data", string(allData))

// sync decode
prevRoutineCnt = runtime.NumGoroutine()
config := DecompressConfig{ZStdDecodeConcurrency: 1}
r, err = newCompressReader(Zstd, config, bytes.NewReader(compressedData))
require.NoError(t, err)
currRoutineCnt = runtime.NumGoroutine()
require.Equal(t, prevRoutineCnt, currRoutineCnt)
allData, err = io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, "data", string(allData))
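The goroutine-count assertions above rely on how the zstd decoder schedules work: by default it decompresses asynchronously on background goroutines, while a decode concurrency of 1 keeps everything on the calling goroutine. A minimal standalone sketch, assuming the storage layer wraps `klauspost/compress` (whose `WithDecoderConcurrency` option `ZStdDecodeConcurrency` appears to map to):

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Compress a small payload to have something to decode.
	var buf bytes.Buffer
	w, err := zstd.NewWriter(&buf)
	if err != nil {
		panic(err)
	}
	if _, err := w.Write([]byte("data")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	// WithDecoderConcurrency(1) decodes synchronously on the calling
	// goroutine; the default spins up background decoder goroutines.
	r, err := zstd.NewReader(bytes.NewReader(buf.Bytes()),
		zstd.WithDecoderConcurrency(1))
	if err != nil {
		panic(err)
	}
	defer r.Close()

	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // data
}
```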
4 changes: 3 additions & 1 deletion executor/importer/BUILD.bazel
@@ -84,10 +84,11 @@ go_test(
embed = [":importer"],
flaky = True,
race = "on",
shard_count = 13,
shard_count = 14,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/config",
"//br/pkg/lightning/mydump",
"//br/pkg/streamhelper",
"//br/pkg/utils",
"//config",
Expand All @@ -106,6 +107,7 @@ go_test(
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
29 changes: 21 additions & 8 deletions executor/importer/import.go
@@ -920,15 +920,14 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "failed to read file size by seek")
}
compressTp := mydump.ParseCompressionOnFileExtension(fileNameKey)
dataFiles = append(dataFiles, &mydump.SourceFileMeta{
fileMeta := mydump.SourceFileMeta{
Path: fileNameKey,
FileSize: size,
Compression: compressTp,
Type: sourceType,
// todo: if we support compression for physical mode, should set it to size * compressRatio to better split
// engines
RealSize: size,
})
}
fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)
dataFiles = append(dataFiles, &fileMeta)
totalSize = size
} else {
var commonPrefix string
@@ -950,13 +949,14 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
return nil
}
compressTp := mydump.ParseCompressionOnFileExtension(remotePath)
dataFiles = append(dataFiles, &mydump.SourceFileMeta{
fileMeta := mydump.SourceFileMeta{
Path: remotePath,
FileSize: size,
Compression: compressTp,
Type: sourceType,
RealSize: size,
})
}
fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)
dataFiles = append(dataFiles, &fileMeta)
totalSize += size
return nil
})
@@ -971,6 +971,19 @@
return nil
}

func (e *LoadDataController) getFileRealSize(ctx context.Context,
fileMeta mydump.SourceFileMeta, store storage.ExternalStorage) int64 {
if fileMeta.Compression == mydump.CompressionNone {
return fileMeta.FileSize
}
compressRatio, err := mydump.SampleFileCompressRatio(ctx, fileMeta, store)
if err != nil {
e.logger.Warn("failed to get compress ratio", zap.String("file", fileMeta.Path), zap.Error(err))
return fileMeta.FileSize
}
return int64(compressRatio * float64(fileMeta.FileSize))
}
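Concretely: a 100-byte compressed file whose sampled ratio comes back as 2.5 gets an estimated RealSize of 250 bytes, while a sampling error is logged and the estimate falls back to the raw FileSize. The failpoint-driven test below exercises both paths with `return(250)` and `return("test err")`.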

func (e *LoadDataController) getSourceType() mydump.SourceType {
switch e.Format {
case DataFormatParquet:
19 changes: 19 additions & 0 deletions executor/importer/import_test.go
@@ -15,14 +15,17 @@
package importer

import (
"context"
"fmt"
"runtime"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
@@ -157,3 +160,19 @@ func TestASTArgsFromStmt(t *testing.T) {
require.Equal(t, astArgs.ColumnAssignments, importIntoStmt.ColumnAssignments)
require.Equal(t, astArgs.ColumnsAndUserVars, importIntoStmt.ColumnsAndUserVars)
}

func TestGetFileRealSize(t *testing.T) {
err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressPercentage", "return(250)")
require.NoError(t, err)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressPercentage")
}()
fileMeta := mydump.SourceFileMeta{Compression: mydump.CompressionNone, FileSize: 100}
c := &LoadDataController{logger: log.L()}
require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil))
fileMeta.Compression = mydump.CompressionGZ
require.Equal(t, int64(250), c.getFileRealSize(context.Background(), fileMeta, nil))
err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressPercentage", `return("test err")`)
require.NoError(t, err)
require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil))
}
4 changes: 3 additions & 1 deletion executor/importer/table_import.go
@@ -255,7 +255,9 @@
func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
info := LoadDataReaderInfo{
Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore, storage.DecompressConfig{})
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore, storage.DecompressConfig{
ZStdDecodeConcurrency: 1,
})

Codecov / codecov/patch: added lines executor/importer/table_import.go#L258-L260 were not covered by tests.
if err != nil {
return nil, errors.Trace(err)
}
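Pinning `ZStdDecodeConcurrency` to 1 on this read path looks like a deliberate trade of decode throughput for predictable resource usage: an import job can hold many files open concurrently, and synchronous decoding avoids spawning extra decoder goroutines per open reader.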
56 changes: 0 additions & 56 deletions tests/realtikvtest/importintotest/import_into_test.go
@@ -15,8 +15,6 @@
package importintotest

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"net"
@@ -443,60 +441,6 @@ func (s *mockGCSSuite) TestMultiValueIndex() {
))
}

func (s *mockGCSSuite) TestMixedCompression() {
s.tk.MustExec("DROP DATABASE IF EXISTS multi_load;")
s.tk.MustExec("CREATE DATABASE multi_load;")
s.tk.MustExec("CREATE TABLE multi_load.t (i INT PRIMARY KEY, s varchar(32));")

// gzip content
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
_, err := w.Write([]byte(`1,test1
2,test2
3,test3
4,test4`))
require.NoError(s.T(), err)
err = w.Close()
require.NoError(s.T(), err)

s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-multi-load",
Name: "compress.001.tsv.gz",
},
Content: buf.Bytes(),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-multi-load",
Name: "compress.002.tsv",
},
Content: []byte(`5,test5
6,test6
7,test7
8,test8
9,test9`),
})

sql := fmt.Sprintf(`IMPORT INTO multi_load.t FROM 'gs://test-multi-load/compress.*?endpoint=%s'
WITH thread=1;`, gcsEndpoint)
s.tk.MustQuery(sql)
s.tk.MustQuery("SELECT * FROM multi_load.t;").Check(testkit.Rows(
"1 test1", "2 test2", "3 test3", "4 test4",
"5 test5", "6 test6", "7 test7", "8 test8", "9 test9",
))

// with ignore N rows
s.tk.MustExec("truncate table multi_load.t")
sql = fmt.Sprintf(`IMPORT INTO multi_load.t FROM 'gs://test-multi-load/compress.*?endpoint=%s'
WITH skip_rows=3, thread=1;`, gcsEndpoint)
s.tk.MustQuery(sql)
s.tk.MustQuery("SELECT * FROM multi_load.t;").Check(testkit.Rows(
"4 test4",
"8 test8", "9 test9",
))
}

func (s *mockGCSSuite) TestLoadSQLDump() {
s.tk.MustExec("DROP DATABASE IF EXISTS load_csv;")
s.tk.MustExec("CREATE DATABASE load_csv;")
5 changes: 4 additions & 1 deletion tests/realtikvtest/importintotest3/BUILD.bazel
@@ -4,17 +4,20 @@ go_test(
name = "importintotest3_test",
timeout = "moderate",
srcs = [
"dummy_test.go",
"file_compression_test.go",
"main_test.go",
],
flaky = True,
race = "on",
deps = [
"//br/pkg/lightning/mydump",
"//config",
"//kv",
"//testkit",
"//tests/realtikvtest",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_snappy//:snappy",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
19 changes: 0 additions & 19 deletions tests/realtikvtest/importintotest3/dummy_test.go

This file was deleted.
