Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "cdc: log slow conflict detect every 60s (#11251) (#11287)" #11324

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 1 addition & 34 deletions cdc/sinkv2/eventsink/txn/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/pkg/causality"
Expand All @@ -47,8 +46,6 @@ type worker struct {
flushInterval time.Duration
hasPending bool
postTxnExecutedCallbacks []func()

lastSlowConflictDetectLog map[model.TableID]time.Time
}

func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *worker {
Expand All @@ -71,8 +68,6 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w
flushInterval: backend.MaxFlushInterval(),
hasPending: false,
postTxnExecutedCallbacks: make([]func(), 0, 1024),

lastSlowConflictDetectLog: make(map[model.TableID]time.Time),
}
}

Expand All @@ -90,9 +85,6 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))

cleanSlowLogHistory := time.NewTicker(time.Hour)
defer cleanSlowLogHistory.Stop()

start := time.Now()
for {
select {
Expand All @@ -101,15 +93,6 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))
return nil
case <-cleanSlowLogHistory.C:
lastSlowConflictDetectLog := w.lastSlowConflictDetectLog
w.lastSlowConflictDetectLog = make(map[model.TableID]time.Time)
now := time.Now()
for tableID, lastLog := range lastSlowConflictDetectLog {
if now.Sub(lastLog) <= time.Minute {
w.lastSlowConflictDetectLog[tableID] = lastLog
}
}
case txn := <-txnCh:
// We keep reading data from txnCh.out until there is no more data or we reach a state that can be flushed.
// If there is no more data in txnCh.out, and we have not yet reached a state that can be flushed,
Expand Down Expand Up @@ -166,24 +149,8 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool {
return false
}

conflictDetectTime := txn.conflictResolved.Sub(txn.start).Seconds()
w.metricConflictDetectDuration.Observe(conflictDetectTime)
w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds())
w.metricQueueDuration.Observe(time.Since(txn.start).Seconds())

// Log tables which conflict detect time larger than 1 minute.
if conflictDetectTime > float64(60) {
now := time.Now()
// Log slow conflict detect tables every minute.
if lastLog, ok := w.lastSlowConflictDetectLog[txn.Event.TableInfo.ID]; !ok || now.Sub(lastLog) > time.Minute {
log.Warn("Transaction dmlSink finds a slow transaction in conflict detector",
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID),
zap.Int64("TableID", txn.Event.TableInfo.ID),
zap.Float64("seconds", conflictDetectTime))
w.lastSlowConflictDetectLog[txn.Event.Table.TableID] = now
}
}

w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows)))
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
return w.backend.OnTxnEvent(txn.TxnCallbackableEvent)
Expand Down
Loading