diff --git a/cdc/sinkv2/eventsink/txn/worker.go b/cdc/sinkv2/eventsink/txn/worker.go index c387cec835f..b85cafa5c4f 100644 --- a/cdc/sinkv2/eventsink/txn/worker.go +++ b/cdc/sinkv2/eventsink/txn/worker.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/contextutil" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state" "github.com/pingcap/tiflow/pkg/causality" @@ -46,6 +47,8 @@ type worker struct { flushInterval time.Duration hasPending bool postTxnExecutedCallbacks []func() + + lastSlowConflictDetectLog map[model.TableID]time.Time } func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *worker { @@ -68,6 +71,8 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w flushInterval: backend.MaxFlushInterval(), hasPending: false, postTxnExecutedCallbacks: make([]func(), 0, 1024), + + lastSlowConflictDetectLog: make(map[model.TableID]time.Time), } } @@ -85,6 +90,9 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) + cleanSlowLogHistory := time.NewTicker(time.Hour) + defer cleanSlowLogHistory.Stop() + start := time.Now() for { select { @@ -93,6 +101,15 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) return nil + case <-cleanSlowLogHistory.C: + lastSlowConflictDetectLog := w.lastSlowConflictDetectLog + w.lastSlowConflictDetectLog = make(map[model.TableID]time.Time) + now := time.Now() + for tableID, lastLog := range lastSlowConflictDetectLog { + if now.Sub(lastLog) <= time.Minute { + w.lastSlowConflictDetectLog[tableID] = lastLog + } + } case txn := <-txnCh: // we get the data from txnCh.out until no more data here or reach the state that can be flushed. // If no more data in txnCh.out, and also not reach the state that can be flushed, @@ -149,8 +166,24 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool { return false } - w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds()) + conflictDetectTime := txn.conflictResolved.Sub(txn.start).Seconds() + w.metricConflictDetectDuration.Observe(conflictDetectTime) w.metricQueueDuration.Observe(time.Since(txn.start).Seconds()) + + // Log tables which conflict detect time larger than 1 minute. + if conflictDetectTime > float64(60) { + now := time.Now() + // Log slow conflict detect tables every minute. + if lastLog, ok := w.lastSlowConflictDetectLog[txn.Event.TableInfo.ID]; !ok || now.Sub(lastLog) > time.Minute { + log.Warn("Transaction dmlSink finds a slow transaction in conflict detector", + zap.String("changefeedID", w.changefeed), + zap.Int("workerID", w.ID), + zap.Int64("TableID", txn.Event.TableInfo.ID), + zap.Float64("seconds", conflictDetectTime)) + w.lastSlowConflictDetectLog[txn.Event.Table.TableID] = now + } + } + w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows))) w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted) return w.backend.OnTxnEvent(txn.TxnCallbackableEvent)