diff --git a/br/pkg/restore/ingestrec/ingest_recorder.go b/br/pkg/restore/ingestrec/ingest_recorder.go index e7ce1ba459d2b..3be88777ea099 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder.go +++ b/br/pkg/restore/ingestrec/ingest_recorder.go @@ -69,13 +69,23 @@ func notAddIndexJob(job *model.Job) bool { job.Type != model.ActionAddPrimaryKey } -func notSynced(job *model.Job) bool { - return job.State != model.JobStateSynced +// the final state of the sub jobs is done instead of synced. +// +-----+-------------------------------------+--------------+-----+--------+ +// | ... | JOB_TYPE | SCHEMA_STATE | ... | STATE | +// +-----+-------------------------------------+--------------+-----+--------+ +// | ... | add index /* ingest */ | public | ... | synced | +// +-----+-------------------------------------+--------------+-----+--------+ +// | ... | alter table multi-schema change | none | ... | synced | +// +-----+-------------------------------------+--------------+-----+--------+ +// | ... | add index /* subjob */ /* ingest */ | public | ... | done | +// +-----+-------------------------------------+--------------+-----+--------+ +func notSynced(job *model.Job, isSubJob bool) bool { + return (job.State != model.JobStateSynced) && !(isSubJob && job.State == model.JobStateDone) } // AddJob firstly filters the ingest index add operation job, and records it into IngestRecorder. -func (i *IngestRecorder) AddJob(job *model.Job) error { - if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job) { +func (i *IngestRecorder) AddJob(job *model.Job, isSubJob bool) error { + if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job, isSubJob) { return nil } diff --git a/br/pkg/restore/ingestrec/ingest_recorder_test.go b/br/pkg/restore/ingestrec/ingest_recorder_test.go index dbf10819b9d5c..683d3e008bb9b 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder_test.go +++ b/br/pkg/restore/ingestrec/ingest_recorder_test.go @@ -147,7 +147,7 @@ func TestAddIngestRecorder(t *testing.T) { getIndex(1, []string{"x", "y"}), }, nil, - )) + ), false) require.NoError(t, err) recorder.UpdateIndexInfo(allSchemas) err = recorder.Iterate(noItem) @@ -163,7 +163,7 @@ func TestAddIngestRecorder(t *testing.T) { getIndex(1, []string{"x", "y"}), }, nil, - )) + ), false) require.NoError(t, err) recorder.UpdateIndexInfo(allSchemas) err = recorder.Iterate(noItem) @@ -179,7 +179,7 @@ func TestAddIngestRecorder(t *testing.T) { getIndex(1, []string{"x", "y"}), }, nil, - )) + ), false) require.NoError(t, err) recorder.UpdateIndexInfo(allSchemas) err = recorder.Iterate(noItem) @@ -197,7 +197,7 @@ func TestAddIngestRecorder(t *testing.T) { getIndex(1, []string{"x", "y"}), }, json.RawMessage(`[1, "a"]`), - )) + ), false) require.NoError(t, err) f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"}) recorder.UpdateIndexInfo(allSchemas) @@ -218,7 +218,27 @@ func TestAddIngestRecorder(t *testing.T) { getIndex(1, []string{"x", "y"}), }, json.RawMessage(`[1, "a"]`), - )) + ), false) + require.NoError(t, err) + f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"}) + recorder.UpdateIndexInfo(allSchemas) + err = recorder.Iterate(f) + require.NoError(t, err) + require.Equal(t, *cnt, 1) + } + + { + // a sub job as add primary index job + err = recorder.AddJob(fakeJob( + model.ReorgTypeLitMerge, + model.ActionAddPrimaryKey, + model.JobStateDone, + 1000, + []*model.IndexInfo{ + getIndex(1, []string{"x", "y"}), + }, + json.RawMessage(`[1, "a"]`), + ), true) require.NoError(t, err) f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"}) recorder.UpdateIndexInfo(allSchemas) @@ -293,7 +313,7 @@ func TestIndexesKind(t *testing.T) { getIndex(1, []string{"x"}), }, json.RawMessage(`[1, "a"]`), - )) + ), false) require.NoError(t, err) recorder.UpdateIndexInfo(allSchemas) var ( @@ -371,7 +391,7 @@ func TestRewriteTableID(t *testing.T) { getIndex(1, []string{"x", "y"}), }, json.RawMessage(`[1, "a"]`), - )) + ), false) require.NoError(t, err) recorder.UpdateIndexInfo(allSchemas) recorder.RewriteTableID(func(tableID int64) (int64, bool, error) { diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index ec29ea6f8fb17..759549ef07155 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -650,7 +650,7 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err return nil, nil } - return nil, sr.restoreFromHistory(job) + return nil, sr.restoreFromHistory(job, false) } return nil, nil } @@ -681,14 +681,14 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err } } -func (sr *SchemasReplace) restoreFromHistory(job *model.Job) error { +func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) error { if !job.IsCancelled() { switch job.Type { case model.ActionAddIndex, model.ActionAddPrimaryKey: if job.State == model.JobStateRollbackDone { return sr.deleteRange(job) } - err := sr.ingestRecorder.AddJob(job) + err := sr.ingestRecorder.AddJob(job, isSubJob) return errors.Trace(err) case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey, model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, @@ -697,7 +697,8 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job) error { case model.ActionMultiSchemaChange: for _, sub := range job.MultiSchemaInfo.SubJobs { proxyJob := sub.ToProxyJob(job) - if err := sr.restoreFromHistory(&proxyJob); err != nil { + // ASSERT: the proxyJob can not be MultiSchemaInfo anymore + if err := sr.restoreFromHistory(&proxyJob, true); err != nil { return err } } diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index fe8045c2a1d59..752796c2ac574 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -926,7 +926,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { } // drop indexes(multi-schema-change) for table0 - err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0) + err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0, false) require.NoError(t, err) for l := 0; l < 2; l++ { for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { @@ -939,7 +939,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { } // drop indexes(multi-schema-change) for table1 - err = schemaReplace.restoreFromHistory(multiSchemaChangeJob1) + err = schemaReplace.restoreFromHistory(multiSchemaChangeJob1, false) require.NoError(t, err) for l := 0; l < 2; l++ { iargs = <-midr.indexCh