Commit
Merge branch 'release-4.0' into cherry-pick-1846-to-release-4.0
liuzix authored Jun 21, 2021
2 parents e2b88b5 + ffbdae3 commit 9c6cd8f
Showing 10 changed files with 662 additions and 224 deletions.
449 changes: 261 additions & 188 deletions cdc/kv/client.go

Large diffs are not rendered by default.

48 changes: 26 additions & 22 deletions cdc/kv/client_test.go
@@ -1479,7 +1479,7 @@ ReceiveLoop:
}
}

-// TestStreamSendWithErrorNormal mainly tests the scenario that the `Recv` call
+// TestStreamRecvWithErrorNormal mainly tests the scenario that the `Recv` call
// of a gRPC stream in kv client meets a **logically related** error, and the
// kv client logs the error and re-establishes a new request.
func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
@@ -1497,7 +1497,7 @@ func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
}

-// TestStreamSendWithErrorIOEOF mainly tests the scenario that the `Recv` call
+// TestStreamRecvWithErrorIOEOF mainly tests the scenario that the `Recv` call
// of a gRPC stream in kv client meets the error io.EOF, and the kv client
// logs the error and re-establishes a new request.
func (s *etcdSuite) TestStreamRecvWithErrorIOEOF(c *check.C) {
@@ -2422,7 +2422,7 @@ func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
c.Assert(sri.span.String(), check.Equals, "[61, 63)")
c.Assert(sri2.ts, check.Equals, uint64(2000))
c.Assert(sri2.span.String(), check.Equals, "[61, 62)")
-c.Assert(sri2.rpcCtx, check.IsNil)
+c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{})
}

// TestResolveLockNoCandidate tests the resolved ts manager can work normally
@@ -3138,7 +3138,9 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
-eventCh := make(chan *model.RegionFeedEvent, 10)
+// The buffer size of the event channel must be large enough, because in this
+// test we send all events first and only then retrieve them from the channel.
+eventCh := make(chan *model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
@@ -3159,10 +3161,29 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
}

// wait for all regions requested from cdc kv client
+// Since the kv client limits the number of concurrent incremental scans, we
+// must wait for a region to become ready before sending it an initialized event.
+sent := make(map[uint64]bool, regionNum)
err = retry.Run(time.Millisecond*200, 20, func() error {
count := 0
-requestIDs.Range(func(_, _ interface{}) bool {
+// send an initialized event and a resolved ts event to each region
+requestIDs.Range(func(key, value interface{}) bool {
count++
+regionID := key.(uint64)
+requestID := value.(uint64)
+if _, ok := sent[regionID]; !ok {
+initialized := mockInitializedEvent(regionID, requestID)
+ch1 <- initialized
+sent[regionID] = true
+resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
+{
+RegionId: regionID,
+RequestId: requestID,
+Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120},
+},
+}}
+ch1 <- resolved
+}
return true
})
if count == regionNum {
@@ -3172,23 +3193,6 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
})
c.Assert(err, check.IsNil)

-// send initialized event and a resolved ts event to each region
-requestIDs.Range(func(key, value interface{}) bool {
-regionID := key.(uint64)
-requestID := value.(uint64)
-initialized := mockInitializedEvent(regionID, requestID)
-ch1 <- initialized
-resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
-{
-RegionId: regionID,
-RequestId: requestID,
-Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120},
-},
-}}
-ch1 <- resolved
-return true
-})

resolvedCount := 0
checkEvent:
for {
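The buffer-size change above is load-bearing: the test goroutine sends all mock events before it starts draining eventCh, so the channel capacity must be at least the total number of queued events or the sends block forever. A minimal, self-contained sketch of that pattern (the event type and counts here are hypothetical stand-ins, not the actual test harness):

package main

import "fmt"

type regionFeedEvent struct{ regionID uint64 }

func main() {
	// Hypothetical counts: 64 queued events against a capacity-100 channel.
	const regionNum = 64
	eventCh := make(chan regionFeedEvent, 100)

	// Producer phase: nothing reads yet, so every send must fit in the
	// buffer. With capacity 10 and regionNum 64 this loop would block
	// forever and the test would hang.
	for i := uint64(0); i < regionNum; i++ {
		eventCh <- regionFeedEvent{regionID: i}
	}
	close(eventCh)

	// Consumer phase: only now are events retrieved.
	count := 0
	for range eventCh {
		count++
	}
	fmt.Println("received", count, "events")
}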
2 changes: 1 addition & 1 deletion cdc/kv/client_v2.go
@@ -178,7 +178,7 @@ func (s *eventFeedSession) receiveFromStreamV2(
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
-})
+}, false /* initialized */)
if err != nil {
// The only possibility is that the ctx has been cancelled. Simply return.
return
8 changes: 8 additions & 0 deletions cdc/kv/metrics.go
@@ -67,6 +67,13 @@ var (
Name: "channel_size",
Help: "size of each channel in kv client",
}, []string{"id", "channel"})
+clientRegionTokenSize = prometheus.NewGaugeVec(
+prometheus.GaugeOpts{
+Namespace: "ticdc",
+Subsystem: "kvclient",
+Name: "region_token",
+Help: "size of region token in kv client",
+}, []string{"store", "table", "changefeed"})
batchResolvedEventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
@@ -93,6 +100,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(pullEventCounter)
registry.MustRegister(sendEventCounter)
registry.MustRegister(clientChannelSize)
+registry.MustRegister(clientRegionTokenSize)
registry.MustRegister(batchResolvedEventSize)
registry.MustRegister(etcdRequestCounter)
}
10 changes: 6 additions & 4 deletions cdc/kv/region_worker.go
@@ -199,10 +199,11 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
}
})

+revokeToken := !state.initialized
return w.session.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
-})
+}, revokeToken)
}

func (w *regionWorker) checkUnInitRegions(ctx context.Context) error {
@@ -452,7 +453,7 @@ func (w *regionWorker) handleEventEntry(
switch entry.Type {
case cdcpb.Event_INITIALIZED:
if time.Since(state.startFeedTime) > 20*time.Second {
log.Warn("The time cost of initializing is too mush",
log.Warn("The time cost of initializing is too much",
zap.Duration("timeCost", time.Since(state.startFeedTime)),
zap.Uint64("regionID", regionID))
}
@@ -468,7 +469,7 @@

metricPullEventInitializedCounter.Inc()
state.initialized = true
-
+w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
cachedEvents := state.matcher.matchCachedRow()
for _, cachedEvent := range cachedEvents {
revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue)
@@ -605,13 +606,14 @@ func (w *regionWorker) evictAllRegions(ctx context.Context) error {
if state.lastResolvedTs > singleRegionInfo.ts {
singleRegionInfo.ts = state.lastResolvedTs
}
+revokeToken := !state.initialized
state.lock.Unlock()
err = w.session.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: singleRegionInfo,
err: &rpcCtxUnavailableErr{
verID: singleRegionInfo.verID,
},
-})
+}, revokeToken)
return err == nil
})
}
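The revokeToken hunks above encode one invariant: a region owns a router token from the moment its request is dispatched until its INITIALIZED event arrives, at which point regionRouter.Release is called (see the handleEventEntry hunk). A region that fails before initializing therefore still holds a token and must give it back. Since client.go's onRegionFail is not rendered in this view, the following is only a toy sketch of that accounting, with a stand-in router type rather than the real implementation:

package main

import "fmt"

// tokenRouter is a toy stand-in for the LimitRegionRouter introduced in
// token_region.go below; it only tracks per-store token counts.
type tokenRouter struct{ tokens map[string]int }

func (r *tokenRouter) Acquire(id string) { r.tokens[id]++ }
func (r *tokenRouter) Release(id string) { r.tokens[id]-- }

// onRegionFail mirrors the revokeToken logic above: only a region that
// failed before receiving INITIALIZED still owns a token to give back.
func onRegionFail(r *tokenRouter, addr string, initialized bool) {
	if revokeToken := !initialized; revokeToken {
		r.Release(addr)
	}
}

func main() {
	r := &tokenRouter{tokens: map[string]int{}}
	r.Acquire("store-1")              // region request dispatched
	onRegionFail(r, "store-1", false) // failed before INITIALIZED
	fmt.Println(r.tokens["store-1"])  // 0: the token was revoked
}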
163 changes: 163 additions & 0 deletions cdc/kv/token_region.go
@@ -0,0 +1,163 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)

const (
// buffer size for ranged region consumer
regionRouterChanSize = 16
// sizedRegionRouter checks region buffer every 100ms
sizedRegionCheckInterval = 100 * time.Millisecond
)

// LimitRegionRouter defines an interface that can buffer singleRegionInfo
// and provide token-based consumption
type LimitRegionRouter interface {
// Chan returns a singleRegionInfo channel that can be consumed from
Chan() <-chan singleRegionInfo
// AddRegion adds a singleRegionInfo to the buffer; this function is thread-safe
AddRegion(task singleRegionInfo)
// Acquire acquires one token
Acquire(id string)
// Release gives back one token; this function is thread-safe
Release(id string)
// Run runs in the background, draining buffered regions to the output
// channel as tokens become available
Run(ctx context.Context) error
}

type srrMetrics struct {
changefeed string
table string
tokens map[string]prometheus.Gauge
}

func newSrrMetrics(ctx context.Context) *srrMetrics {
changefeed := util.ChangefeedIDFromCtx(ctx)
_, table := util.TableIDFromCtx(ctx)
return &srrMetrics{
changefeed: changefeed,
table: table,
tokens: make(map[string]prometheus.Gauge),
}
}

type sizedRegionRouter struct {
buffer map[string][]singleRegionInfo
output chan singleRegionInfo
lock sync.Mutex
metrics *srrMetrics
tokens map[string]int
sizeLimit int
}

// NewSizedRegionRouter creates a new sizedRegionRouter
func NewSizedRegionRouter(ctx context.Context, sizeLimit int) *sizedRegionRouter {
return &sizedRegionRouter{
buffer: make(map[string][]singleRegionInfo),
output: make(chan singleRegionInfo, regionRouterChanSize),
sizeLimit: sizeLimit,
tokens: make(map[string]int),
metrics: newSrrMetrics(ctx),
}
}

func (r *sizedRegionRouter) Chan() <-chan singleRegionInfo {
return r.output
}

func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) {
r.lock.Lock()
var id string
// if rpcCtx is not provided, use the default "" bucket
if sri.rpcCtx != nil {
id = sri.rpcCtx.Addr
}
if r.sizeLimit > r.tokens[id] && len(r.output) < regionRouterChanSize {
r.output <- sri
} else {
r.buffer[id] = append(r.buffer[id], sri)
}
r.lock.Unlock()
}

func (r *sizedRegionRouter) Acquire(id string) {
r.lock.Lock()
defer r.lock.Unlock()
r.tokens[id]++
if _, ok := r.metrics.tokens[id]; !ok {
r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.table, r.metrics.changefeed)
}
r.metrics.tokens[id].Inc()
}

func (r *sizedRegionRouter) Release(id string) {
r.lock.Lock()
defer r.lock.Unlock()
r.tokens[id]--
if _, ok := r.metrics.tokens[id]; !ok {
r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.table, r.metrics.changefeed)
}
r.metrics.tokens[id].Dec()
}

func (r *sizedRegionRouter) Run(ctx context.Context) error {
ticker := time.NewTicker(sizedRegionCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
r.lock.Lock()
for id, buf := range r.buffer {
available := r.sizeLimit - r.tokens[id]
// the tokens in use can exceed the size limit, since we also have
// a sized channel as a level-1 cache
if available <= 0 {
continue
}
if available > len(buf) {
available = len(buf)
}
// Avoid a potential deadlock: a consumer of the output channel may call
// onRegionFail, which takes this lock to decrease tokens, so we must not
// block on sending to the output channel while holding the lock.
if available > regionRouterChanSize-len(r.output) {
available = regionRouterChanSize - len(r.output)
}
if available == 0 {
continue
}
for i := 0; i < available; i++ {
select {
case <-ctx.Done():
r.lock.Unlock()
return errors.Trace(ctx.Err())
case r.output <- buf[i]:
}
}
r.buffer[id] = r.buffer[id][available:]
}
r.lock.Unlock()
}
}
}
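Putting the new file together: Run moves buffered regions to the output channel as tokens free up, while callers pair each dispatched region with Acquire and return the token via Release once the region initializes (or fails first, per the revokeToken logic above). A hedged, in-package sketch of that lifecycle follows; the empty singleRegionInfo and the wiring around it are illustrative placeholders, not the real EventFeed code:

// Sketch: intended driving pattern for sizedRegionRouter inside package kv.
func exampleRouterUsage(ctx context.Context) error {
	router := NewSizedRegionRouter(ctx, 8 /* assumed per-store scan limit */)

	// Run flushes buffered regions to Chan() whenever tokens are available.
	go func() { _ = router.Run(ctx) }()

	// Producer side: newly discovered regions are enqueued.
	router.AddRegion(singleRegionInfo{} /* placeholder */)

	// Consumer side: each dispatched region costs one token for its store.
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case sri := <-router.Chan():
			var addr string
			if sri.rpcCtx != nil {
				addr = sri.rpcCtx.Addr
			}
			router.Acquire(addr)
			// ... dispatch the region request; Release(addr) is called on
			// the INITIALIZED event, or on failure before initialization.
		}
	}
}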
