Skip to content

Commit f551d26

Browse files
authored
br: fix add ingest index as sub job (#47484) (#47556)
close #47482
1 parent 34db67e commit f551d26

File tree

4 files changed

+48
-17
lines changed

4 files changed

+48
-17
lines changed

br/pkg/restore/ingestrec/ingest_recorder.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,23 @@ func notAddIndexJob(job *model.Job) bool {
6969
job.Type != model.ActionAddPrimaryKey
7070
}
7171

72-
func notSynced(job *model.Job) bool {
73-
return job.State != model.JobStateSynced
72+
// the final state of the sub jobs is done instead of synced.
73+
// +-----+-------------------------------------+--------------+-----+--------+
74+
// | ... | JOB_TYPE | SCHEMA_STATE | ... | STATE |
75+
// +-----+-------------------------------------+--------------+-----+--------+
76+
// | ... | add index /* ingest */ | public | ... | synced |
77+
// +-----+-------------------------------------+--------------+-----+--------+
78+
// | ... | alter table multi-schema change | none | ... | synced |
79+
// +-----+-------------------------------------+--------------+-----+--------+
80+
// | ... | add index /* subjob */ /* ingest */ | public | ... | done |
81+
// +-----+-------------------------------------+--------------+-----+--------+
82+
func notSynced(job *model.Job, isSubJob bool) bool {
83+
return (job.State != model.JobStateSynced) && !(isSubJob && job.State == model.JobStateDone)
7484
}
7585

7686
// AddJob firstly filters the ingest index add operation job, and records it into IngestRecorder.
77-
func (i *IngestRecorder) AddJob(job *model.Job) error {
78-
if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job) {
87+
func (i *IngestRecorder) AddJob(job *model.Job, isSubJob bool) error {
88+
if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job, isSubJob) {
7989
return nil
8090
}
8191

br/pkg/restore/ingestrec/ingest_recorder_test.go

+27-7
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func TestAddIngestRecorder(t *testing.T) {
147147
getIndex(1, []string{"x", "y"}),
148148
},
149149
nil,
150-
))
150+
), false)
151151
require.NoError(t, err)
152152
recorder.UpdateIndexInfo(allSchemas)
153153
err = recorder.Iterate(noItem)
@@ -163,7 +163,7 @@ func TestAddIngestRecorder(t *testing.T) {
163163
getIndex(1, []string{"x", "y"}),
164164
},
165165
nil,
166-
))
166+
), false)
167167
require.NoError(t, err)
168168
recorder.UpdateIndexInfo(allSchemas)
169169
err = recorder.Iterate(noItem)
@@ -179,7 +179,7 @@ func TestAddIngestRecorder(t *testing.T) {
179179
getIndex(1, []string{"x", "y"}),
180180
},
181181
nil,
182-
))
182+
), false)
183183
require.NoError(t, err)
184184
recorder.UpdateIndexInfo(allSchemas)
185185
err = recorder.Iterate(noItem)
@@ -197,7 +197,7 @@ func TestAddIngestRecorder(t *testing.T) {
197197
getIndex(1, []string{"x", "y"}),
198198
},
199199
json.RawMessage(`[1, "a"]`),
200-
))
200+
), false)
201201
require.NoError(t, err)
202202
f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"})
203203
recorder.UpdateIndexInfo(allSchemas)
@@ -218,7 +218,27 @@ func TestAddIngestRecorder(t *testing.T) {
218218
getIndex(1, []string{"x", "y"}),
219219
},
220220
json.RawMessage(`[1, "a"]`),
221-
))
221+
), false)
222+
require.NoError(t, err)
223+
f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"})
224+
recorder.UpdateIndexInfo(allSchemas)
225+
err = recorder.Iterate(f)
226+
require.NoError(t, err)
227+
require.Equal(t, *cnt, 1)
228+
}
229+
230+
{
231+
// a sub job as add primary index job
232+
err = recorder.AddJob(fakeJob(
233+
model.ReorgTypeLitMerge,
234+
model.ActionAddPrimaryKey,
235+
model.JobStateDone,
236+
1000,
237+
[]*model.IndexInfo{
238+
getIndex(1, []string{"x", "y"}),
239+
},
240+
json.RawMessage(`[1, "a"]`),
241+
), true)
222242
require.NoError(t, err)
223243
f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"})
224244
recorder.UpdateIndexInfo(allSchemas)
@@ -293,7 +313,7 @@ func TestIndexesKind(t *testing.T) {
293313
getIndex(1, []string{"x"}),
294314
},
295315
json.RawMessage(`[1, "a"]`),
296-
))
316+
), false)
297317
require.NoError(t, err)
298318
recorder.UpdateIndexInfo(allSchemas)
299319
var (
@@ -371,7 +391,7 @@ func TestRewriteTableID(t *testing.T) {
371391
getIndex(1, []string{"x", "y"}),
372392
},
373393
json.RawMessage(`[1, "a"]`),
374-
))
394+
), false)
375395
require.NoError(t, err)
376396
recorder.UpdateIndexInfo(allSchemas)
377397
recorder.RewriteTableID(func(tableID int64) (int64, bool, error) {

br/pkg/stream/rewrite_meta_rawkv.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
650650
return nil, nil
651651
}
652652

653-
return nil, sr.restoreFromHistory(job)
653+
return nil, sr.restoreFromHistory(job, false)
654654
}
655655
return nil, nil
656656
}
@@ -681,14 +681,14 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
681681
}
682682
}
683683

684-
func (sr *SchemasReplace) restoreFromHistory(job *model.Job) error {
684+
func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) error {
685685
if !job.IsCancelled() {
686686
switch job.Type {
687687
case model.ActionAddIndex, model.ActionAddPrimaryKey:
688688
if job.State == model.JobStateRollbackDone {
689689
return sr.deleteRange(job)
690690
}
691-
err := sr.ingestRecorder.AddJob(job)
691+
err := sr.ingestRecorder.AddJob(job, isSubJob)
692692
return errors.Trace(err)
693693
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
694694
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn,
@@ -697,7 +697,8 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job) error {
697697
case model.ActionMultiSchemaChange:
698698
for _, sub := range job.MultiSchemaInfo.SubJobs {
699699
proxyJob := sub.ToProxyJob(job)
700-
if err := sr.restoreFromHistory(&proxyJob); err != nil {
700+
// ASSERT: the proxyJob can not be MultiSchemaInfo anymore
701+
if err := sr.restoreFromHistory(&proxyJob, true); err != nil {
701702
return err
702703
}
703704
}

br/pkg/stream/rewrite_meta_rawkv_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) {
926926
}
927927

928928
// drop indexes(multi-schema-change) for table0
929-
err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0)
929+
err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0, false)
930930
require.NoError(t, err)
931931
for l := 0; l < 2; l++ {
932932
for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ {
@@ -939,7 +939,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) {
939939
}
940940

941941
// drop indexes(multi-schema-change) for table1
942-
err = schemaReplace.restoreFromHistory(multiSchemaChangeJob1)
942+
err = schemaReplace.restoreFromHistory(multiSchemaChangeJob1, false)
943943
require.NoError(t, err)
944944
for l := 0; l < 2; l++ {
945945
iargs = <-midr.indexCh

0 commit comments

Comments
 (0)