From b44ca2694850bbf39f8a8fe3c5ed36b7d9222a0c Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Mon, 6 Feb 2023 23:31:49 +0800 Subject: [PATCH 1/5] lightning: support compression when sending kv pairs to tikv --- br/pkg/lightning/backend/local/local.go | 63 ++++++++++++++----- br/pkg/lightning/config/config.go | 53 ++++++++++++++++ br/pkg/lightning/config/config_test.go | 14 +++++ .../lightning_import_compress/config.toml | 5 ++ .../lightning_import_compress/config_gz.toml | 6 ++ .../config_gzip.toml | 6 ++ br/tests/lightning_import_compress/run.sh | 56 +++++++++++++++++ 7 files changed, 187 insertions(+), 16 deletions(-) create mode 100644 br/tests/lightning_import_compress/config.toml create mode 100644 br/tests/lightning_import_compress/config_gz.toml create mode 100644 br/tests/lightning_import_compress/config_gzip.toml create mode 100644 br/tests/lightning_import_compress/run.sh diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 6f0a2e6a130fd..1cdee0f7aac9b 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "math" + "net" "os" "path/filepath" "strings" @@ -130,18 +131,25 @@ type ImportClientFactory interface { } type importClientFactoryImpl struct { - conns *common.GRPCConns - splitCli split.SplitClient - tls *common.TLS - tcpConcurrency int + conns *common.GRPCConns + splitCli split.SplitClient + tls *common.TLS + tcpConcurrency int + compressionType config.CompressionType } -func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl { +func newImportClientFactoryImpl( + splitCli split.SplitClient, + tls *common.TLS, + tcpConcurrency int, + compressionType config.CompressionType, +) *importClientFactoryImpl { return &importClientFactoryImpl{ - conns: common.NewGRPCConns(), - splitCli: splitCli, - tls: tls, - tcpConcurrency: tcpConcurrency, + conns: common.NewGRPCConns(), + splitCli: splitCli, + tls: tls, + tcpConcurrency: tcpConcurrency, + compressionType: compressionType, } } @@ -150,9 +158,11 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) if err != nil { return nil, errors.Trace(err) } - opt := grpc.WithTransportCredentials(insecure.NewCredentials()) + var opts []grpc.DialOption if f.tls.TLSConfig() != nil { - opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig())) + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } ctx, cancel := context.WithTimeout(ctx, dialTimeout) @@ -163,10 +173,7 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) if addr == "" { addr = store.GetAddress() } - conn, err := grpc.DialContext( - ctx, - addr, - opt, + opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: gRPCKeepAliveTime, @@ -174,6 +181,21 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) PermitWithoutStream: true, }), ) + if f.compressionType != config.CompressionTypeNone { + opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(f.compressionType.String()))) + } + + failpoint.Inject("LoggingImportBytes", func() { + opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { + conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", target) + if err != nil { + return nil, err + } + return &loggingConn{Conn: conn}, nil + })) + }) + + conn, err := grpc.DialContext(ctx, addr, opts...) cancel() if err != nil { return nil, errors.Trace(err) @@ -200,6 +222,15 @@ func (f *importClientFactoryImpl) Close() { f.conns.Close() } +type loggingConn struct { + net.Conn +} + +func (c loggingConn) Write(b []byte) (int, error) { + log.L().Debug("import write", zap.Int("bytes", len(b))) + return c.Conn.Write(b) +} + // Range record start and end key for localStoreDir.DB // so we can write it to tikv in streaming type Range struct { @@ -479,7 +510,7 @@ func NewLocalBackend( if err != nil { return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() } - importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency) + importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency, cfg.TikvImporter.CompressKVPairs) duplicateDetection := cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone keyAdapter := KeyAdapter(noopKeyAdapter{}) if duplicateDetection { diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index d14f12066c0f4..3d1583ca50733 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -468,6 +468,58 @@ func (dra DuplicateResolutionAlgorithm) String() string { } } +// CompressionType is the config type of compression algorithm. +type CompressionType int + +const ( + // CompressionTypeNone means no compression. + CompressionTypeNone CompressionType = iota + // CompressionTypeGzip means gzip compression. + CompressionTypeGzip +) + +func (t *CompressionType) UnmarshalTOML(v interface{}) error { + if val, ok := v.(string); ok { + return t.FromStringValue(val) + } + return errors.Errorf("invalid compression-type '%v', please choose valid option between ['gzip']", v) +} + +func (t CompressionType) MarshalText() ([]byte, error) { + return []byte(t.String()), nil +} + +func (t *CompressionType) FromStringValue(s string) error { + switch strings.ToLower(s) { + case "": + *t = CompressionTypeNone + case "gz", "gzip": + *t = CompressionTypeGzip + default: + return errors.Errorf("invalid compression-type '%s', please choose valid option between ['gzip']", s) + } + return nil +} + +func (t *CompressionType) MarshalJSON() ([]byte, error) { + return []byte(`"` + t.String() + `"`), nil +} + +func (t *CompressionType) UnmarshalJSON(data []byte) error { + return t.FromStringValue(strings.Trim(string(data), `"`)) +} + +func (t CompressionType) String() string { + switch t { + case CompressionTypeGzip: + return "gzip" + case CompressionTypeNone: + return "" + default: + panic(fmt.Sprintf("invalid compression type '%d'", t)) + } +} + // PostRestore has some options which will be executed after kv restored. type PostRestore struct { Checksum PostOpLevel `toml:"checksum" json:"checksum"` @@ -582,6 +634,7 @@ type TikvImporter struct { OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"` MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"` SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"` + CompressKVPairs CompressionType `toml:"compress-kv-pairs" json:"compress-kv-pairs"` RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"` RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"` SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"` diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index f590391740ec4..2f38709ea9ca8 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -1144,3 +1144,17 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) { )) require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg)) } + +func TestCompressionType(t *testing.T) { + var ct config.CompressionType + require.NoError(t, ct.FromStringValue("")) + require.Equal(t, config.CompressionTypeNone, ct) + require.NoError(t, ct.FromStringValue("gzip")) + require.Equal(t, config.CompressionTypeGzip, ct) + require.NoError(t, ct.FromStringValue("gz")) + require.Equal(t, config.CompressionTypeGzip, ct) + require.EqualError(t, ct.FromStringValue("zstd"), "invalid compression-type 'zstd', please choose valid option between ['gzip']") + + require.Equal(t, "", config.CompressionTypeNone.String()) + require.Equal(t, "gzip", config.CompressionTypeGzip.String()) +} diff --git a/br/tests/lightning_import_compress/config.toml b/br/tests/lightning_import_compress/config.toml new file mode 100644 index 0000000000000..30df6c8e0c98e --- /dev/null +++ b/br/tests/lightning_import_compress/config.toml @@ -0,0 +1,5 @@ +[tikv-importer] +backend = 'local' + +[mydumper.csv] +header = false diff --git a/br/tests/lightning_import_compress/config_gz.toml b/br/tests/lightning_import_compress/config_gz.toml new file mode 100644 index 0000000000000..d26e6ae237c18 --- /dev/null +++ b/br/tests/lightning_import_compress/config_gz.toml @@ -0,0 +1,6 @@ +[tikv-importer] +backend = 'local' +compress-kv-pairs = 'gz' + +[mydumper.csv] +header = false diff --git a/br/tests/lightning_import_compress/config_gzip.toml b/br/tests/lightning_import_compress/config_gzip.toml new file mode 100644 index 0000000000000..24a873a27599b --- /dev/null +++ b/br/tests/lightning_import_compress/config_gzip.toml @@ -0,0 +1,6 @@ +[tikv-importer] +backend = 'local' +compress-kv-pairs = 'gzip' + +[mydumper.csv] +header = false diff --git a/br/tests/lightning_import_compress/run.sh b/br/tests/lightning_import_compress/run.sh new file mode 100644 index 0000000000000..6e390ff771e98 --- /dev/null +++ b/br/tests/lightning_import_compress/run.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/LoggingImportBytes=return" + +mkdir -p "$TEST_DIR/data" + +cat <"$TEST_DIR/data/test-schema-create.sql" +CREATE DATABASE test; +EOF +cat <"$TEST_DIR/data/test.t-schema.sql" +CREATE TABLE test.t (id int primary key, a int, b int, c int); +EOF + +# Generate 200k rows. Total size is about 5MiB. +for i in {1..200000}; do + echo "$i,$i,$i,$i" >>"$TEST_DIR/data/test.t.0.csv" +done + +LOG_FILE1="$TEST_DIR/lightning-import-compress1.log" +LOG_FILE2="$TEST_DIR/lightning-import-compress2.log" +LOG_FILE3="$TEST_DIR/lightning-import-compress3.log" + +run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config.toml" --log-file "$LOG_FILE1" -L debug +run_sql 'DROP DATABASE test;' +run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config_gz.toml" --log-file "$LOG_FILE2" -L debug +run_sql 'DROP DATABASE test;' +run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config_gzip.toml" --log-file "$LOG_FILE3" -L debug + +uncompress=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress1.log | + grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}') +gzip=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress2.log | + grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}') +gz=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress3.log | + grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}') + +echo "uncompress: ${uncompress}, gzip: ${gzip}, gz: ${gz}" +if [ "$uncompress" -lt "$gzip" ] || [ "$uncompress" -lt "$gz" ]; then + echo "compress is not working" + exit 1 +fi From 4c2a4dd6e737b20d8dfb0df9231f81298a81322b Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Tue, 7 Feb 2023 21:34:19 +0800 Subject: [PATCH 2/5] Update br/tests/lightning_import_compress/run.sh --- br/tests/lightning_import_compress/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/tests/lightning_import_compress/run.sh b/br/tests/lightning_import_compress/run.sh index 6e390ff771e98..e6414881e4e0e 100644 --- a/br/tests/lightning_import_compress/run.sh +++ b/br/tests/lightning_import_compress/run.sh @@ -50,7 +50,7 @@ gz=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress3.log grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}') echo "uncompress: ${uncompress}, gzip: ${gzip}, gz: ${gz}" -if [ "$uncompress" -lt "$gzip" ] || [ "$uncompress" -lt "$gz" ]; then +if [ "$uncompress" -le "$gzip" ] || [ "$uncompress" -le "$gz" ]; then echo "compress is not working" exit 1 fi From 1c3c845c802a7930d6d960136f5ced66502eac6f Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 8 Feb 2023 10:34:50 +0800 Subject: [PATCH 3/5] address comments --- br/pkg/lightning/backend/local/local.go | 12 +++++++++--- br/pkg/lightning/config/config.go | 16 ++++++++-------- br/pkg/lightning/config/config_test.go | 10 +++++----- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 1cdee0f7aac9b..cd2a7dd44c5cc 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -78,6 +78,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" ) @@ -165,6 +166,7 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } ctx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() bfConf := backoff.DefaultConfig bfConf.MaxDelay = gRPCBackOffMaxDelay @@ -181,8 +183,13 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) PermitWithoutStream: true, }), ) - if f.compressionType != config.CompressionTypeNone { - opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(f.compressionType.String()))) + switch f.compressionType { + case config.CompressionNone: + // do nothing + case config.CompressionGzip: + opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) + default: + return nil, common.ErrInvalidConfig.GenWithStack("unsupported compression type %s", f.compressionType) } failpoint.Inject("LoggingImportBytes", func() { @@ -196,7 +203,6 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) }) conn, err := grpc.DialContext(ctx, addr, opts...) - cancel() if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 3d1583ca50733..103915f29879e 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -472,10 +472,10 @@ func (dra DuplicateResolutionAlgorithm) String() string { type CompressionType int const ( - // CompressionTypeNone means no compression. - CompressionTypeNone CompressionType = iota - // CompressionTypeGzip means gzip compression. - CompressionTypeGzip + // CompressionNone means no compression. + CompressionNone CompressionType = iota + // CompressionGzip means gzip compression. + CompressionGzip ) func (t *CompressionType) UnmarshalTOML(v interface{}) error { @@ -492,9 +492,9 @@ func (t CompressionType) MarshalText() ([]byte, error) { func (t *CompressionType) FromStringValue(s string) error { switch strings.ToLower(s) { case "": - *t = CompressionTypeNone + *t = CompressionNone case "gz", "gzip": - *t = CompressionTypeGzip + *t = CompressionGzip default: return errors.Errorf("invalid compression-type '%s', please choose valid option between ['gzip']", s) } @@ -511,9 +511,9 @@ func (t *CompressionType) UnmarshalJSON(data []byte) error { func (t CompressionType) String() string { switch t { - case CompressionTypeGzip: + case CompressionGzip: return "gzip" - case CompressionTypeNone: + case CompressionNone: return "" default: panic(fmt.Sprintf("invalid compression type '%d'", t)) diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index 2f38709ea9ca8..91edd8aa46c12 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -1148,13 +1148,13 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) { func TestCompressionType(t *testing.T) { var ct config.CompressionType require.NoError(t, ct.FromStringValue("")) - require.Equal(t, config.CompressionTypeNone, ct) + require.Equal(t, config.CompressionNone, ct) require.NoError(t, ct.FromStringValue("gzip")) - require.Equal(t, config.CompressionTypeGzip, ct) + require.Equal(t, config.CompressionGzip, ct) require.NoError(t, ct.FromStringValue("gz")) - require.Equal(t, config.CompressionTypeGzip, ct) + require.Equal(t, config.CompressionGzip, ct) require.EqualError(t, ct.FromStringValue("zstd"), "invalid compression-type 'zstd', please choose valid option between ['gzip']") - require.Equal(t, "", config.CompressionTypeNone.String()) - require.Equal(t, "gzip", config.CompressionTypeGzip.String()) + require.Equal(t, "", config.CompressionNone.String()) + require.Equal(t, "gzip", config.CompressionGzip.String()) } From 8f084dad10e81314412b54cace15db461bb8abb5 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 8 Feb 2023 10:55:56 +0800 Subject: [PATCH 4/5] fix bazel --- br/pkg/lightning/backend/local/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index 2cd903d3bc290..42fbadabc9e11 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -70,6 +70,7 @@ go_library( "@org_golang_google_grpc//codes", "@org_golang_google_grpc//credentials", "@org_golang_google_grpc//credentials/insecure", + "@org_golang_google_grpc//encoding/gzip", "@org_golang_google_grpc//keepalive", "@org_golang_google_grpc//status", "@org_golang_x_exp//slices", From 4d8e6f467d5c219cec3d57e1a2453c1d71b50adb Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 8 Feb 2023 14:34:27 +0800 Subject: [PATCH 5/5] fix slow test lightning_compress --- br/tests/lightning_compress/config.toml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/br/tests/lightning_compress/config.toml b/br/tests/lightning_compress/config.toml index 000018c5c41d4..f4452fe7664a6 100644 --- a/br/tests/lightning_compress/config.toml +++ b/br/tests/lightning_compress/config.toml @@ -12,7 +12,3 @@ enable = true schema = "tidb_lightning_checkpoint_test" driver = "mysql" keep-after-success = true - -[tikv-importer] -send-kv-pairs=10 -region-split-size = 1024