diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index b9bb3d438f71a..4c1cfe7d60bce 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -150,7 +150,7 @@ func NewAddIndexIngestPipeline( srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt) - ingestOp := NewIndexIngestOperator(ctx, copCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta) + ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta) sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, totalRowCount, metricCounter) operator.Compose[TableScanTask](srcOp, scanOp) @@ -534,16 +534,18 @@ func NewWriteExternalStoreOperator( writers = append(writers, writer) } - return &indexIngestWorker{ - ctx: ctx, - tbl: tbl, - indexes: indexes, - copCtx: copCtx, - se: nil, - sessPool: sessPool, - writers: writers, - srcChunkPool: srcChunkPool, - reorgMeta: reorgMeta, + return &indexIngestExternalWorker{ + indexIngestBaseWorker: indexIngestBaseWorker{ + ctx: ctx, + tbl: tbl, + indexes: indexes, + copCtx: copCtx, + se: nil, + sessPool: sessPool, + writers: writers, + srcChunkPool: srcChunkPool, + reorgMeta: reorgMeta, + }, } }) return &WriteExternalStoreOperator{ @@ -568,6 +570,7 @@ type IndexIngestOperator struct { func NewIndexIngestOperator( ctx *OperatorCtx, copCtx copr.CopContext, + backendCtx ingest.BackendCtx, sessPool opSessPool, tbl table.PhysicalTable, indexes []table.Index, @@ -593,16 +596,25 @@ func NewIndexIngestOperator( writers = append(writers, writer) } - return &indexIngestWorker{ - ctx: ctx, - tbl: tbl, - indexes: indexes, - copCtx: copCtx, - se: nil, - sessPool: sessPool, - writers: writers, - srcChunkPool: srcChunkPool, - reorgMeta: reorgMeta, + indexIDs := make([]int64, len(indexes)) + for i := 0; i < len(indexes); i++ { + indexIDs[i] = 
indexes[i].Meta().ID + } + return &indexIngestLocalWorker{ + indexIngestBaseWorker: indexIngestBaseWorker{ + ctx: ctx, + tbl: tbl, + indexes: indexes, + copCtx: copCtx, + + se: nil, + sessPool: sessPool, + writers: writers, + srcChunkPool: srcChunkPool, + reorgMeta: reorgMeta, + }, + indexIDs: indexIDs, + backendCtx: backendCtx, } }) return &IndexIngestOperator{ @@ -610,7 +622,47 @@ func NewIndexIngestOperator( } } -type indexIngestWorker struct { +type indexIngestExternalWorker struct { + indexIngestBaseWorker +} + +func (w *indexIngestExternalWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { + defer tidbutil.Recover(metrics.LblAddIndex, "indexIngestExternalWorkerRecover", func() { + w.ctx.onError(errors.New("met panic in indexIngestExternalWorker")) + }, false) + defer func() { + if rs.Chunk != nil { + w.srcChunkPool <- rs.Chunk + } + }() + w.indexIngestBaseWorker.HandleTask(rs, send) +} + +type indexIngestLocalWorker struct { + indexIngestBaseWorker + indexIDs []int64 + backendCtx ingest.BackendCtx +} + +func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { + defer tidbutil.Recover(metrics.LblAddIndex, "indexIngestLocalWorkerRecover", func() { + w.ctx.onError(errors.New("met panic in indexIngestLocalWorker")) + }, false) + defer func() { + if rs.Chunk != nil { + w.srcChunkPool <- rs.Chunk + } + }() + w.indexIngestBaseWorker.HandleTask(rs, send) + // needs to flush and import to avoid too much use of disk. 
+ _, _, _, err := ingest.TryFlushAllIndexes(w.backendCtx, ingest.FlushModeAuto, w.indexIDs) + if err != nil { + w.ctx.onError(err) + return + } +} + +type indexIngestBaseWorker struct { ctx *OperatorCtx tbl table.PhysicalTable @@ -626,16 +678,7 @@ type indexIngestWorker struct { srcChunkPool chan *chunk.Chunk } -func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { - defer func() { - if rs.Chunk != nil { - w.srcChunkPool <- rs.Chunk - } - }() - defer tidbutil.Recover(metrics.LblAddIndex, "handleIndexIngtestTaskWithRecover", func() { - w.ctx.onError(errors.New("met panic in indexIngestWorker")) - }, false) - +func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk, send func(IndexWriteResult)) { failpoint.Inject("injectPanicForIndexIngest", func() { panic("mock panic") }) @@ -644,7 +687,7 @@ func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWrite ID: rs.ID, } w.initSessCtx() - count, nextKey, err := w.WriteLocal(&rs) + count, nextKey, err := w.WriteChunk(&rs) if err != nil { w.ctx.onError(err) return @@ -662,7 +705,7 @@ func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWrite send(result) } -func (w *indexIngestWorker) initSessCtx() { +func (w *indexIngestBaseWorker) initSessCtx() { if w.se == nil { sessCtx, err := w.sessPool.Get() if err != nil { @@ -681,7 +724,7 @@ func (w *indexIngestWorker) initSessCtx() { } } -func (w *indexIngestWorker) Close() { +func (w *indexIngestBaseWorker) Close() { for _, writer := range w.writers { err := writer.Close(w.ctx) if err != nil { @@ -694,8 +737,8 @@ func (w *indexIngestWorker) Close() { } } -// WriteLocal will write index records to lightning engine. -func (w *indexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) { +// WriteChunk will write index records to lightning engine. 
+func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) { failpoint.Inject("mockWriteLocalError", func(_ failpoint.Value) { failpoint.Return(0, nil, errors.New("mock write local error")) }) diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index 9e6435ed6c341..d6a6fb9dae60f 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "engine.go", "engine_mgr.go", "env.go", + "flush.go", "mem_root.go", "message.go", "mock.go", diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 9be82954b39c5..67afd8dfd3a57 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -174,7 +174,7 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error { cp.currentKeys += added s.mu.Unlock() - flushed, imported, err := s.tryFlushAllIndexes(FlushModeAuto) + flushed, imported, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeAuto, s.indexIDs) if !flushed || err != nil { return err } @@ -194,20 +194,6 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error { return nil } -func (s *CheckpointManager) tryFlushAllIndexes(mode FlushMode) (flushed, imported bool, err error) { - allFlushed := true - allImported := true - for _, idxID := range s.indexIDs { - flushed, imported, err := s.flushCtrl.Flush(idxID, mode) - if err != nil { - return false, false, err - } - allFlushed = allFlushed && flushed - allImported = allImported && imported - } - return allFlushed, allImported, nil -} - func (s *CheckpointManager) progressLocalSyncMinKey() { for { cp := s.checkpoints[s.minTaskIDSynced+1] @@ -232,7 +218,7 @@ func (s *CheckpointManager) Close() { // Sync syncs the checkpoint. 
func (s *CheckpointManager) Sync() { - _, _, err := s.tryFlushAllIndexes(FlushModeForceLocal) + _, _, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeForceLocal, s.indexIDs) if err != nil { logutil.BgLogger().Warn("flush local engine failed", zap.String("category", "ddl-ingest"), zap.Error(err)) } diff --git a/pkg/ddl/ingest/flush.go b/pkg/ddl/ingest/flush.go new file mode 100644 index 0000000000000..4d407f8733eb1 --- /dev/null +++ b/pkg/ddl/ingest/flush.go @@ -0,0 +1,30 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ingest + +// TryFlushAllIndexes tries to flush and import all indexes. It returns the ID of the first index whose flush fails, or -1 if every flush succeeds. 
+func TryFlushAllIndexes(flushCtrl FlushController, mode FlushMode, indexIDs []int64) (flushed, imported bool, failedIdxID int64, err error) { + allFlushed := true + allImported := true + for _, idxID := range indexIDs { + flushed, imported, err := flushCtrl.Flush(idxID, mode) + if err != nil { + return false, false, idxID, err + } + allFlushed = allFlushed && flushed + allImported = allImported && imported + } + return allFlushed, allImported, -1, nil +} diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 067fdc672dd08..a1cf118e65227 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -135,13 +135,14 @@ func TestBackfillOperators(t *testing.T) { srcChkPool := make(chan *chunk.Chunk, regionCnt*2) pTbl := tbl.(table.PhysicalTable) index := tables.NewIndex(pTbl.GetPhysicalID(), tbl.Meta(), idxInfo) + mockBackendCtx := &ingest.MockBackendCtx{} mockEngine := ingest.NewMockEngineInfo(nil) mockEngine.SetHook(onWrite) src := newTestSource(chunkResults...) reorgMeta := ddl.NewDDLReorgMeta(tk.Session()) ingestOp := ddl.NewIndexIngestOperator( - opCtx, copCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, srcChkPool, 3, reorgMeta) + opCtx, copCtx, mockBackendCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, srcChkPool, 3, reorgMeta) sink := newTestSink[ddl.IndexWriteResult]() operator.Compose[ddl.IndexRecordChunk](src, ingestOp)