Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#1899
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
amyangfei authored and ti-chi-bot committed Jun 4, 2021
1 parent a1067a0 commit 41001a7
Show file tree
Hide file tree
Showing 13 changed files with 652 additions and 171 deletions.
387 changes: 224 additions & 163 deletions cdc/kv/client.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,7 @@ ReceiveLoop:
}
}

// TestStreamSendWithErrorNormal mainly tests the scenario that the `Recv` call
// TestStreamRecvWithErrorNormal mainly tests the scenario that the `Recv` call
// of a gPRC stream in kv client meets a **logical related** error, and kv client
// logs the error and re-establish new request.
func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
Expand All @@ -1485,7 +1485,7 @@ func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
}

// TestStreamSendWithErrorIOEOF mainly tests the scenario that the `Recv` call
// TestStreamRecvWithErrorIOEOF mainly tests the scenario that the `Recv` call
// of a gPRC stream in kv client meets error io.EOF, and kv client logs the error
// and re-establish new request
func (s *etcdSuite) TestStreamRecvWithErrorIOEOF(c *check.C) {
Expand Down Expand Up @@ -2398,7 +2398,7 @@ func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
c.Assert(sri.span.String(), check.Equals, "[61, 63)")
c.Assert(sri2.ts, check.Equals, uint64(2000))
c.Assert(sri2.span.String(), check.Equals, "[61, 62)")
c.Assert(sri2.rpcCtx, check.IsNil)
c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{})
}

// TestResolveLockNoCandidate tests the resolved ts manager can work normally
Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *eventFeedSession) receiveFromStreamV2(
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
})
}, false /* initialized */)
if err != nil {
// The only possible is that the ctx is cancelled. Simply return.
return
Expand Down
8 changes: 8 additions & 0 deletions cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ var (
Name: "channel_size",
Help: "size of each channel in kv client",
}, []string{"id", "channel"})
clientRegionTokenSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "region_token",
Help: "size of region token in kv client",
}, []string{"store", "table", "changefeed"})
batchResolvedEventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand All @@ -93,6 +100,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(pullEventCounter)
registry.MustRegister(sendEventCounter)
registry.MustRegister(clientChannelSize)
registry.MustRegister(clientRegionTokenSize)
registry.MustRegister(batchResolvedEventSize)
registry.MustRegister(etcdRequestCounter)
}
9 changes: 6 additions & 3 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,11 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
}
})

revokeToken := !state.initialized
return w.session.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
})
}, revokeToken)
}

func (w *regionWorker) resolveLock(ctx context.Context) error {
Expand Down Expand Up @@ -359,12 +360,13 @@ func (w *regionWorker) handleEventEntry(
switch entry.Type {
case cdcpb.Event_INITIALIZED:
if time.Since(state.startFeedTime) > 20*time.Second {
log.Warn("The time cost of initializing is too mush",
log.Warn("The time cost of initializing is too much",
zap.Duration("timeCost", time.Since(state.startFeedTime)),
zap.Uint64("regionID", regionID))
}
metricPullEventInitializedCounter.Inc()
state.initialized = true
w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
cachedEvents := state.matcher.matchCachedRow()
for _, cachedEvent := range cachedEvents {
revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue)
Expand Down Expand Up @@ -501,13 +503,14 @@ func (w *regionWorker) evictAllRegions(ctx context.Context) error {
if state.lastResolvedTs > singleRegionInfo.ts {
singleRegionInfo.ts = state.lastResolvedTs
}
revokeToken := !state.initialized
state.lock.Unlock()
err = w.session.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: singleRegionInfo,
err: &rpcCtxUnavailableErr{
verID: singleRegionInfo.verID,
},
})
}, revokeToken)
return err == nil
})
}
Expand Down
163 changes: 163 additions & 0 deletions cdc/kv/token_region.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)

const (
// buffer size for ranged region consumer
regionRouterChanSize = 16
// sizedRegionRouter checks region buffer every 100ms
sizedRegionCheckInterval = 100 * time.Millisecond
)

// LimitRegionRouter defines an interface that can buffer singleRegionInfo
// and provide token based consumption
type LimitRegionRouter interface {
// Chan returns a singleRegionInfo channel that can be consumed from
Chan() <-chan singleRegionInfo
// AddRegion adds an singleRegionInfo to buffer, this function is thread-safe
AddRegion(task singleRegionInfo)
// Acquire acquires one token
Acquire(id string)
// Release gives back one token, this function is thread-safe
Release(id string)
// Run runs in background and does some logic work
Run(ctx context.Context) error
}

type srrMetrics struct {
changefeed string
table string
tokens map[string]prometheus.Gauge
}

func newSrrMetrics(ctx context.Context) *srrMetrics {
changefeed := util.ChangefeedIDFromCtx(ctx)
_, table := util.TableIDFromCtx(ctx)
return &srrMetrics{
changefeed: changefeed,
table: table,
tokens: make(map[string]prometheus.Gauge),
}
}

type sizedRegionRouter struct {
buffer map[string][]singleRegionInfo
output chan singleRegionInfo
lock sync.Mutex
metrics *srrMetrics
tokens map[string]int
sizeLimit int
}

// NewSizedRegionRouter creates a new sizedRegionRouter
func NewSizedRegionRouter(ctx context.Context, sizeLimit int) *sizedRegionRouter {
return &sizedRegionRouter{
buffer: make(map[string][]singleRegionInfo),
output: make(chan singleRegionInfo, regionRouterChanSize),
sizeLimit: sizeLimit,
tokens: make(map[string]int),
metrics: newSrrMetrics(ctx),
}
}

func (r *sizedRegionRouter) Chan() <-chan singleRegionInfo {
return r.output
}

func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) {
r.lock.Lock()
var id string
// if rpcCtx is not provided, use the default "" bucket
if sri.rpcCtx != nil {
id = sri.rpcCtx.Addr
}
if r.sizeLimit > r.tokens[id] && len(r.output) < regionRouterChanSize {
r.output <- sri
} else {
r.buffer[id] = append(r.buffer[id], sri)
}
r.lock.Unlock()
}

func (r *sizedRegionRouter) Acquire(id string) {
r.lock.Lock()
defer r.lock.Unlock()
r.tokens[id]++
if _, ok := r.metrics.tokens[id]; !ok {
r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.table, r.metrics.changefeed)
}
r.metrics.tokens[id].Inc()
}

func (r *sizedRegionRouter) Release(id string) {
r.lock.Lock()
defer r.lock.Unlock()
r.tokens[id]--
if _, ok := r.metrics.tokens[id]; !ok {
r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.table, r.metrics.changefeed)
}
r.metrics.tokens[id].Dec()
}

func (r *sizedRegionRouter) Run(ctx context.Context) error {
ticker := time.NewTicker(sizedRegionCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
r.lock.Lock()
for id, buf := range r.buffer {
available := r.sizeLimit - r.tokens[id]
// the tokens used could be more then size limit, since we have
// a sized channel as level1 cache
if available <= 0 {
continue
}
if available > len(buf) {
available = len(buf)
}
// to avoid deadlock because when consuming from the output channel.
// onRegionFail could decrease tokens, which requires lock protection.
if available > regionRouterChanSize-len(r.output) {
available = regionRouterChanSize - len(r.output)
}
if available == 0 {
continue
}
for i := 0; i < available; i++ {
select {
case <-ctx.Done():
r.lock.Unlock()
return errors.Trace(ctx.Err())
case r.output <- buf[i]:
}
}
r.buffer[id] = r.buffer[id][available:]
}
r.lock.Unlock()
}
}
}
Loading

0 comments on commit 41001a7

Please sign in to comment.