Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto: remove useless code and add test #46413

Merged
merged 2 commits into from
Aug 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ type BaseKVEncoder struct {

logger *zap.Logger
recordCache []types.Datum
// the first auto-generated ID in the current encoder.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
LastInsertID uint64
}

// NewBaseKVEncoder creates a new BaseKVEncoder.
Expand Down Expand Up @@ -259,9 +256,6 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(e.SessionCtx,
types.NewIntDatum(rowID), col.ToInfo(), false, false)
if err == nil && e.LastInsertID == 0 {
e.LastInsertID = value.GetUint64()
}
case e.IsAutoRandomCol(col.ToInfo()):
var val types.Datum
realRowID := e.AutoIDFn(rowID)
Expand All @@ -271,9 +265,6 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(e.SessionCtx, val, col.ToInfo(), false, false)
if err == nil && e.LastInsertID == 0 {
e.LastInsertID = value.GetUint64()
}
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
Expand Down
12 changes: 11 additions & 1 deletion executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,38 @@ go_test(
srcs = [
"import_test.go",
"job_test.go",
"precheck_test.go",
"table_import_test.go",
],
embed = [":importer"],
flaky = True,
race = "on",
shard_count = 12,
shard_count = 13,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/config",
"//br/pkg/streamhelper",
"//br/pkg/utils",
"//config",
"//expression",
"//infoschema",
"//kv",
"//parser",
"//parser/ast",
"//parser/model",
"//planner/core",
"//testkit",
"//util/dbterror/exeerrors",
"//util/etcd",
"//util/logutil",
"//util/mock",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_server_v3//embed",
"@org_uber_go_zap//:zap",
],
)
1 change: 0 additions & 1 deletion executor/importer/engine_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,5 @@ func ProcessChunk(
return err
}
progress.AddColSize(encoder.GetColumnSize())
tableImporter.setLastInsertID(encoder.GetLastInsertID())
return nil
}
9 changes: 4 additions & 5 deletions executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,11 +1133,10 @@ type JobImportParam struct {

// JobImportResult is the result of the job import.
type JobImportResult struct {
Msg string
LastInsertID uint64
Affected uint64
Warnings []stmtctx.SQLWarn
ColSizeMap map[int64]int64
Msg string
Affected uint64
Warnings []stmtctx.SQLWarn
ColSizeMap map[int64]int64
}

// JobImporter is the interface for importing a job.
Expand Down
8 changes: 0 additions & 8 deletions executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import (

type kvEncoder interface {
Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
// GetLastInsertID returns the first auto-generated ID in the current encoder.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
GetLastInsertID() uint64
// GetColumnSize returns the size of each column in the current encoder.
GetColumnSize() map[int64]int64
io.Closer
Expand Down Expand Up @@ -94,11 +91,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
return en.Record2KV(record, row, rowID)
}

// GetLastInsertID implements the kvEncoder interface.
func (en *tableKVEncoder) GetLastInsertID() uint64 {
return en.LastInsertID
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
sessionVars := en.SessionCtx.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
Expand Down
10 changes: 5 additions & 5 deletions executor/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@
etcdDialTimeout = 5 * time.Second
)

// GetEtcdClient returns an etcd client.
// exported for testing.
var GetEtcdClient = getEtcdClient

// CheckRequirements checks the requirements for IMPORT INTO.
// we check the following things here:
// 1. target table should be empty
// 2. no CDC or PiTR tasks running
//
// todo: check if there's running lightning tasks?
// we check them one by one, and return the first error we meet.
// todo: check all items and return all errors at once.
func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error {
// todo: maybe we can reuse checker in lightning
if err := e.checkTotalFileSize(); err != nil {
return err
}
Expand Down Expand Up @@ -112,9 +114,7 @@
return nil
}

// GetEtcdClient returns an etcd client.
// exported for testing.
func GetEtcdClient() (*etcd.Client, error) {
func getEtcdClient() (*etcd.Client, error) {

Check warning on line 117 in executor/importer/precheck.go

View check run for this annotation

Codecov / codecov/patch

executor/importer/precheck.go#L117

Added line #L117 was not covered by tests
tidbCfg := tidb.GetGlobalConfig()
tls, err := util.NewTLSConfig(
util.WithCAPath(tidbCfg.Security.ClusterSSLCA),
Expand Down
148 changes: 148 additions & 0 deletions executor/importer/precheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importer_test

import (
"context"
"fmt"
"math/rand"
"net/url"
"testing"
"time"

"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/dbterror/exeerrors"
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
)

const addrFmt = "http://127.0.0.1:%d"

func createMockETCD(t *testing.T) (string, *embed.Etcd) {
cfg := embed.NewConfig()
cfg.Dir = t.TempDir()
// rand port in [20000, 60000)
randPort := int(rand.Int31n(40000)) + 20000
clientAddr := fmt.Sprintf(addrFmt, randPort)
lcurl, _ := url.Parse(clientAddr)
cfg.LCUrls, cfg.ACUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
lpurl, _ := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
cfg.LPUrls, cfg.APUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
cfg.InitialCluster = "default=" + lpurl.String()
cfg.Logger = "zap"
embedEtcd, err := embed.StartEtcd(cfg)
require.NoError(t, err)

select {
case <-embedEtcd.Server.ReadyNotify():
case <-time.After(5 * time.Second):
embedEtcd.Server.Stop() // trigger a shutdown
require.False(t, true, "server took too long to start")
}
return clientAddr, embedEtcd
}

func TestCheckRequirements(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
ctx := util.WithInternalSourceType(context.Background(), kv.InternalImportInto)
conn := tk.Session().(sqlexec.SQLExecutor)

_, err := conn.Execute(ctx, "create table test.t(id int primary key)")
require.NoError(t, err)
is := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema)
tableObj, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

c := &importer.LoadDataController{
Plan: &importer.Plan{
DBName: "test",
},
Table: tableObj,
}
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)

// now checkTotalFileSize pass, and try next pre-check item
c.TotalFileSize = 1
// non-empty table
_, err = conn.Execute(ctx, "insert into test.t values(1)")
require.NoError(t, err)
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)
// table not exists
_, err = conn.Execute(ctx, "drop table if exists test.t")
require.NoError(t, err)
require.ErrorContains(t, c.CheckRequirements(ctx, conn), "doesn't exist")

// create table again, now checkTableEmpty pass
_, err = conn.Execute(ctx, "create table test.t(id int primary key)")
require.NoError(t, err)

clientAddr, embedEtcd := createMockETCD(t)
require.NotNil(t, embedEtcd)
t.Cleanup(func() {
embedEtcd.Close()
})
backup := importer.GetEtcdClient
importer.GetEtcdClient = func() (*etcd.Client, error) {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{clientAddr},
})
require.NoError(t, err)
return etcd.NewClient(etcdCli, ""), nil
}
t.Cleanup(func() {
importer.GetEtcdClient = backup
})
// mock a PiTR task
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{clientAddr},
})
t.Cleanup(func() {
require.NoError(t, etcdCli.Close())
})
require.NoError(t, err)
pitrKey := streamhelper.PrefixOfTask() + "test"
_, err = etcdCli.Put(ctx, pitrKey, "")
require.NoError(t, err)
err = c.CheckRequirements(ctx, conn)
require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
require.ErrorContains(t, err, "found PiTR log streaming")

