Skip to content

Commit

Permalink
disttask/ddl: add log for add index operators (#51267)
Browse files Browse the repository at this point in the history
ref #48779
  • Loading branch information
ywqzzy authored Feb 26, 2024
1 parent b9e98ab commit 21cd1eb
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
5 changes: 3 additions & 2 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ type OperatorCtx struct {
}

// NewOperatorCtx creates a new OperatorCtx.
func NewOperatorCtx(ctx context.Context) *OperatorCtx {
func NewOperatorCtx(ctx context.Context, taskID, subtaskID int64) *OperatorCtx {
opCtx, cancel := context.WithCancel(ctx)
opCtx = logutil.WithFields(opCtx, zap.Int64("task-id", taskID), zap.Int64("subtask-id", subtaskID))
return &OperatorCtx{
Context: opCtx,
cancel: cancel,
Expand Down Expand Up @@ -797,7 +798,7 @@ func (s *indexWriteResultSink) flush() error {
err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
return err
}
logutil.BgLogger().Error("flush error",
logutil.Logger(s.ctx).Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
return err
}

opCtx := NewOperatorCtx(ctx)
opCtx := NewOperatorCtx(ctx, subtask.TaskID, subtask.ID)
defer opCtx.Cancel()
r.curRowCount.Store(0)

Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
Expand Down Expand Up @@ -197,7 +198,7 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported
se, _ := concurrency.NewSession(bc.etcdClient)
mu, err := acquireLock(bc.ctx, se, distLockKey)
if err != nil {
return true, false, err
return true, false, errors.Trace(err)
}
logutil.Logger(bc.ctx).Info("acquire distributed flush lock success", zap.Int64("jobID", bc.jobID))
defer func() {
Expand Down
10 changes: 5 additions & 5 deletions tests/realtikvtest/addindextest3/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestBackfillOperators(t *testing.T) {
var opTasks []ddl.TableScanTask
{
ctx := context.Background()
opCtx := ddl.NewOperatorCtx(ctx)
opCtx := ddl.NewOperatorCtx(ctx, 1, 1)
pTbl := tbl.(table.PhysicalTable)
src := ddl.NewTableScanTaskSource(opCtx, store, pTbl, startKey, endKey)
sink := newTestSink[ddl.TableScanTask]()
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestBackfillOperators(t *testing.T) {
}

ctx := context.Background()
opCtx := ddl.NewOperatorCtx(ctx)
opCtx := ddl.NewOperatorCtx(ctx, 1, 1)
src := newTestSource(opTasks...)
scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3)
sink := newTestSink[ddl.IndexRecordChunk]()
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestBackfillOperators(t *testing.T) {
// Test IndexIngestOperator.
{
ctx := context.Background()
opCtx := ddl.NewOperatorCtx(ctx)
opCtx := ddl.NewOperatorCtx(ctx, 1, 1)
var keys, values [][]byte
onWrite := func(key, val []byte) {
keys = append(keys, key)
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestBackfillOperatorPipeline(t *testing.T) {
sessPool := newSessPoolForTest(t, store)

ctx := context.Background()
opCtx := ddl.NewOperatorCtx(ctx)
opCtx := ddl.NewOperatorCtx(ctx, 1, 1)
mockBackendCtx := &ingest.MockBackendCtx{}
mockEngine := ingest.NewMockEngineInfo(nil)
mockEngine.SetHook(func(key, val []byte) {})
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestBackfillOperatorPipelineException(t *testing.T) {
ddl.OperatorCallBackForTest = func() {
cancel()
}
opCtx := ddl.NewOperatorCtx(ctx)
opCtx := ddl.NewOperatorCtx(ctx, 1, 1)
pipeline, err := ddl.NewAddIndexIngestPipeline(
opCtx, store,
sessPool,
Expand Down

0 comments on commit 21cd1eb

Please sign in to comment.