puller(ticdc): remove some lock from region_state.go (#7424)
close #7423
sdojjy authored Nov 1, 2022
1 parent 876c22a commit d80cbd2
Showing 4 changed files with 21 additions and 87 deletions.
cdc/kv/client.go: 16 changes (3 additions, 13 deletions)
@@ -1205,7 +1205,7 @@ func (s *eventFeedSession) receiveFromStream(
 	}
 	if cevent.ResolvedTs != nil {
 		metricSendEventBatchResolvedSize.Observe(float64(len(cevent.ResolvedTs.Regions)))
-		err = s.sendResolvedTs(ctx, cevent.ResolvedTs, worker, addr)
+		err = s.sendResolvedTs(ctx, cevent.ResolvedTs, worker)
 		if err != nil {
 			return err
 		}
@@ -1233,7 +1233,7 @@ func (s *eventFeedSession) sendRegionChangeEvents(
 ) error {
 	statefulEvents := make([][]*regionStatefulEvent, worker.concurrency)
 	for i := 0; i < worker.concurrency; i++ {
-		// Allocate a buffer with 1.5x length than average to reduce reallocate.
+		// Allocate a buffer with 2x length than average to reduce reallocate.
 		buffLen := len(events) / worker.concurrency * 3 / 2
 		statefulEvents[i] = make([]*regionStatefulEvent, 0, buffLen)
 	}
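
For context on the sizing heuristic in the two helpers touched here: each worker slot's buffer is pre-allocated at a multiple of the average share (the comments say 1.5x or 2x; the code uses * 3 / 2 or * 2). A standalone sketch with hypothetical numbers, not taken from the commit:

package main

import "fmt"

// Hypothetical figures: 1000 pending events split across 8 worker slots
// average 125 per slot, so the 1.5x and 2x heuristics pre-allocate capacity
// for 187 and 250 entries respectively, leaving headroom for uneven slots.
func main() {
	events, concurrency := 1000, 8
	fmt.Println(events / concurrency * 3 / 2) // 187
	fmt.Println(events / concurrency * 2)     // 250
}
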
@@ -1314,13 +1314,12 @@ func (s *eventFeedSession) sendResolvedTs(
 	ctx context.Context,
 	resolvedTs *cdcpb.ResolvedTs,
 	worker *regionWorker,
-	addr string,
 ) error {
 	statefulEvents := make([]*regionStatefulEvent, worker.concurrency)
 	// split resolved ts
 	for i := 0; i < worker.concurrency; i++ {
 		// Allocate a buffer with 1.5x length than average to reduce reallocate.
-		buffLen := len(resolvedTs.Regions) / worker.concurrency * 3 / 2
+		buffLen := len(resolvedTs.Regions) / worker.concurrency * 2
 		statefulEvents[i] = &regionStatefulEvent{
 			resolvedTsEvent: &resolvedTsEvent{
 				resolvedTs: resolvedTs.Ts,
@@ -1332,15 +1331,6 @@
 	for _, regionID := range resolvedTs.Regions {
 		state, ok := worker.getRegionState(regionID)
 		if ok {
-			if state.isStopped() {
-				log.Debug("drop resolved ts due to region feed stopped",
-					zap.String("namespace", s.changefeed.Namespace),
-					zap.String("changefeed", s.changefeed.ID),
-					zap.Uint64("regionID", regionID),
-					zap.Uint64("requestID", state.requestID),
-					zap.String("addr", addr))
-				continue
-			}
 			slot := worker.inputCalcSlot(regionID)
 			statefulEvents[slot].resolvedTsEvent.regions = append(
 				statefulEvents[slot].resolvedTsEvent.regions, state,
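
The net effect in cdc/kv/client.go is that sendResolvedTs no longer needs the store address or the per-region stopped check; it only splits the batched resolved ts across worker slots, and the stopped/uninitialized filtering moves into the region worker. A simplified sketch of that splitting step, with stand-in names (resolvedEvent, splitResolvedTs, calcSlot are illustrative, not the TiCDC types):

package sketch

// resolvedEvent is a stand-in for regionStatefulEvent's resolved-ts payload.
type resolvedEvent struct {
	resolvedTs uint64
	regionIDs  []uint64
}

// splitResolvedTs fans one batched resolved ts out to per-slot events.
// Stopped regions are no longer dropped here; the worker filters them later.
func splitResolvedTs(
	resolvedTs uint64,
	regions []uint64,
	concurrency int,
	calcSlot func(regionID uint64) int,
) []*resolvedEvent {
	events := make([]*resolvedEvent, concurrency)
	for i := range events {
		events[i] = &resolvedEvent{
			resolvedTs: resolvedTs,
			// Pre-size to roughly 2x the average per-slot share to reduce reallocation.
			regionIDs: make([]uint64, 0, len(regions)/concurrency*2),
		}
	}
	for _, id := range regions {
		slot := calcSlot(id)
		events[slot].regionIDs = append(events[slot].regionIDs, id)
	}
	return events
}
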
cdc/kv/region_state.go: 63 changes (6 additions, 57 deletions)
@@ -54,8 +54,7 @@ type regionFeedState struct {
 	requestID uint64
 	stopped   int32

-	lock sync.RWMutex
-	initialized bool
+	initialized atomic.Bool
 	matcher *matcher
 	startFeedTime time.Time
 	lastResolvedTs uint64
@@ -75,12 +74,6 @@ func (s *regionFeedState) start() {
 	s.matcher = newMatcher()
 }

-func (s *regionFeedState) getStartTime() time.Time {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-	return s.startFeedTime
-}
-
 func (s *regionFeedState) markStopped() {
 	atomic.StoreInt32(&s.stopped, 1)
 }
@@ -90,86 +83,42 @@ func (s *regionFeedState) isStopped() bool {
 }

 func (s *regionFeedState) isInitialized() bool {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-	return s.initialized
+	return s.initialized.Load()
 }

 func (s *regionFeedState) setInitialized() {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	s.initialized = true
-}
-
-func (s *regionFeedState) getRegionSpan() regionspan.ComparableSpan {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-	return s.sri.span
+	s.initialized.Store(true)
 }

 func (s *regionFeedState) getRegionID() uint64 {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
 	return s.sri.verID.GetID()
 }

-func (s *regionFeedState) getRequestID() uint64 {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-	return s.requestID
-}
-
 func (s *regionFeedState) getLastResolvedTs() uint64 {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-	return s.lastResolvedTs
+	return atomic.LoadUint64(&s.lastResolvedTs)
 }

 // updateResolvedTs update the resolved ts of the current region feed
 func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) {
 	if resolvedTs > s.getLastResolvedTs() {
-		s.lock.Lock()
-		defer s.lock.Unlock()
-		s.lastResolvedTs = resolvedTs
+		atomic.StoreUint64(&s.lastResolvedTs, resolvedTs)
 	}
 }

-func (s *regionFeedState) getStoreAddr() string {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-	return s.sri.rpcCtx.Addr
-}
-
 // setRegionInfoResolvedTs is only called when the region disconnect,
 // to update the `singleRegionInfo` which is reused by reconnect.
 func (s *regionFeedState) setRegionInfoResolvedTs() {
-	s.lock.RLock()
-	if s.lastResolvedTs <= s.sri.resolvedTs {
-		s.lock.RUnlock()
+	if s.getLastResolvedTs() <= s.sri.resolvedTs {
 		return
 	}
-	s.lock.RUnlock()
-
-	s.lock.Lock()
-	defer s.lock.Unlock()
 	s.sri.resolvedTs = s.lastResolvedTs
 }

-func (s *regionFeedState) getRegionInfoResolvedTs() uint64 {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-	return s.sri.resolvedTs
-}
-
 func (s *regionFeedState) getRegionInfo() singleRegionInfo {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
 	return s.sri
 }

 func (s *regionFeedState) getRegionMeta() (uint64, regionspan.ComparableSpan, time.Time, string) {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
 	return s.sri.verID.GetID(), s.sri.span, s.startFeedTime, s.sri.rpcCtx.Addr
 }

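The pattern in this file is the core of the change: the struct-wide sync.RWMutex is dropped in favor of per-field atomics, and accessors that only read fields that never change after construction (sri.verID, sri.span, startFeedTime) lose their locking entirely. A minimal sketch of the atomics side, assuming (as the region-worker model suggests) a single writer per state; feedState is an illustrative name, not the TiCDC type:

package sketch

import "sync/atomic"

type feedState struct {
	initialized    atomic.Bool // replaces: lock sync.RWMutex + initialized bool
	lastResolvedTs uint64      // accessed only through sync/atomic helpers
}

func (s *feedState) setInitialized()     { s.initialized.Store(true) }
func (s *feedState) isInitialized() bool { return s.initialized.Load() }

func (s *feedState) getLastResolvedTs() uint64 {
	return atomic.LoadUint64(&s.lastResolvedTs)
}

// updateResolvedTs only moves the value forward. The check-then-store pair is
// not atomic as a whole, so strict monotonicity relies on the single-writer
// assumption above; with multiple concurrent writers a CompareAndSwap loop
// would be the usual alternative.
func (s *feedState) updateResolvedTs(ts uint64) {
	if ts > s.getLastResolvedTs() {
		atomic.StoreUint64(&s.lastResolvedTs, ts)
	}
}
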
cdc/kv/region_worker.go: 12 changes (6 additions, 6 deletions)
@@ -193,9 +193,9 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState
 		zap.String("namespace", w.session.client.changefeed.Namespace),
 		zap.String("changefeed", w.session.client.changefeed.ID),
 		zap.Uint64("regionID", regionID),
-		zap.Uint64("requestID", state.getRequestID()),
-		zap.Stringer("span", state.getRegionSpan()),
-		zap.Uint64("resolvedTs", state.getRegionInfoResolvedTs()),
+		zap.Uint64("requestID", state.requestID),
+		zap.Stringer("span", state.sri.span),
+		zap.Uint64("resolvedTs", state.sri.resolvedTs),
 		zap.Error(err))
 	// if state is already marked stopped, it must have been or would be processed by `onRegionFail`
 	if state.isStopped() {
@@ -325,7 +325,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
 		zap.String("changefeed", w.session.client.changefeed.ID),
 		zap.String("addr", w.storeAddr),
 		zap.Uint64("regionID", rts.regionID),
-		zap.Stringer("span", state.getRegionSpan()),
+		zap.Stringer("span", state.sri.span),
 		zap.Duration("duration", sinceLastResolvedTs),
 		zap.Duration("lastEvent", sinceLastEvent),
 		zap.Uint64("resolvedTs", lastResolvedTs),
@@ -727,7 +727,7 @@ func (w *regionWorker) handleResolvedTs(
 	regions := make([]uint64, 0, len(revents.regions))

 	for _, state := range revents.regions {
-		if !state.isInitialized() {
+		if state.isStopped() || !state.isInitialized() {
 			continue
 		}
 		regionID := state.getRegionID()
@@ -762,7 +762,7 @@ func (w *regionWorker) handleResolvedTs(
 	default:
 	}
 	for _, state := range revents.regions {
-		if !state.isInitialized() {
+		if state.isStopped() || !state.isInitialized() {
 			continue
 		}
 		state.updateResolvedTs(resolvedTs)
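Because the sender no longer drops stopped regions, handleResolvedTs now guards on state.isStopped() || !state.isInitialized() before recording or advancing anything. A self-contained sketch of that filtering step; the interface and helper below are illustrative, not part of the TiCDC code base:

package sketch

// regionState captures just the methods this sketch needs; the real
// *regionFeedState in cdc/kv has a similar shape.
type regionState interface {
	isStopped() bool
	isInitialized() bool
	getRegionID() uint64
	updateResolvedTs(ts uint64)
}

// collectResolvedRegions shows where the filtering lives after this change:
// the region worker, not eventFeedSession.sendResolvedTs, skips states that
// are stopped or not yet initialized.
func collectResolvedRegions(states []regionState, resolvedTs uint64) []uint64 {
	regions := make([]uint64, 0, len(states))
	for _, st := range states {
		if st.isStopped() || !st.isInitialized() {
			continue
		}
		regions = append(regions, st.getRegionID())
		st.updateResolvedTs(resolvedTs)
	}
	return regions
}
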
cdc/kv/region_worker_test.go: 17 changes (6 additions, 11 deletions)
@@ -62,9 +62,7 @@ func TestRegionStateManagerThreadSafe(t *testing.T) {
 				regionID := regionIDs[idx]
 				s, ok := rsm.getState(regionID)
 				require.True(t, ok)
-				s.lock.RLock()
 				require.Equal(t, uint64(idx+1), s.requestID)
-				s.lock.RUnlock()
 			}
 		}()
 	}
@@ -79,10 +77,10 @@ func TestRegionStateManagerThreadSafe(t *testing.T) {
 				regionID := regionIDs[rand.Intn(regionCount)]
 				s, ok := rsm.getState(regionID)
 				require.True(t, ok)
-				s.lock.Lock()
-				s.lastResolvedTs += 10
-				s.lock.Unlock()
+				lastResolvedTs := s.getLastResolvedTs()
+				s.updateResolvedTs(s.getLastResolvedTs() + 10)
 				rsm.setState(regionID, s)
+				require.GreaterOrEqual(t, s.getLastResolvedTs(), lastResolvedTs)
 			}
 		}()
 	}
@@ -95,9 +93,6 @@ func TestRegionStateManagerThreadSafe(t *testing.T) {
 		require.Greater(t, s.lastResolvedTs, uint64(1000))
 		totalResolvedTs += s.lastResolvedTs
 	}
-	// 100 regions, initial resolved ts 1000;
-	// 2000 * resolved ts forward, increased by 10 each time, routine number is `concurrency`.
-	require.Equal(t, uint64(100*1000+2000*10*concurrency), totalResolvedTs)
 }

 func TestRegionStateManagerBucket(t *testing.T) {
@@ -273,19 +268,19 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) {
 	s1 := newRegionFeedState(singleRegionInfo{
 		verID: tikv.NewRegionVerID(1, 1, 1),
 	}, 1)
-	s1.initialized = true
+	s1.initialized.Store(true)
 	s1.lastResolvedTs = 9

 	s2 := newRegionFeedState(singleRegionInfo{
 		verID: tikv.NewRegionVerID(2, 2, 2),
 	}, 2)
-	s2.initialized = true
+	s2.initialized.Store(true)
 	s2.lastResolvedTs = 11

 	s3 := newRegionFeedState(singleRegionInfo{
 		verID: tikv.NewRegionVerID(3, 3, 3),
 	}, 3)
-	s3.initialized = false
+	s3.initialized.Store(false)
 	s3.lastResolvedTs = 8
 	err := w.handleResolvedTs(ctx, &resolvedTsEvent{
 		resolvedTs: 10,
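The reworked test exercises the same concurrency through the new accessors instead of taking the state's lock, and the exact-total assertion is gone, presumably because independent check-then-store writers can skip increments, so the final sum is no longer deterministic. A standalone analogue of the pattern (names and counts are illustrative), suitable for go test -race:

package sketch

import (
	"sync"
	"sync/atomic"
	"testing"
)

// TestResolvedTsThreadSafe mirrors the updated test's shape: every goroutine
// goes through atomic helpers, so the race detector stays quiet, and only a
// lower bound on the final value is asserted.
func TestResolvedTsThreadSafe(t *testing.T) {
	var lastResolvedTs uint64 = 1000

	update := func(ts uint64) {
		// Forward-only, best-effort update, as in the sketch above.
		if ts > atomic.LoadUint64(&lastResolvedTs) {
			atomic.StoreUint64(&lastResolvedTs, ts)
		}
	}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 2000; j++ {
				update(atomic.LoadUint64(&lastResolvedTs) + 10)
			}
		}()
	}
	wg.Wait()

	if got := atomic.LoadUint64(&lastResolvedTs); got <= 1000 {
		t.Fatalf("resolved ts did not advance: %d", got)
	}
}
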
