From 5e19822d0df0860f139a6f118c85a47b70c09497 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 21 Mar 2024 22:45:43 +0800 Subject: [PATCH] importinto: precheck to avoid run 2 jobs to import the same table (#51997) ref pingcap/tidb#49008 --- pkg/executor/importer/import.go | 4 ++-- pkg/executor/importer/job.go | 10 +++++++--- pkg/executor/importer/job_test.go | 12 ++++++------ pkg/executor/importer/precheck.go | 16 +++++++++++++--- pkg/executor/importer/precheck_test.go | 16 +++++++++++++++- .../importintotest/import_into_test.go | 6 ++++++ 6 files changed, 49 insertions(+), 15 deletions(-) diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index a9351ad74c264..b6829e4880255 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -358,7 +358,7 @@ func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.Load } return &Plan{ - DBName: plan.Table.Schema.O, + DBName: plan.Table.Schema.L, DBID: plan.Table.DBInfo.ID, Path: plan.Path, @@ -402,7 +402,7 @@ func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plann p := &Plan{ TableInfo: tbl.Meta(), DesiredTableInfo: tbl.Meta(), - DBName: plan.Table.Schema.O, + DBName: plan.Table.Schema.L, DBID: plan.Table.DBInfo.ID, Path: plan.Path, diff --git a/pkg/executor/importer/job.go b/pkg/executor/importer/job.go index 83ce4b0ac8d5a..dc0dec7b20d90 100644 --- a/pkg/executor/importer/job.go +++ b/pkg/executor/importer/job.go @@ -163,11 +163,15 @@ func GetJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, user str // GetActiveJobCnt returns the count of active import jobs. // Active import jobs include pending and running jobs. -func GetActiveJobCnt(ctx context.Context, conn sqlexec.SQLExecutor) (int64, error) { +func GetActiveJobCnt(ctx context.Context, conn sqlexec.SQLExecutor, tableSchema, tableName string) (int64, error) { ctx = util.WithInternalSourceType(ctx, kv.InternalImportInto) - sql := `select count(1) from mysql.tidb_import_jobs where status in (%?, %?)` - rs, err := conn.ExecuteInternal(ctx, sql, jobStatusPending, JobStatusRunning) + sql := `select count(1) from mysql.tidb_import_jobs + where status in (%?, %?) + and table_schema = %? and table_name = %?; + ` + rs, err := conn.ExecuteInternal(ctx, sql, jobStatusPending, JobStatusRunning, + tableSchema, tableName) if err != nil { return 0, err } diff --git a/pkg/executor/importer/job_test.go b/pkg/executor/importer/job_test.go index 1c272c04a574f..a62c2778404c1 100644 --- a/pkg/executor/importer/job_test.go +++ b/pkg/executor/importer/job_test.go @@ -94,7 +94,7 @@ func TestJobHappyPath(t *testing.T) { require.True(t, gotJobInfo.StartTime.IsZero()) require.True(t, gotJobInfo.EndTime.IsZero()) jobInfoEqual(t, jobInfo, gotJobInfo) - cnt, err := importer.GetActiveJobCnt(ctx, conn) + cnt, err := importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName) require.NoError(t, err) require.Equal(t, int64(1), cnt) @@ -114,13 +114,13 @@ func TestJobHappyPath(t *testing.T) { jobInfo.Status = "running" jobInfo.Step = importer.JobStepImporting jobInfoEqual(t, jobInfo, gotJobInfo) - cnt, err = importer.GetActiveJobCnt(ctx, conn) + cnt, err = importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName) require.NoError(t, err) require.Equal(t, int64(1), cnt) // change job step require.NoError(t, importer.Job2Step(ctx, conn, jobID, importer.JobStepValidating)) - cnt, err = importer.GetActiveJobCnt(ctx, conn) + cnt, err = importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName) require.NoError(t, err) require.Equal(t, int64(1), cnt) @@ -136,7 +136,7 @@ func TestJobHappyPath(t *testing.T) { jobInfo.Summary = c.expectedSummary jobInfo.ErrorMessage = c.expectedErrMsg jobInfoEqual(t, jobInfo, gotJobInfo) - cnt, err = importer.GetActiveJobCnt(ctx, conn) + cnt, err = importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName) require.NoError(t, err) require.Equal(t, int64(0), cnt) @@ -183,7 +183,7 @@ func TestGetAndCancelJob(t *testing.T) { require.True(t, gotJobInfo.StartTime.IsZero()) require.True(t, gotJobInfo.EndTime.IsZero()) jobInfoEqual(t, jobInfo, gotJobInfo) - cnt, err := importer.GetActiveJobCnt(ctx, conn) + cnt, err := importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName) require.NoError(t, err) require.Equal(t, int64(1), cnt) @@ -198,7 +198,7 @@ func TestGetAndCancelJob(t *testing.T) { jobInfo.Status = "cancelled" jobInfo.ErrorMessage = "cancelled by user" jobInfoEqual(t, jobInfo, gotJobInfo) - cnt, err = importer.GetActiveJobCnt(ctx, conn) + cnt, err = importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName) require.NoError(t, err) require.Equal(t, int64(0), cnt) diff --git a/pkg/executor/importer/precheck.go b/pkg/executor/importer/precheck.go index 6a2f208318e36..c70d5891768c9 100644 --- a/pkg/executor/importer/precheck.go +++ b/pkg/executor/importer/precheck.go @@ -43,13 +43,23 @@ var GetEtcdClient = getEtcdClient // CheckRequirements checks the requirements for IMPORT INTO. // we check the following things here: -// 1. target table should be empty -// 2. no CDC or PiTR tasks running +// - when import from file +// 1. there is no active job on the target table +// 2. the total file size > 0 +// 3. if global sort, thread count >= 16 and have required privileges +// - target table should be empty +// - no CDC or PiTR tasks running // -// todo: check if there's running lightning tasks? // we check them one by one, and return the first error we meet. func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error { if e.DataSourceType == DataSourceTypeFile { + cnt, err := GetActiveJobCnt(ctx, conn, e.Plan.DBName, e.Plan.TableInfo.Name.L) + if err != nil { + return errors.Trace(err) + } + if cnt > 0 { + return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("there is active job on the target table already") + } if err := e.checkTotalFileSize(); err != nil { return err } diff --git a/pkg/executor/importer/precheck_test.go b/pkg/executor/importer/precheck_test.go index db3a7532b4bea..db603ca63b289 100644 --- a/pkg/executor/importer/precheck_test.go +++ b/pkg/executor/importer/precheck_test.go @@ -79,14 +79,28 @@ func TestCheckRequirements(t *testing.T) { tableObj, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) - // source data file size = 0 c := &importer.LoadDataController{ Plan: &importer.Plan{ DBName: "test", DataSourceType: importer.DataSourceTypeFile, + TableInfo: tableObj.Meta(), }, Table: tableObj, } + + // create a dummy job + _, err = importer.CreateJob(ctx, conn, "test", "tttt", tableObj.Meta().ID, "root", &importer.ImportParameters{}, 0) + require.NoError(t, err) + // there is active job on the target table already + jobID, err := importer.CreateJob(ctx, conn, "test", "t", tableObj.Meta().ID, "root", &importer.ImportParameters{}, 0) + require.NoError(t, err) + err = c.CheckRequirements(ctx, conn) + require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed) + require.ErrorContains(t, err, "there is active job on the target table already") + // cancel the job + require.NoError(t, importer.CancelJob(ctx, conn, jobID)) + + // source data file size = 0 require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed) // make checkTotalFileSize pass diff --git a/tests/realtikvtest/importintotest/import_into_test.go b/tests/realtikvtest/importintotest/import_into_test.go index 8b8114773bb81..7103b4ba060b2 100644 --- a/tests/realtikvtest/importintotest/import_into_test.go +++ b/tests/realtikvtest/importintotest/import_into_test.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/auth" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" "github.com/pingcap/tidb/pkg/util/sem" "github.com/stretchr/testify/require" @@ -957,6 +958,11 @@ func (s *mockGCSSuite) TestRegisterTask() { }() // wait for the task to be registered <-importinto.TestSyncChan + // cannot run 2 import job to the same target table. + tk2 := testkit.NewTestKit(s.T(), s.store) + err = tk2.QueryToErr(sql) + s.ErrorIs(err, exeerrors.ErrLoadDataPreCheckFailed) + s.ErrorContains(err, "there is active job on the target table already") client, err := importer.GetEtcdClient() s.NoError(err)