Skip to content

Commit

Permalink
Merge branch 'master' into fixChaos
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 8, 2022
2 parents a01fa93 + cd17d86 commit c88aa9a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 53 deletions.
41 changes: 13 additions & 28 deletions cdc/kv/resolvedts_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,15 @@ import (
)

type tsItem struct {
sortByEvTime bool
resolvedTs uint64
eventTime time.Time
penalty int
resolvedTs uint64
eventTime time.Time
penalty int
}

func newResolvedTsItem(ts uint64) tsItem {
return tsItem{resolvedTs: ts, eventTime: time.Now()}
}

func newEventTimeItem() tsItem {
return tsItem{sortByEvTime: true, eventTime: time.Now()}
}

// regionTsInfo contains region resolvedTs information
type regionTsInfo struct {
regionID uint64
Expand All @@ -45,9 +40,6 @@ type regionTsHeap []*regionTsInfo
func (rh regionTsHeap) Len() int { return len(rh) }

func (rh regionTsHeap) Less(i, j int) bool {
if rh[i].ts.sortByEvTime {
return rh[i].ts.eventTime.Before(rh[j].ts.eventTime)
}
return rh[i].ts.resolvedTs < rh[j].ts.resolvedTs
}

Expand Down Expand Up @@ -92,23 +84,16 @@ func newRegionTsManager() *regionTsManager {
func (rm *regionTsManager) Upsert(item *regionTsInfo) {
if old, ok := rm.m[item.regionID]; ok {
// in a single resolved ts manager, we should not expect a fallback resolved event
// but it's ok that we use fallback resolved event to increase penalty
if !item.ts.sortByEvTime {
if item.ts.resolvedTs <= old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
old.ts.penalty++
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
} else if item.ts.resolvedTs > old.ts.resolvedTs {
old.ts.resolvedTs = item.ts.resolvedTs
old.ts.eventTime = item.ts.eventTime
old.ts.penalty = 0
heap.Fix(&rm.h, old.index)
}
} else {
if item.ts.eventTime.After(old.ts.eventTime) {
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
}
// but, it's ok that we use fallback resolved event to increase penalty
if item.ts.resolvedTs <= old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
old.ts.penalty++
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
} else if item.ts.resolvedTs > old.ts.resolvedTs {
old.ts.resolvedTs = item.ts.resolvedTs
old.ts.eventTime = item.ts.eventTime
old.ts.penalty = 0
heap.Fix(&rm.h, old.index)
}
} else {
heap.Push(&rm.h, item)
Expand Down
25 changes: 0 additions & 25 deletions cdc/kv/resolvedts_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package kv

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -24,7 +23,6 @@ func checkRegionTsInfoWithoutEvTime(t *testing.T, obtained, expected *regionTsIn
require.Equal(t, expected.regionID, obtained.regionID)
require.Equal(t, expected.index, obtained.index)
require.Equal(t, expected.ts.resolvedTs, obtained.ts.resolvedTs)
require.False(t, obtained.ts.sortByEvTime)
}

func TestRegionTsManagerResolvedTs(t *testing.T) {
Expand Down Expand Up @@ -120,26 +118,3 @@ func TestRegionTsManagerPenaltyForFallBackEvent(t *testing.T) {
require.Equal(t, 0, rts.ts.penalty)
require.Equal(t, uint64(2000), rts.ts.resolvedTs)
}

func TestRegionTsManagerEvTime(t *testing.T) {
t.Parallel()
mgr := newRegionTsManager()
initRegions := []*regionTsInfo{
{regionID: 100, ts: newEventTimeItem()},
{regionID: 101, ts: newEventTimeItem()},
}
for _, item := range initRegions {
mgr.Upsert(item)
}
info := mgr.Remove(101)
require.Equal(t, uint64(101), info.regionID)

ts := time.Now()
mgr.Upsert(&regionTsInfo{regionID: 100, ts: newEventTimeItem()})
info = mgr.Pop()
require.Equal(t, uint64(100), info.regionID)
require.True(t, ts.Before(info.ts.eventTime))
require.True(t, time.Now().After(info.ts.eventTime))
info = mgr.Pop()
require.Nil(t, info)
}

0 comments on commit c88aa9a

Please sign in to comment.