Skip to content

Commit

Permalink
importinto: precheck to avoid running 2 jobs that import into the same table (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Mar 21, 2024
1 parent 61ba639 commit 5e19822
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 15 deletions.
4 changes: 2 additions & 2 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.Load
}

return &Plan{
DBName: plan.Table.Schema.O,
DBName: plan.Table.Schema.L,
DBID: plan.Table.DBInfo.ID,

Path: plan.Path,
Expand Down Expand Up @@ -402,7 +402,7 @@ func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plann
p := &Plan{
TableInfo: tbl.Meta(),
DesiredTableInfo: tbl.Meta(),
DBName: plan.Table.Schema.O,
DBName: plan.Table.Schema.L,
DBID: plan.Table.DBInfo.ID,

Path: plan.Path,
Expand Down
10 changes: 7 additions & 3 deletions pkg/executor/importer/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,15 @@ func GetJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, user str

// GetActiveJobCnt returns the count of active import jobs on the given table,
// identified by its schema name and table name.
// Active import jobs include pending and running jobs.
func GetActiveJobCnt(ctx context.Context, conn sqlexec.SQLExecutor) (int64, error) {
func GetActiveJobCnt(ctx context.Context, conn sqlexec.SQLExecutor, tableSchema, tableName string) (int64, error) {
ctx = util.WithInternalSourceType(ctx, kv.InternalImportInto)

sql := `select count(1) from mysql.tidb_import_jobs where status in (%?, %?)`
rs, err := conn.ExecuteInternal(ctx, sql, jobStatusPending, JobStatusRunning)
sql := `select count(1) from mysql.tidb_import_jobs
where status in (%?, %?)
and table_schema = %? and table_name = %?;
`
rs, err := conn.ExecuteInternal(ctx, sql, jobStatusPending, JobStatusRunning,
tableSchema, tableName)
if err != nil {
return 0, err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/importer/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestJobHappyPath(t *testing.T) {
require.True(t, gotJobInfo.StartTime.IsZero())
require.True(t, gotJobInfo.EndTime.IsZero())
jobInfoEqual(t, jobInfo, gotJobInfo)
cnt, err := importer.GetActiveJobCnt(ctx, conn)
cnt, err := importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName)
require.NoError(t, err)
require.Equal(t, int64(1), cnt)

Expand All @@ -114,13 +114,13 @@ func TestJobHappyPath(t *testing.T) {
jobInfo.Status = "running"
jobInfo.Step = importer.JobStepImporting
jobInfoEqual(t, jobInfo, gotJobInfo)
cnt, err = importer.GetActiveJobCnt(ctx, conn)
cnt, err = importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName)
require.NoError(t, err)
require.Equal(t, int64(1), cnt)

// change job step
require.NoError(t, importer.Job2Step(ctx, conn, jobID, importer.JobStepValidating))
cnt, err = importer.GetActiveJobCnt(ctx, conn)
cnt, err = importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName)
require.NoError(t, err)
require.Equal(t, int64(1), cnt)

Expand All @@ -136,7 +136,7 @@ func TestJobHappyPath(t *testing.T) {
jobInfo.Summary = c.expectedSummary
jobInfo.ErrorMessage = c.expectedErrMsg
jobInfoEqual(t, jobInfo, gotJobInfo)
cnt, err = importer.GetActiveJobCnt(ctx, conn)
cnt, err = importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName)
require.NoError(t, err)
require.Equal(t, int64(0), cnt)

Expand Down Expand Up @@ -183,7 +183,7 @@ func TestGetAndCancelJob(t *testing.T) {
require.True(t, gotJobInfo.StartTime.IsZero())
require.True(t, gotJobInfo.EndTime.IsZero())
jobInfoEqual(t, jobInfo, gotJobInfo)
cnt, err := importer.GetActiveJobCnt(ctx, conn)
cnt, err := importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName)
require.NoError(t, err)
require.Equal(t, int64(1), cnt)

Expand All @@ -198,7 +198,7 @@ func TestGetAndCancelJob(t *testing.T) {
jobInfo.Status = "cancelled"
jobInfo.ErrorMessage = "cancelled by user"
jobInfoEqual(t, jobInfo, gotJobInfo)
cnt, err = importer.GetActiveJobCnt(ctx, conn)
cnt, err = importer.GetActiveJobCnt(ctx, conn, gotJobInfo.TableSchema, gotJobInfo.TableName)
require.NoError(t, err)
require.Equal(t, int64(0), cnt)

Expand Down
16 changes: 13 additions & 3 deletions pkg/executor/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,23 @@ var GetEtcdClient = getEtcdClient

// CheckRequirements checks the requirements for IMPORT INTO.
// we check the following things here:
// 1. target table should be empty
// 2. no CDC or PiTR tasks running
// - when importing from file
// 1. there is no active job on the target table
// 2. the total file size > 0
// 3. if global sort, thread count >= 16 and have required privileges
// - target table should be empty
// - no CDC or PiTR tasks running
//
// todo: check whether there are any running lightning tasks?
// we check them one by one, and return the first error we meet.
func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error {
if e.DataSourceType == DataSourceTypeFile {
cnt, err := GetActiveJobCnt(ctx, conn, e.Plan.DBName, e.Plan.TableInfo.Name.L)
if err != nil {
return errors.Trace(err)
}
if cnt > 0 {
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("there is active job on the target table already")
}
if err := e.checkTotalFileSize(); err != nil {
return err
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/executor/importer/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,28 @@ func TestCheckRequirements(t *testing.T) {
tableObj, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

// source data file size = 0
c := &importer.LoadDataController{
Plan: &importer.Plan{
DBName: "test",
DataSourceType: importer.DataSourceTypeFile,
TableInfo: tableObj.Meta(),
},
Table: tableObj,
}

// create a dummy job
_, err = importer.CreateJob(ctx, conn, "test", "tttt", tableObj.Meta().ID, "root", &importer.ImportParameters{}, 0)
require.NoError(t, err)
// there is active job on the target table already
jobID, err := importer.CreateJob(ctx, conn, "test", "t", tableObj.Meta().ID, "root", &importer.ImportParameters{}, 0)
require.NoError(t, err)
err = c.CheckRequirements(ctx, conn)
require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
require.ErrorContains(t, err, "there is active job on the target table already")
// cancel the job
require.NoError(t, importer.CancelJob(ctx, conn, jobID))

// source data file size = 0
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)

// make checkTotalFileSize pass
Expand Down
6 changes: 6 additions & 0 deletions tests/realtikvtest/importintotest/import_into_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/sem"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -957,6 +958,11 @@ func (s *mockGCSSuite) TestRegisterTask() {
}()
// wait for the task to be registered
<-importinto.TestSyncChan
// cannot run 2 import jobs on the same target table.
tk2 := testkit.NewTestKit(s.T(), s.store)
err = tk2.QueryToErr(sql)
s.ErrorIs(err, exeerrors.ErrLoadDataPreCheckFailed)
s.ErrorContains(err, "there is active job on the target table already")

client, err := importer.GetEtcdClient()
s.NoError(err)
Expand Down

0 comments on commit 5e19822

Please sign in to comment.