// remove PiTR task, and mock a CDC task
_, err = etcdCli.Delete(ctx, pitrKey)
require.NoError(t, err)
// example: /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>
cdcKey := utils.CDCPrefix + "test_cluster/test_ns/changefeed/info/test_cf"
_, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`)
require.NoError(t, err)
err = c.CheckRequirements(ctx, conn)
require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
require.ErrorContains(t, err, "found CDC changefeed")

// remove CDC task, pass
_, err = etcdCli.Delete(ctx, cdcKey)
require.NoError(t, err)
require.NoError(t, c.CheckRequirements(ctx, conn))
}
17 changes: 2 additions & 15 deletions executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,8 @@ type TableImporter struct {
logger *zap.Logger
regionSplitSize int64
regionSplitKeys int64
// the smallest auto-generated ID in current import.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
lastInsertID uint64
diskQuota int64
diskQuotaLock *syncutil.RWMutex
diskQuota int64
diskQuotaLock *syncutil.RWMutex
}

func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
Expand Down Expand Up @@ -503,16 +500,6 @@ func (ti *TableImporter) Close() error {
return nil
}

func (ti *TableImporter) setLastInsertID(id uint64) {
// todo: if we run concurrently, we should use atomic operation here.
if id == 0 {
return
}
if ti.lastInsertID == 0 || id < ti.lastInsertID {
ti.lastInsertID = id
}
}

// CheckDiskQuota checks disk quota.
func (ti *TableImporter) CheckDiskQuota(ctx context.Context) {
var locker sync.Locker
Expand Down
Loading