diff --git a/br/pkg/lightning/backend/kv/base.go b/br/pkg/lightning/backend/kv/base.go index 6e3f165d92fc6..e4c0dacc2eb2f 100644 --- a/br/pkg/lightning/backend/kv/base.go +++ b/br/pkg/lightning/backend/kv/base.go @@ -130,9 +130,6 @@ type BaseKVEncoder struct { logger *zap.Logger recordCache []types.Datum - // the first auto-generated ID in the current encoder. - // if there's no auto-generated id column or the column value is not auto-generated, it will be 0. - LastInsertID uint64 } // NewBaseKVEncoder creates a new BaseKVEncoder. @@ -259,9 +256,6 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu // we still need a conversion, e.g. to catch overflow with a TINYINT column. value, err = table.CastValue(e.SessionCtx, types.NewIntDatum(rowID), col.ToInfo(), false, false) - if err == nil && e.LastInsertID == 0 { - e.LastInsertID = value.GetUint64() - } case e.IsAutoRandomCol(col.ToInfo()): var val types.Datum realRowID := e.AutoIDFn(rowID) @@ -271,9 +265,6 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu val = types.NewIntDatum(realRowID) } value, err = table.CastValue(e.SessionCtx, val, col.ToInfo(), false, false) - if err == nil && e.LastInsertID == 0 { - e.LastInsertID = value.GetUint64() - } case col.IsGenerated(): // inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil. // if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic. 
diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 86442bc60e615..7953ac80caecf 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -78,28 +78,38 @@ go_test( srcs = [ "import_test.go", "job_test.go", + "precheck_test.go", "table_import_test.go", ], embed = [":importer"], flaky = True, race = "on", - shard_count = 12, + shard_count = 13, deps = [ "//br/pkg/errors", "//br/pkg/lightning/config", + "//br/pkg/streamhelper", + "//br/pkg/utils", "//config", "//expression", + "//infoschema", + "//kv", "//parser", "//parser/ast", + "//parser/model", "//planner/core", "//testkit", "//util/dbterror/exeerrors", + "//util/etcd", "//util/logutil", "//util/mock", "//util/sqlexec", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//util", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_server_v3//embed", "@org_uber_go_zap//:zap", ], ) diff --git a/executor/importer/engine_process.go b/executor/importer/engine_process.go index 487542c597e2d..58ffca7e6c880 100644 --- a/executor/importer/engine_process.go +++ b/executor/importer/engine_process.go @@ -102,6 +102,5 @@ func ProcessChunk( return err } progress.AddColSize(encoder.GetColumnSize()) - tableImporter.setLastInsertID(encoder.GetLastInsertID()) return nil } diff --git a/executor/importer/import.go b/executor/importer/import.go index 9b9bcb34b7bd8..cfa7f51a7350e 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -1133,11 +1133,10 @@ type JobImportParam struct { // JobImportResult is the result of the job import. type JobImportResult struct { - Msg string - LastInsertID uint64 - Affected uint64 - Warnings []stmtctx.SQLWarn - ColSizeMap map[int64]int64 + Msg string + Affected uint64 + Warnings []stmtctx.SQLWarn + ColSizeMap map[int64]int64 } // JobImporter is the interface for importing a job. 
diff --git a/executor/importer/kv_encode.go b/executor/importer/kv_encode.go index 8f411f22ada6a..dbf13f0833764 100644 --- a/executor/importer/kv_encode.go +++ b/executor/importer/kv_encode.go @@ -35,9 +35,6 @@ import ( type kvEncoder interface { Encode(row []types.Datum, rowID int64) (*kv.Pairs, error) - // GetLastInsertID returns the first auto-generated ID in the current encoder. - // if there's no auto-generated id column or the column value is not auto-generated, it will be 0. - GetLastInsertID() uint64 // GetColumnSize returns the size of each column in the current encoder. GetColumnSize() map[int64]int64 io.Closer @@ -94,11 +91,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err return en.Record2KV(record, row, rowID) } -// GetLastInsertID implements the kvEncoder interface. -func (en *tableKVEncoder) GetLastInsertID() uint64 { - return en.LastInsertID -} - func (en *tableKVEncoder) GetColumnSize() map[int64]int64 { sessionVars := en.SessionCtx.GetSessionVars() sessionVars.TxnCtxMu.Lock() diff --git a/executor/importer/precheck.go b/executor/importer/precheck.go index c35885b31f48f..07f0bad0096b8 100644 --- a/executor/importer/precheck.go +++ b/executor/importer/precheck.go @@ -35,6 +35,10 @@ const ( etcdDialTimeout = 5 * time.Second ) +// GetEtcdClient returns an etcd client. +// exported for testing. +var GetEtcdClient = getEtcdClient + // CheckRequirements checks the requirements for IMPORT INTO. // we check the following things here: // 1. target table should be empty @@ -42,9 +46,7 @@ const ( // // todo: check if there's running lightning tasks? // we check them one by one, and return the first error we meet. -// todo: check all items and return all errors at once. 
func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error { - // todo: maybe we can reuse checker in lightning if err := e.checkTotalFileSize(); err != nil { return err } @@ -112,9 +114,7 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error { return nil } -// GetEtcdClient returns an etcd client. -// exported for testing. -func GetEtcdClient() (*etcd.Client, error) { +func getEtcdClient() (*etcd.Client, error) { tidbCfg := tidb.GetGlobalConfig() tls, err := util.NewTLSConfig( util.WithCAPath(tidbCfg.Security.ClusterSSLCA), diff --git a/executor/importer/precheck_test.go b/executor/importer/precheck_test.go new file mode 100644 index 0000000000000..add962062ae0a --- /dev/null +++ b/executor/importer/precheck_test.go @@ -0,0 +1,148 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package importer_test
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"testing"
+	"time"
+
+	"github.com/pingcap/tidb/br/pkg/streamhelper"
+	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/executor/importer"
+	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/testkit"
+	"github.com/pingcap/tidb/util/dbterror/exeerrors"
+	"github.com/pingcap/tidb/util/etcd"
+	"github.com/pingcap/tidb/util/sqlexec"
+	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/util"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+)
+
+const addrFmt = "http://127.0.0.1:%d"
+
+// createMockETCD starts an embedded etcd server on a random local port and returns
+// its client address and the server; it fails the test if not ready within 5 seconds.
+func createMockETCD(t *testing.T) (string, *embed.Etcd) {
+	cfg := embed.NewConfig()
+	cfg.Dir = t.TempDir()
+	// rand port in [20000, 60000)
+	randPort := int(rand.Int31n(40000)) + 20000
+	clientAddr := fmt.Sprintf(addrFmt, randPort)
+	lcurl, err := url.Parse(clientAddr)
+	require.NoError(t, err)
+	cfg.LCUrls, cfg.ACUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
+	lpurl, err := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
+	require.NoError(t, err)
+	cfg.LPUrls, cfg.APUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
+	cfg.InitialCluster = "default=" + lpurl.String()
+	cfg.Logger = "zap"
+	embedEtcd, err := embed.StartEtcd(cfg)
+	require.NoError(t, err)
+
+	select {
+	case <-embedEtcd.Server.ReadyNotify():
+	case <-time.After(5 * time.Second):
+		embedEtcd.Server.Stop() // trigger a shutdown
+		require.FailNow(t, "server took too long to start")
+	}
+	return clientAddr, embedEtcd
+}
+
+// TestCheckRequirements drives CheckRequirements through its pre-check items one by
+// one: total file size, target table state, then PiTR and CDC task keys in etcd.
+func TestCheckRequirements(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	ctx := util.WithInternalSourceType(context.Background(), kv.InternalImportInto)
+	conn := tk.Session().(sqlexec.SQLExecutor)
+
+	_, err := conn.Execute(ctx, "create table test.t(id int primary key)")
+	require.NoError(t, err)
+	is := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema)
+	tableObj, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+
+	c := &importer.LoadDataController{
+		Plan: &importer.Plan{
+			DBName: "test",
+		},
+		Table: tableObj,
+	}
+	require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)
+
+	// now checkTotalFileSize passes; try the next pre-check item
+	c.TotalFileSize = 1
+	// non-empty table
+	_, err = conn.Execute(ctx, "insert into test.t values(1)")
+	require.NoError(t, err)
+	require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)
+	// table does not exist
+	_, err = conn.Execute(ctx, "drop table if exists test.t")
+	require.NoError(t, err)
+	require.ErrorContains(t, c.CheckRequirements(ctx, conn), "doesn't exist")
+
+	// create the table again, now checkTableEmpty passes
+	_, err = conn.Execute(ctx, "create table test.t(id int primary key)")
+	require.NoError(t, err)
+
+	clientAddr, embedEtcd := createMockETCD(t)
+	require.NotNil(t, embedEtcd)
+	t.Cleanup(embedEtcd.Close)
+	backup := importer.GetEtcdClient
+	importer.GetEtcdClient = func() (*etcd.Client, error) {
+		etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{clientAddr}})
+		require.NoError(t, err)
+		return etcd.NewClient(etcdCli, ""), nil
+	}
+	t.Cleanup(func() {
+		importer.GetEtcdClient = backup
+	})
+	// mock a PiTR task
+	etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{clientAddr}})
+	require.NoError(t, err) // checked before registering Close: etcdCli may be nil on error
+	t.Cleanup(func() {
+		require.NoError(t, etcdCli.Close())
+	})
+	pitrKey := streamhelper.PrefixOfTask() + "test"
+	_, err = etcdCli.Put(ctx, pitrKey, "")
+	require.NoError(t, err)
+	err = c.CheckRequirements(ctx, conn)
+	require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
+	require.ErrorContains(t, err, "found PiTR log streaming")
+
+	// remove PiTR task, and mock a CDC task
+	_, err = etcdCli.Delete(ctx, pitrKey)
+	require.NoError(t, err)
+	// example: /tidb/cdc/{cluster}/{namespace}/changefeed/info/{changefeed}
+	cdcKey := utils.CDCPrefix + "test_cluster/test_ns/changefeed/info/test_cf"
+	_, err = etcdCli.Put(ctx, cdcKey, `{"state":"normal"}`)
+	require.NoError(t, err)
+	err = c.CheckRequirements(ctx, conn)
+	require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
+	require.ErrorContains(t, err, "found CDC changefeed")
+
+	// remove CDC task, pass
+	_, err = etcdCli.Delete(ctx, cdcKey)
+	require.NoError(t, err)
+	require.NoError(t, c.CheckRequirements(ctx, conn))
+}
diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index 25156eabb755a..6311fb4539fad 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -248,11 +248,8 @@ type TableImporter struct { logger *zap.Logger regionSplitSize int64 regionSplitKeys int64 - // the smallest auto-generated ID in current import. - // if there's no auto-generated id column or the column value is not auto-generated, it will be 0. - lastInsertID uint64 - diskQuota int64 - diskQuotaLock *syncutil.RWMutex + diskQuota int64 + diskQuotaLock *syncutil.RWMutex } func (ti *TableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) { @@ -503,16 +500,6 @@ func (ti *TableImporter) Close() error { return nil } -func (ti *TableImporter) setLastInsertID(id uint64) { - // todo: if we run concurrently, we should use atomic operation here. - if id == 0 { - return - } - if ti.lastInsertID == 0 || id < ti.lastInsertID { - ti.lastInsertID = id - } -} - // CheckDiskQuota checks disk quota. func (ti *TableImporter) CheckDiskQuota(ctx context.Context) { var locker sync.Locker