importinto: estimate real file size by sample on compressed file #46404

Merged · 6 commits · merged Sep 5, 2023
Changes from 3 commits
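In short: IMPORT INTO previously used a compressed file's on-disk size as its RealSize. This PR samples each compressed source file to estimate its uncompressed size, so downstream consumers of RealSize (the removed TODO in import.go names engine splitting as one) work with a figure much closer to the actual data volume.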
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/BUILD.bazel
@@ -33,6 +33,7 @@ go_library(
         "//util/table-filter",
         "//util/zeropool",
         "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_spkg_bom//:bom",
         "@com_github_xitongsys_parquet_go//parquet",
         "@com_github_xitongsys_parquet_go//reader",
9 changes: 9 additions & 0 deletions br/pkg/lightning/mydump/loader.go
@@ -22,6 +22,7 @@
 	"strings"

 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
@@ -731,6 +732,14 @@

 // SampleFileCompressRatio samples the compress ratio of the compressed file.
 func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
+	failpoint.Inject("SampleFileCompressRatio", func(val failpoint.Value) {
+		switch v := val.(type) {
+		case string:
+			failpoint.Return(1.0, errors.New(v))
+		case int:
+			failpoint.Return(float64(v)/100, nil)
+		}
+	})
 	if fileMeta.Compression == CompressionNone {
 		return 1, nil
 	}
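For context on the function the failpoint above is injected into: per its doc comment, SampleFileCompressRatio estimates how much a file expands on decompression by sampling rather than decompressing the whole object. A minimal sketch of that idea for an in-memory gzip stream (illustration only, with hypothetical names; the real implementation lives in br/pkg/lightning/mydump and reads from external storage):

package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"io"
)

// sampleGzipRatio decompresses at most sampleSize leading bytes of a gzip
// stream and returns decompressedBytes/compressedBytes as the estimated
// compress ratio.
func sampleGzipRatio(compressed []byte, sampleSize int) (float64, error) {
	if sampleSize > len(compressed) {
		sampleSize = len(compressed)
	}
	zr, err := gzip.NewReader(bytes.NewReader(compressed[:sampleSize]))
	if err != nil {
		return 0, err
	}
	defer zr.Close()
	// Count decompressed bytes without buffering them; a truncated stream
	// ends with io.ErrUnexpectedEOF, which is expected when sampling.
	n, err := io.Copy(io.Discard, zr)
	if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
		return 0, err
	}
	return float64(n) / float64(sampleSize), nil
}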
4 changes: 3 additions & 1 deletion executor/importer/BUILD.bazel
@@ -84,10 +84,11 @@ go_test(
     embed = [":importer"],
     flaky = True,
     race = "on",
-    shard_count = 12,
+    shard_count = 13,
     deps = [
         "//br/pkg/errors",
         "//br/pkg/lightning/config",
+        "//br/pkg/lightning/mydump",
         "//config",
         "//expression",
         "//parser",
@@ -100,6 +101,7 @@
         "//util/sqlexec",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_pingcap_log//:log",
         "@com_github_stretchr_testify//require",
         "@org_uber_go_zap//:zap",
     ],
29 changes: 22 additions & 7 deletions executor/importer/import.go
@@ -920,15 +920,15 @@
 			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "failed to read file size by seek")
 		}
 		compressTp := mydump.ParseCompressionOnFileExtension(fileNameKey)
-		dataFiles = append(dataFiles, &mydump.SourceFileMeta{
+		fileMeta := mydump.SourceFileMeta{
 			Path:        fileNameKey,
 			FileSize:    size,
 			Compression: compressTp,
 			Type:        sourceType,
-			// todo: if we support compression for physical mode, should set it to size * compressRatio to better split
-			// engines
-			RealSize: size,
-		})
+			RealSize:    size,
+		}
+		fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)
+		dataFiles = append(dataFiles, &fileMeta)
 		totalSize = size
 	} else {
 		var commonPrefix string
@@ -950,13 +950,15 @@
 				return nil
 			}
 			compressTp := mydump.ParseCompressionOnFileExtension(remotePath)
-			dataFiles = append(dataFiles, &mydump.SourceFileMeta{
+			fileMeta := mydump.SourceFileMeta{
 				Path:        remotePath,
 				FileSize:    size,
 				Compression: compressTp,
 				Type:        sourceType,
 				RealSize:    size,
-			})
+			}
+			fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)
+			dataFiles = append(dataFiles, &fileMeta)
 			totalSize += size
 			return nil
 		})
@@ -971,6 +973,19 @@
 	return nil
 }

+func (e *LoadDataController) getFileRealSize(ctx context.Context,
+	fileMeta mydump.SourceFileMeta, store storage.ExternalStorage) int64 {
+	if fileMeta.Compression == mydump.CompressionNone {
+		return fileMeta.FileSize
+	}
+	compressRatio, err := mydump.SampleFileCompressRatio(ctx, fileMeta, store)
+	if err != nil {
+		e.logger.Warn("failed to get compress ratio", zap.String("file", fileMeta.Path), zap.Error(err))
+		return fileMeta.FileSize
+	}
+	return int64(compressRatio * float64(fileMeta.FileSize))
+}
+
 func (e *LoadDataController) getSourceType() mydump.SourceType {
 	switch e.Format {
 	case DataFormatParquet:
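To make the new estimate concrete: getFileRealSize multiplies the compressed on-disk size by the sampled ratio, and falls back to FileSize whenever sampling fails. A hypothetical calculation with illustrative numbers:

	compressRatio := 2.5                                 // sampled: data expands 2.5x on decompression
	fileSize := int64(100 << 20)                         // 100 MiB compressed on disk
	realSize := int64(compressRatio * float64(fileSize)) // estimated ~250 MiB of real data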
19 changes: 19 additions & 0 deletions executor/importer/import_test.go
@@ -15,14 +15,17 @@
 package importer

 import (
+	"context"
 	"fmt"
 	"runtime"
 	"testing"

 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
+	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/ast"
@@ -157,3 +160,19 @@ func TestASTArgsFromStmt(t *testing.T) {
 	require.Equal(t, astArgs.ColumnAssignments, importIntoStmt.ColumnAssignments)
 	require.Equal(t, astArgs.ColumnsAndUserVars, importIntoStmt.ColumnsAndUserVars)
 }
+
+func TestGetFileRealSize(t *testing.T) {
+	err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressRatio", "return(250)")
+	require.NoError(t, err)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressRatio")
+	}()
+	fileMeta := mydump.SourceFileMeta{Compression: mydump.CompressionNone, FileSize: 100}
+	c := &LoadDataController{logger: log.L()}
+	require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil))
+	fileMeta.Compression = mydump.CompressionGZ
+	require.Equal(t, int64(250), c.getFileRealSize(context.Background(), fileMeta, nil))
+	err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/mydump/SampleFileCompressRatio", `return("test err")`)
+	require.NoError(t, err)
+	require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil))
+}
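A note on how the failpoint payloads above map to outcomes (inferred from the loader.go hunk earlier in this diff): an int payload v makes the stubbed SampleFileCompressRatio return v/100 as the ratio, and a string payload makes it return an error, so:

	// failpoint term       -> val in Inject callback -> getFileRealSize result for FileSize=100
	// return(250)          -> int 250 (ratio 2.5)    -> int64(2.5 * 100) == 250
	// return("test err")   -> string "test err"      -> sampling error, falls back to 100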
56 changes: 0 additions & 56 deletions tests/realtikvtest/importintotest/import_into_test.go
@@ -15,8 +15,6 @@
 package importintotest

 import (
-	"bytes"
-	"compress/gzip"
 	"context"
 	"fmt"
 	"net"
@@ -443,60 +441,6 @@ func (s *mockGCSSuite) TestMultiValueIndex() {
 	))
 }

-func (s *mockGCSSuite) TestMixedCompression() {
-	s.tk.MustExec("DROP DATABASE IF EXISTS multi_load;")
-	s.tk.MustExec("CREATE DATABASE multi_load;")
-	s.tk.MustExec("CREATE TABLE multi_load.t (i INT PRIMARY KEY, s varchar(32));")
-
-	// gzip content
-	var buf bytes.Buffer
-	w := gzip.NewWriter(&buf)
-	_, err := w.Write([]byte(`1,test1
-2,test2
-3,test3
-4,test4`))
-	require.NoError(s.T(), err)
-	err = w.Close()
-	require.NoError(s.T(), err)
-
-	s.server.CreateObject(fakestorage.Object{
-		ObjectAttrs: fakestorage.ObjectAttrs{
-			BucketName: "test-multi-load",
-			Name:       "compress.001.tsv.gz",
-		},
-		Content: buf.Bytes(),
-	})
-	s.server.CreateObject(fakestorage.Object{
-		ObjectAttrs: fakestorage.ObjectAttrs{
-			BucketName: "test-multi-load",
-			Name:       "compress.002.tsv",
-		},
-		Content: []byte(`5,test5
-6,test6
-7,test7
-8,test8
-9,test9`),
-	})
-
-	sql := fmt.Sprintf(`IMPORT INTO multi_load.t FROM 'gs://test-multi-load/compress.*?endpoint=%s'
-		WITH thread=1;`, gcsEndpoint)
-	s.tk.MustQuery(sql)
-	s.tk.MustQuery("SELECT * FROM multi_load.t;").Check(testkit.Rows(
-		"1 test1", "2 test2", "3 test3", "4 test4",
-		"5 test5", "6 test6", "7 test7", "8 test8", "9 test9",
-	))
-
-	// with ignore N rows
-	s.tk.MustExec("truncate table multi_load.t")
-	sql = fmt.Sprintf(`IMPORT INTO multi_load.t FROM 'gs://test-multi-load/compress.*?endpoint=%s'
-		WITH skip_rows=3, thread=1;`, gcsEndpoint)
-	s.tk.MustQuery(sql)
-	s.tk.MustQuery("SELECT * FROM multi_load.t;").Check(testkit.Rows(
-		"4 test4",
-		"8 test8", "9 test9",
-	))
-}
-
 func (s *mockGCSSuite) TestLoadSQLDump() {
 	s.tk.MustExec("DROP DATABASE IF EXISTS load_csv;")
 	s.tk.MustExec("CREATE DATABASE load_csv;")
5 changes: 4 additions & 1 deletion tests/realtikvtest/importintotest3/BUILD.bazel
@@ -4,17 +4,20 @@ go_test(
     name = "importintotest3_test",
     timeout = "moderate",
     srcs = [
-        "dummy_test.go",
+        "file_compression_test.go",
         "main_test.go",
     ],
     flaky = True,
     race = "on",
     deps = [
+        "//br/pkg/lightning/mydump",
         "//config",
         "//kv",
         "//testkit",
        "//tests/realtikvtest",
         "@com_github_fsouza_fake_gcs_server//fakestorage",
+        "@com_github_golang_snappy//:snappy",
+        "@com_github_klauspost_compress//zstd",
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//require",
         "@com_github_stretchr_testify//suite",
19 changes: 0 additions & 19 deletions tests/realtikvtest/importintotest3/dummy_test.go

This file was deleted.

124 changes: 124 additions & 0 deletions tests/realtikvtest/importintotest3/file_compression_test.go
@@ -0,0 +1,124 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importintotest

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"github.com/fsouza/fake-gcs-server/fakestorage"
	"github.com/golang/snappy"
	"github.com/klauspost/compress/zstd"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/testkit"
)

func (s *mockGCSSuite) getCompressedData(compression mydump.Compression, data []byte) []byte {
	var buf bytes.Buffer
	var w io.WriteCloser
	switch compression {
	case mydump.CompressionGZ:
		w = gzip.NewWriter(&buf)
	case mydump.CompressionZStd:
		var err error
		w, err = zstd.NewWriter(&buf)
		s.NoError(err)
	case mydump.CompressionSnappy:
		w = snappy.NewBufferedWriter(&buf)
	default:
		panic(fmt.Sprintf("unknown compression type: %d", compression))
	}
	_, err := w.Write(data)
	s.NoError(err)
	s.NoError(w.Close())
	compressedData := buf.Bytes()
	s.NotEqual(data, compressedData)
	return compressedData
}

func (s *mockGCSSuite) TestGzipAndMixedCompression() {
	s.prepareAndUseDB("gzip")
	s.tk.MustExec("CREATE TABLE gzip.t (i INT PRIMARY KEY, s varchar(32));")

	s.server.CreateObject(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gzip", Name: "compress.001.csv.gz"},
		Content:     s.getCompressedData(mydump.CompressionGZ, []byte("1,test1\n2,test2")),
	})
	s.server.CreateObject(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gzip", Name: "compress.001.csv.gzip"},
		Content:     s.getCompressedData(mydump.CompressionGZ, []byte("3,test3\n4,test4")),
	})
	s.server.CreateObject(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gzip", Name: "compress.002.csv"},
		Content:     []byte("5,test5\n6,test6\n7,test7\n8,test8\n9,test9"),
	})

	sql := fmt.Sprintf(`IMPORT INTO gzip.t FROM 'gs://gzip/compress.*?endpoint=%s'
		WITH thread=1;`, gcsEndpoint)
	s.tk.MustQuery(sql)
	s.tk.MustQuery("SELECT * FROM gzip.t;").Check(testkit.Rows(
		"1 test1", "2 test2", "3 test3", "4 test4",
		"5 test5", "6 test6", "7 test7", "8 test8", "9 test9",
	))

	// with ignore N rows
	s.tk.MustExec("truncate table gzip.t")
	sql = fmt.Sprintf(`IMPORT INTO gzip.t FROM 'gs://gzip/compress.*?endpoint=%s'
		WITH skip_rows=1, thread=1;`, gcsEndpoint)
	s.tk.MustQuery(sql)
	s.tk.MustQuery("SELECT * FROM gzip.t;").Check(testkit.Rows(
		"2 test2", "4 test4", "6 test6", "7 test7", "8 test8", "9 test9",
	))
}

func (s *mockGCSSuite) TestZStd() {
	s.prepareAndUseDB("zstd")
	s.tk.MustExec("CREATE TABLE zstd.t (i INT PRIMARY KEY, s varchar(32));")

	s.server.CreateObject(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "zstd", Name: "t.01.csv.zst"},
		Content:     s.getCompressedData(mydump.CompressionZStd, []byte("1,test1\n2,test2")),
	})
	s.server.CreateObject(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "zstd", Name: "t.02.csv.zstd"},
		Content:     s.getCompressedData(mydump.CompressionZStd, []byte("3,test3\n4,test4")),
	})

	sql := fmt.Sprintf(`IMPORT INTO zstd.t FROM 'gs://zstd/t.*?endpoint=%s'
		WITH thread=1;`, gcsEndpoint)
	s.tk.MustQuery(sql)
	s.tk.MustQuery("SELECT * FROM zstd.t;").Check(testkit.Rows(
		"1 test1", "2 test2", "3 test3", "4 test4",
	))
}

func (s *mockGCSSuite) TestSnappy() {
	s.prepareAndUseDB("snappy")
	s.tk.MustExec("CREATE TABLE snappy.t (i INT PRIMARY KEY, s varchar(32));")

	s.server.CreateObject(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "snappy", Name: "t.01.csv.snappy"},
		Content:     s.getCompressedData(mydump.CompressionSnappy, []byte("1,test1\n2,test2")),
	})

	sql := fmt.Sprintf(`IMPORT INTO snappy.t FROM 'gs://snappy/t.*?endpoint=%s'
		WITH thread=1;`, gcsEndpoint)
	s.tk.MustQuery(sql)
	s.tk.MustQuery("SELECT * FROM snappy.t;").Check(testkit.Rows(
		"1 test1", "2 test2",
	))
}
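A few observations on these tests. First, skip_rows=1 in TestGzipAndMixedCompression is applied per file rather than once per import: each of the three files loses its first row (1,test1 / 3,test3 / 5,test5), which is why the result keeps rows 2, 4, and 6 through 9; the deleted TestMixedCompression showed the same per-file behavior with skip_rows=3. Second, the fixture names pin down the extension-to-codec mapping of mydump.ParseCompressionOnFileExtension: .gz/.gzip select gzip, .zst/.zstd select zstandard, .snappy selects snappy, and anything else (the plain .csv) is treated as uncompressed. Finally, getCompressedData uses snappy.NewBufferedWriter, which emits the snappy framing (stream) format rather than the raw block format produced by snappy.Encode; the two are not interchangeable, so reading a fixture back requires the stream decoder. A round-trip sketch under that assumption:

	var buf bytes.Buffer
	w := snappy.NewBufferedWriter(&buf)
	if _, err := w.Write([]byte("1,test1\n2,test2")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil { // Close flushes the final frame
		panic(err)
	}
	decoded, err := io.ReadAll(snappy.NewReader(&buf)) // stream decoder matches the framed format
	// err == nil and string(decoded) == "1,test1\n2,test2"
	_, _ = decoded, err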
6 changes: 6 additions & 0 deletions tests/realtikvtest/importintotest3/main_test.go
@@ -81,6 +81,12 @@ func (s *mockGCSSuite) cleanupSysTables() {
 	s.tk.MustExec("delete from mysql.tidb_background_subtask")
 }

+func (s *mockGCSSuite) prepareAndUseDB(db string) {
+	s.tk.MustExec("drop database if exists " + db)
+	s.tk.MustExec("create database " + db)
+	s.tk.MustExec("use " + db)
+}
+
 func init() {
 	// need a real PD
 	config.UpdateGlobal(func(conf *config.Config) {