Skip to content

Commit

Permalink
importinto: remove useless code and add test (#46413)
Browse files Browse the repository at this point in the history
ref #42930
  • Loading branch information
D3Hunter authored Aug 26, 2023
1 parent b390a09 commit bfacaf8
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 44 deletions.
9 changes: 0 additions & 9 deletions br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ type BaseKVEncoder struct {

logger *zap.Logger
recordCache []types.Datum
// the first auto-generated ID in the current encoder.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
LastInsertID uint64
}

// NewBaseKVEncoder creates a new BaseKVEncoder.
Expand Down Expand Up @@ -259,9 +256,6 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(e.SessionCtx,
types.NewIntDatum(rowID), col.ToInfo(), false, false)
if err == nil && e.LastInsertID == 0 {
e.LastInsertID = value.GetUint64()
}
case e.IsAutoRandomCol(col.ToInfo()):
var val types.Datum
realRowID := e.AutoIDFn(rowID)
Expand All @@ -271,9 +265,6 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(e.SessionCtx, val, col.ToInfo(), false, false)
if err == nil && e.LastInsertID == 0 {
e.LastInsertID = value.GetUint64()
}
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
Expand Down
12 changes: 11 additions & 1 deletion executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,38 @@ go_test(
srcs = [
"import_test.go",
"job_test.go",
"precheck_test.go",
"table_import_test.go",
],
embed = [":importer"],
flaky = True,
race = "on",
shard_count = 12,
shard_count = 13,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/config",
"//br/pkg/streamhelper",
"//br/pkg/utils",
"//config",
"//expression",
"//infoschema",
"//kv",
"//parser",
"//parser/ast",
"//parser/model",
"//planner/core",
"//testkit",
"//util/dbterror/exeerrors",
"//util/etcd",
"//util/logutil",
"//util/mock",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_server_v3//embed",
"@org_uber_go_zap//:zap",
],
)
1 change: 0 additions & 1 deletion executor/importer/engine_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,5 @@ func ProcessChunk(
return err
}
progress.AddColSize(encoder.GetColumnSize())
tableImporter.setLastInsertID(encoder.GetLastInsertID())
return nil
}
9 changes: 4 additions & 5 deletions executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,11 +1133,10 @@ type JobImportParam struct {

// JobImportResult is the result of the job import.
type JobImportResult struct {
Msg string
LastInsertID uint64
Affected uint64
Warnings []stmtctx.SQLWarn
ColSizeMap map[int64]int64
Msg string
Affected uint64
Warnings []stmtctx.SQLWarn
ColSizeMap map[int64]int64
}

// JobImporter is the interface for importing a job.
Expand Down
8 changes: 0 additions & 8 deletions executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import (

type kvEncoder interface {
Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
// GetLastInsertID returns the first auto-generated ID in the current encoder.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
GetLastInsertID() uint64
// GetColumnSize returns the size of each column in the current encoder.
GetColumnSize() map[int64]int64
io.Closer
Expand Down Expand Up @@ -94,11 +91,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
return en.Record2KV(record, row, rowID)
}

// GetLastInsertID implements the kvEncoder interface.
// It returns the encoder's LastInsertID field as-is, without modifying any state.
func (en *tableKVEncoder) GetLastInsertID() uint64 {
return en.LastInsertID
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
sessionVars := en.SessionCtx.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
Expand Down
10 changes: 5 additions & 5 deletions executor/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ const (
etcdDialTimeout = 5 * time.Second
)

// GetEtcdClient returns an etcd client.
// It is declared as a package-level variable (defaulting to getEtcdClient)
// rather than a plain function so that it can be reassigned;
// exported for testing.
var GetEtcdClient = getEtcdClient

// CheckRequirements checks the requirements for IMPORT INTO.
// we check the following things here:
// 1. target table should be empty
// 2. no CDC or PiTR tasks running
//
// todo: check if there's running lightning tasks?
// we check them one by one, and return the first error we meet.
// todo: check all items and return all errors at once.
func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error {
// todo: maybe we can reuse checker in lightning
if err := e.checkTotalFileSize(); err != nil {
return err
}
Expand Down Expand Up @@ -112,9 +114,7 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error {
return nil
}

// GetEtcdClient returns an etcd client.
// exported for testing.
func GetEtcdClient() (*etcd.Client, error) {
func getEtcdClient() (*etcd.Client, error) {
tidbCfg := tidb.GetGlobalConfig()
tls, err := util.NewTLSConfig(
util.WithCAPath(tidbCfg.Security.ClusterSSLCA),
Expand Down
148 changes: 148 additions & 0 deletions executor/importer/precheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importer_test

import (
"context"
"fmt"
"math/rand"
"net/url"
"testing"
"time"

"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/dbterror/exeerrors"
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
)

const addrFmt = "http://127.0.0.1:%d"

// createMockETCD starts an embedded single-node etcd server for tests.
//
// The server stores its data in a per-test temporary directory and listens on
// a randomly chosen client port (peer port is the client port plus one).
// It returns the client address and the server handle; the caller is
// responsible for closing the returned server. The test fails immediately if
// the configuration is invalid, the server cannot be started, or it is not
// ready within 5 seconds.
func createMockETCD(t *testing.T) (string, *embed.Etcd) {
	cfg := embed.NewConfig()
	cfg.Dir = t.TempDir()
	// rand port in [20000, 60000)
	randPort := int(rand.Int31n(40000)) + 20000
	clientAddr := fmt.Sprintf(addrFmt, randPort)
	lcurl, err := url.Parse(clientAddr)
	require.NoError(t, err)
	cfg.LCUrls, cfg.ACUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
	lpurl, err := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
	require.NoError(t, err)
	cfg.LPUrls, cfg.APUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
	cfg.InitialCluster = "default=" + lpurl.String()
	cfg.Logger = "zap"
	embedEtcd, err := embed.StartEtcd(cfg)
	require.NoError(t, err)

	select {
	case <-embedEtcd.Server.ReadyNotify():
	case <-time.After(5 * time.Second):
		embedEtcd.Server.Stop() // trigger a shutdown
		// fail unconditionally: the server never became ready.
		require.FailNow(t, "server took too long to start")
	}
	return clientAddr, embedEtcd
}

// TestCheckRequirements walks through the pre-check items of
// LoadDataController.CheckRequirements one by one: total file size, target
// table emptiness/existence, and finally running PiTR / CDC tasks, which are
// mocked by writing keys into an embedded etcd server.
func TestCheckRequirements(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	ctx := util.WithInternalSourceType(context.Background(), kv.InternalImportInto)
	conn := tk.Session().(sqlexec.SQLExecutor)

	_, err := conn.Execute(ctx, "create table test.t(id int primary key)")
	require.NoError(t, err)
	is := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema)
	tableObj, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)

	c := &importer.LoadDataController{
		Plan: &importer.Plan{
			DBName: "test",
		},
		Table: tableObj,
	}
	// TotalFileSize is still zero here, so the first pre-check item fails.
	require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)

	// now checkTotalFileSize passes, try the next pre-check item
	c.TotalFileSize = 1
	// non-empty table
	_, err = conn.Execute(ctx, "insert into test.t values(1)")
	require.NoError(t, err)
	require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)
	// table does not exist
	_, err = conn.Execute(ctx, "drop table if exists test.t")
	require.NoError(t, err)
	require.ErrorContains(t, c.CheckRequirements(ctx, conn), "doesn't exist")

	// create the table again, now checkTableEmpty passes
	_, err = conn.Execute(ctx, "create table test.t(id int primary key)")
	require.NoError(t, err)

	clientAddr, embedEtcd := createMockETCD(t)
	require.NotNil(t, embedEtcd)
	t.Cleanup(func() {
		embedEtcd.Close()
	})
	// redirect the importer's etcd client to the embedded server, and restore
	// the original factory when the test finishes.
	backup := importer.GetEtcdClient
	importer.GetEtcdClient = func() (*etcd.Client, error) {
		etcdCli, err := clientv3.New(clientv3.Config{
			Endpoints: []string{clientAddr},
		})
		require.NoError(t, err)
		return etcd.NewClient(etcdCli, ""), nil
	}
	t.Cleanup(func() {
		importer.GetEtcdClient = backup
	})
	// mock a PiTR task
	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{clientAddr},
	})
	// check the error before registering the cleanup: on failure etcdCli is
	// nil and closing it in the cleanup would panic, hiding the real error.
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, etcdCli.Close())
	})
	pitrKey := streamhelper.PrefixOfTask() + "test"
	_, err = etcdCli.Put(ctx, pitrKey, "")
	require.NoError(t, err)
	err = c.CheckRequirements(ctx, conn)
	require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
	require.ErrorContains(t, err, "found PiTR log streaming")

	// remove PiTR task, and mock a CDC task
	_, err = etcdCli.Delete(ctx, pitrKey)
	require.NoError(t, err)
	// example: /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>
	cdcKey := utils.CDCPrefix + "test_cluster/test_ns/changefeed/info/test_cf"
	_, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`)
	require.NoError(t, err)
	err = c.CheckRequirements(ctx, conn)
	require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
	require.ErrorContains(t, err, "found CDC changefeed")

	// remove CDC task, all pre-checks pass
	_, err = etcdCli.Delete(ctx, cdcKey)
	require.NoError(t, err)
	require.NoError(t, c.CheckRequirements(ctx, conn))
}
17 changes: 2 additions & 15 deletions executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,8 @@ type TableImporter struct {
logger *zap.Logger
regionSplitSize int64
regionSplitKeys int64
// the smallest auto-generated ID in current import.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
lastInsertID uint64
diskQuota int64
diskQuotaLock *syncutil.RWMutex
diskQuota int64
diskQuotaLock *syncutil.RWMutex
}

func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
Expand Down Expand Up @@ -503,16 +500,6 @@ func (ti *TableImporter) Close() error {
return nil
}

// setLastInsertID records id as the importer's lastInsertID when id is
// non-zero and either no ID has been recorded yet or id is smaller than the
// one currently recorded. A zero id is ignored.
func (ti *TableImporter) setLastInsertID(id uint64) {
	// todo: if we run concurrently, we should use atomic operation here.
	current := ti.lastInsertID
	isSmallest := current == 0 || id < current
	if id != 0 && isSmallest {
		ti.lastInsertID = id
	}
}

// CheckDiskQuota checks disk quota.
func (ti *TableImporter) CheckDiskQuota(ctx context.Context) {
var locker sync.Locker
Expand Down

0 comments on commit bfacaf8

Please sign in to comment.