From 582894eccd33f4b51c33e71d06c56d5b9c965401 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 1 Jun 2021 11:55:36 +0800 Subject: [PATCH 1/3] mysql sink: refine log (#1883) (#1897) --- cdc/sink/mysql.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 10008810de7..177a4aa6136 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -637,6 +637,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) error { select { case s.errCh <- err: default: + log.Info("mysql sink receives redundant error", zap.Error(err)) } } }() @@ -1339,7 +1340,7 @@ func (s *mysqlSyncpointStore) CreateSynctable(ctx context.Context) error { if err != nil { err2 := tx.Rollback() if err2 != nil { - log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + log.Error("failed to create syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2))) } return cerror.WrapError(cerror.ErrMySQLTxnError, err) } @@ -1347,7 +1348,7 @@ func (s *mysqlSyncpointStore) CreateSynctable(ctx context.Context) error { if err != nil { err2 := tx.Rollback() if err2 != nil { - log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + log.Error("failed to create syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2))) } return cerror.WrapError(cerror.ErrMySQLTxnError, err) } @@ -1355,7 +1356,7 @@ func (s *mysqlSyncpointStore) CreateSynctable(ctx context.Context) error { if err != nil { err2 := tx.Rollback() if err2 != nil { - log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + log.Error("failed to create syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2))) } return cerror.WrapError(cerror.ErrMySQLTxnError, err) } @@ -1376,7 +1377,7 @@ func (s *mysqlSyncpointStore) SinkSyncpoint(ctx context.Context, id string, chec log.Info("sync table: get tidb_current_ts err") err2 := tx.Rollback() if err2 != nil { - log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + log.Error("failed to write syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2))) } return cerror.WrapError(cerror.ErrMySQLTxnError, err) } @@ -1384,7 +1385,7 @@ func (s *mysqlSyncpointStore) SinkSyncpoint(ctx context.Context, id string, chec if err != nil { err2 := tx.Rollback() if err2 != nil { - log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + log.Error("failed to write syncpoint table", zap.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2))) } return cerror.WrapError(cerror.ErrMySQLTxnError, err) } From 032f4b07766466c097374867eb00fc879870130d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sat, 5 Jun 2021 10:18:27 +0800 Subject: [PATCH 2/3] tests: fix leak test in embeded etcd test (#1916) (#1943) --- pkg/etcd/etcd_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index a99e378aafe..c414e3a6238 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -41,13 +41,19 @@ func (s *etcdSuite) SetUpTest(c *check.C) { c.Assert(err, check.IsNil) s.clientURL = curl s.etcd = e - go func() { - c.Log(<-e.Err()) - }() } func (s *etcdSuite) TearDownTest(c *check.C) { s.etcd.Close() +logEtcdError: + for { + select { + case err := <-s.etcd.Err(): + c.Logf("etcd server error: %v", err) + default: + break logEtcdError + } + } } func (s *etcdSuite) TestEmbedEtcd(c *check.C) { From ffbdae307186c5ca110bc2ba60302ab1208c9627 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 
10 Jun 2021 00:22:29 +0800 Subject: [PATCH 3/3] kv/client: add incremental scan region count limit (#1926) --- cdc/kv/client.go | 449 +++++++++++++++++++-------------- cdc/kv/client_test.go | 48 ++-- cdc/kv/client_v2.go | 2 +- cdc/kv/metrics.go | 8 + cdc/kv/region_worker.go | 10 +- cdc/kv/token_region.go | 163 ++++++++++++ cdc/kv/token_region_test.go | 181 +++++++++++++ tests/move_table/conf/workload | 2 +- 8 files changed, 647 insertions(+), 216 deletions(-) create mode 100644 cdc/kv/token_region.go create mode 100644 cdc/kv/token_region_test.go diff --git a/cdc/kv/client.go b/cdc/kv/client.go index b93cc8ea8ce..64a39ea76df 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -68,6 +68,9 @@ const ( // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we // don't need to force reload region any more. regionScheduleReload = false + + // defines the scan region limit for each table + regionScanLimitPerTable = 6 ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -92,6 +95,8 @@ var ( metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest") metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown") metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable") + metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore") + metricConnectToStoreErr = eventFeedErrorCounter.WithLabelValues("ConnectToStore") ) var ( @@ -115,9 +120,13 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan, // happens, kv client needs to recover region request from singleRegionInfo func (s *singleRegionInfo) partialClone() singleRegionInfo { sri := singleRegionInfo{ - verID: s.verID, - span: s.span.Clone(), - ts: s.ts, + verID: s.verID, + span: s.span.Clone(), + ts: s.ts, + rpcCtx: &tikv.RPCContext{}, + } + if s.rpcCtx != nil { + sri.rpcCtx.Addr = s.rpcCtx.Addr } return sri } @@ -473,7 +482,7 @@ func (c *CDCClient) EventFeed( isPullerInit PullerInitialization, eventCh chan<- *model.RegionFeedEvent, ) error { - s := newEventFeedSession(c, c.regionCache, c.kvStorage, span, + s := newEventFeedSession(ctx, c, c.regionCache, c.kvStorage, span, lockResolver, isPullerInit, enableOldValue, ts, eventCh) return s.eventFeed(ctx, ts) @@ -503,6 +512,9 @@ type eventFeedSession struct { // The channel to send the processed events. eventCh chan<- *model.RegionFeedEvent + // The token based region router, it controls the uninitialzied regions with + // a given size limit. + regionRouter LimitRegionRouter // The channel to put the region that will be sent requests. 
regionCh chan singleRegionInfo // The channel to notify that an error is happening, so that the error will be handled and the affected region @@ -535,6 +547,7 @@ type rangeRequestTask struct { } func newEventFeedSession( + ctx context.Context, client *CDCClient, regionCache *tikv.RegionCache, kvStorage tikv.Storage, @@ -552,6 +565,7 @@ func newEventFeedSession( kvStorage: kvStorage, totalSpan: totalSpan, eventCh: eventCh, + regionRouter: NewSizedRegionRouter(ctx, regionScanLimitPerTable), regionCh: make(chan singleRegionInfo, 16), errCh: make(chan regionErrorInfo, 16), requestRangeCh: make(chan rangeRequestTask, 16), @@ -582,6 +596,10 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { return s.dispatchRequest(ctx, g) }) + g.Go(func() error { + return s.requestRegionToStore(ctx, g) + }) + g.Go(func() error { for { select { @@ -619,6 +637,10 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { } }) + g.Go(func() error { + return s.regionRouter.Run(ctx) + }) + s.requestRangeCh <- rangeRequestTask{span: s.totalSpan, ts: ts} s.rangeChSizeGauge.Inc() @@ -648,7 +670,6 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single s.regionChSizeGauge.Inc() case <-ctx.Done(): } - case regionspan.LockRangeStatusStale: log.Info("request expired", zap.Uint64("regionID", sri.verID.GetID()), @@ -697,9 +718,12 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // onRegionFail handles a region's failure, which means, unlock the region's range and send the error to the errCh for // error handling. This function is non blocking even if error channel is full. // CAUTION: Note that this should only be called in a context that the region has locked it's range. -func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) error { +func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) error { log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err)) s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts) + if revokeToken { + s.regionRouter.Release(errorInfo.rpcCtx.Addr) + } select { case s.errCh <- errorInfo: s.errChSizeGauge.Inc() @@ -715,14 +739,13 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr return nil } -// dispatchRequest manages a set of streams and dispatch event feed requests -// to these streams. Streams to each store will be created on need. After -// establishing new stream, a goroutine will be spawned to handle events from -// the stream. -// Regions from `regionCh` will be connected. If any error happens to a -// region, the error will be send to `errCh` and the receiver of `errCh` is -// responsible for handling the error. -func (s *eventFeedSession) dispatchRequest( +// requestRegionToStore gets singleRegionInfo from regionRouter, which is a token +// based limitter, sends request to TiKV. +// If the send request to TiKV returns error, fail the region with sendRequestToStoreErr +// and kv client will redispatch the region. +// If initialize gPRC stream with an error, fail the region with connectToStoreErr +// and kv client will also redispatch the region. 
+func (s *eventFeedSession) requestRegionToStore( ctx context.Context, g *errgroup.Group, ) error { @@ -731,179 +754,202 @@ func (s *eventFeedSession) dispatchRequest( // to pass the region info to the receiver since the region info cannot be inferred from the response from TiKV. storePendingRegions := make(map[string]*syncRegionFeedStateMap) -MainLoop: + var sri singleRegionInfo for { - // Note that when a region is received from the channel, it's range has been already locked. - var sri singleRegionInfo select { case <-ctx.Done(): - return ctx.Err() - case sri = <-s.regionCh: - s.regionChSizeGauge.Dec() + return errors.Trace(ctx.Err()) + case sri = <-s.regionRouter.Chan(): } + requestID := allocID() - log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID())) + extraOp := kvrpcpb.ExtraOp_Noop + if s.enableOldValue { + extraOp = kvrpcpb.ExtraOp_ReadOldValue + } - // Loop for retrying in case the stream has disconnected. - // TODO: Should we break if retries and fails too many times? - for { - rpcCtx, err := s.getRPCContextForRegion(ctx, sri.verID) - if err != nil { - return errors.Trace(err) + rpcCtx := sri.rpcCtx + regionID := rpcCtx.Meta.GetId() + req := &cdcpb.ChangeDataRequest{ + Header: &cdcpb.Header{ + ClusterId: s.client.clusterID, + TicdcVersion: version.ReleaseSemver(), + }, + RegionId: regionID, + RequestId: requestID, + RegionEpoch: rpcCtx.Meta.RegionEpoch, + CheckpointTs: sri.ts, + StartKey: sri.span.Start, + EndKey: sri.span.End, + ExtraOp: extraOp, + } + + failpoint.Inject("kvClientPendingRegionDelay", nil) + + // each TiKV store has an independent pendingRegions. + var pendingRegions *syncRegionFeedStateMap + + var err error + stream, ok := s.getStream(rpcCtx.Addr) + if ok { + var ok bool + pendingRegions, ok = storePendingRegions[rpcCtx.Addr] + if !ok { + // Should never happen + log.Panic("pending regions is not found for store", zap.String("store", rpcCtx.Addr)) } - if rpcCtx == nil { - // The region info is invalid. Retry the span. - log.Info("cannot get rpcCtx, retry span", + } else { + // when a new stream is established, always create a new pending + // regions map, the old map will be used in old `receiveFromStream` + // and won't be deleted until that goroutine exits. + pendingRegions = newSyncRegionFeedStateMap() + storePendingRegions[rpcCtx.Addr] = pendingRegions + storeID := rpcCtx.Peer.GetStoreId() + log.Info("creating new stream to store to send request", + zap.Uint64("regionID", sri.verID.GetID()), + zap.Uint64("requestID", requestID), + zap.Uint64("storeID", storeID), + zap.String("addr", rpcCtx.Addr)) + streamCtx, streamCancel := context.WithCancel(ctx) + _ = streamCancel // to avoid possible context leak warning from govet + stream, err = s.client.newStream(streamCtx, rpcCtx.Addr, storeID) + if err != nil { + // if get stream failed, maybe the store is down permanently, we should try to relocate the active store + log.Warn("get grpc stream client failed", zap.Uint64("regionID", sri.verID.GetID()), - zap.Stringer("span", sri.span)) + zap.Uint64("requestID", requestID), + zap.Uint64("storeID", storeID), + zap.String("error", err.Error())) + if cerror.ErrVersionIncompatible.Equal(err) { + // It often occurs on rolling update. Sleep 20s to reduce logs. 
+ delay := 20 * time.Second + failpoint.Inject("kvClientDelayWhenIncompatible", func() { + delay = 100 * time.Millisecond + }) + time.Sleep(delay) + } + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err) err = s.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: sri, - err: &rpcCtxUnavailableErr{ - verID: sri.verID, - }, - }) + err: &connectToStoreErr{}, + }, false /* revokeToken */) if err != nil { return errors.Trace(err) } - continue MainLoop - } - sri.rpcCtx = rpcCtx - - requestID := allocID() - - extraOp := kvrpcpb.ExtraOp_Noop - if s.enableOldValue { - extraOp = kvrpcpb.ExtraOp_ReadOldValue + continue } + s.addStream(rpcCtx.Addr, stream, streamCancel) - regionID := rpcCtx.Meta.GetId() - req := &cdcpb.ChangeDataRequest{ - Header: &cdcpb.Header{ - ClusterId: s.client.clusterID, - TicdcVersion: version.ReleaseSemver(), - }, - RegionId: regionID, - RequestId: requestID, - RegionEpoch: rpcCtx.Meta.RegionEpoch, - CheckpointTs: sri.ts, - StartKey: sri.span.Start, - EndKey: sri.span.End, - ExtraOp: extraOp, - } + limiter := s.client.getRegionLimiter(regionID) + g.Go(func() error { + if !s.enableKVClientV2 { + return s.receiveFromStream(ctx, g, rpcCtx.Addr, getStoreID(rpcCtx), stream, pendingRegions, limiter) + } + return s.receiveFromStreamV2(ctx, g, rpcCtx.Addr, getStoreID(rpcCtx), stream, pendingRegions, limiter) + }) + } - failpoint.Inject("kvClientPendingRegionDelay", nil) + state := newRegionFeedState(sri, requestID) + pendingRegions.insert(requestID, state) - // each TiKV store has an independent pendingRegions. - var pendingRegions *syncRegionFeedStateMap + logReq := log.Debug + if s.isPullerInit.IsInitialized() { + logReq = log.Info + } + logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr)) - stream, ok := s.getStream(rpcCtx.Addr) - if ok { - var ok bool - pendingRegions, ok = storePendingRegions[rpcCtx.Addr] - if !ok { - // Should never happen - log.Panic("pending regions is not found for store", zap.String("store", rpcCtx.Addr)) - } - } else { - // when a new stream is established, always create a new pending - // regions map, the old map will be used in old `receiveFromStream` - // and won't be deleted until that goroutine exits. - pendingRegions = newSyncRegionFeedStateMap() - storePendingRegions[rpcCtx.Addr] = pendingRegions - storeID := rpcCtx.Peer.GetStoreId() - log.Info("creating new stream to store to send request", - zap.Uint64("regionID", sri.verID.GetID()), - zap.Uint64("requestID", requestID), - zap.Uint64("storeID", storeID), - zap.String("addr", rpcCtx.Addr)) - streamCtx, streamCancel := context.WithCancel(ctx) - _ = streamCancel // to avoid possible context leak warning from govet - stream, err = s.client.newStream(streamCtx, rpcCtx.Addr, storeID) - if err != nil { - // if get stream failed, maybe the store is down permanently, we should try to relocate the active store - log.Warn("get grpc stream client failed", - zap.Uint64("regionID", sri.verID.GetID()), - zap.Uint64("requestID", requestID), - zap.Uint64("storeID", storeID), - zap.String("error", err.Error())) - if cerror.ErrVersionIncompatible.Equal(err) { - // It often occurs on rolling update. Sleep 20s to reduce logs. 
- delay := 20 * time.Second - failpoint.Inject("kvClientDelayWhenIncompatible", func() { - delay = 100 * time.Millisecond - }) - time.Sleep(delay) - } - bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) - s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err) - continue - } - s.addStream(rpcCtx.Addr, stream, streamCancel) + err = stream.Send(req) - limiter := s.client.getRegionLimiter(regionID) - g.Go(func() error { - if !s.enableKVClientV2 { - return s.receiveFromStream(ctx, g, rpcCtx.Addr, getStoreID(rpcCtx), stream, pendingRegions, limiter) - } - return s.receiveFromStreamV2(ctx, g, rpcCtx.Addr, getStoreID(rpcCtx), stream, pendingRegions, limiter) - }) + // If Send error, the receiver should have received error too or will receive error soon. So we doesn't need + // to do extra work here. + if err != nil { + log.Error("send request to stream failed", + zap.String("addr", rpcCtx.Addr), + zap.Uint64("storeID", getStoreID(rpcCtx)), + zap.Uint64("regionID", sri.verID.GetID()), + zap.Uint64("requestID", requestID), + zap.Error(err)) + err1 := stream.CloseSend() + if err1 != nil { + log.Error("failed to close stream", zap.Error(err1)) } - - state := newRegionFeedState(sri, requestID) - pendingRegions.insert(requestID, state) - - logReq := log.Debug - if s.isPullerInit.IsInitialized() { - logReq = log.Info + // Delete the stream from the map so that the next time the store is accessed, the stream will be + // re-established. + s.deleteStream(rpcCtx.Addr) + // Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is + // requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all + // pending regions, the new pending regions that are requested after reconnecting won't be stopped + // incorrectly. + delete(storePendingRegions, rpcCtx.Addr) + + // Remove the region from pendingRegions. If it's already removed, it should be already retried by + // `receiveFromStream`, so no need to retry here. + _, ok := pendingRegions.take(requestID) + if !ok { + continue } - logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr)) - err = stream.Send(req) - - // If Send error, the receiver should have received error too or will receive error soon. So we doesn't need - // to do extra work here. + // Wait for a while and retry sending the request + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + err = s.onRegionFail(ctx, regionErrorInfo{ + singleRegionInfo: sri, + err: &sendRequestToStoreErr{}, + }, false /* revokeToken */) if err != nil { + return errors.Trace(err) + } + } else { + s.regionRouter.Acquire(rpcCtx.Addr) + } + } +} - log.Error("send request to stream failed", - zap.String("addr", rpcCtx.Addr), - zap.Uint64("storeID", getStoreID(rpcCtx)), - zap.Uint64("regionID", sri.verID.GetID()), - zap.Uint64("requestID", requestID), - zap.Error(err)) - err1 := stream.CloseSend() - if err1 != nil { - log.Error("failed to close stream", zap.Error(err1)) - } - // Delete the stream from the map so that the next time the store is accessed, the stream will be - // re-established. - s.deleteStream(rpcCtx.Addr) - // Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is - // requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all - // pending regions, the new pending regions that are requested after reconnecting won't be stopped - // incorrectly. 
- delete(storePendingRegions, rpcCtx.Addr) - - // Remove the region from pendingRegions. If it's already removed, it should be already retried by - // `receiveFromStream`, so no need to retry here. - _, ok := pendingRegions.take(requestID) - if !ok { - break - } +// dispatchRequest manages a set of streams and dispatch event feed requests +// to these streams. Streams to each store will be created on need. After +// establishing new stream, a goroutine will be spawned to handle events from +// the stream. +// Regions from `regionCh` will be connected. If any error happens to a +// region, the error will be send to `errCh` and the receiver of `errCh` is +// responsible for handling the error. +func (s *eventFeedSession) dispatchRequest( + ctx context.Context, + g *errgroup.Group, +) error { + for { + // Note that when a region is received from the channel, it's range has been already locked. + var sri singleRegionInfo + select { + case <-ctx.Done(): + return ctx.Err() + case sri = <-s.regionCh: + s.regionChSizeGauge.Dec() + } - // Wait for a while and retry sending the request - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - // Break if ctx has been canceled. - select { - case <-ctx.Done(): - return ctx.Err() - default: - } + log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID())) - continue + rpcCtx, err := s.getRPCContextForRegion(ctx, sri.verID) + if err != nil { + return errors.Trace(err) + } + if rpcCtx == nil { + // The region info is invalid. Retry the span. + log.Info("cannot get rpcCtx, retry span", + zap.Uint64("regionID", sri.verID.GetID()), + zap.Stringer("span", sri.span)) + err = s.onRegionFail(ctx, regionErrorInfo{ + singleRegionInfo: sri, + err: &rpcCtxUnavailableErr{ + verID: sri.verID, + }, + }, false /* revokeToken */) + if err != nil { + return errors.Trace(err) } - - break + continue } + sri.rpcCtx = rpcCtx + s.regionRouter.AddRegion(sri) } } @@ -933,7 +979,8 @@ func (s *eventFeedSession) partialRegionFeed( }() ts := state.sri.ts - maxTs, err := s.singleEventFeed(ctx, state.sri.verID.GetID(), state.sri.span, state.sri.ts, receiver) + maxTs, initialized, err := s.singleEventFeed(ctx, state.sri.verID.GetID(), state.sri.span, + state.sri.ts, state.sri.rpcCtx.Addr, receiver) log.Debug("singleEventFeed quit") if err == nil || errors.Cause(err) == context.Canceled { @@ -994,10 +1041,11 @@ func (s *eventFeedSession) partialRegionFeed( } } + revokeToken := !initialized return s.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: state.sri, err: err, - }) + }, revokeToken) } // divideAndSendEventFeedToRegions split up the input span into spans aligned @@ -1114,6 +1162,10 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI metricFeedRPCCtxUnavailable.Inc() s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.ts) return nil + case *connectToStoreErr: + metricConnectToStoreErr.Inc() + case *sendRequestToStoreErr: + metricStoreSendRequestErr.Inc() default: bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) if errInfo.rpcCtx.Meta != nil { @@ -1157,7 +1209,7 @@ func (s *eventFeedSession) receiveFromStream( err := s.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: state.sri, err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(), - }) + }, true /* revokeToken */) if err != nil { // The only possible is that the ctx is cancelled. Simply return. 
return @@ -1360,8 +1412,9 @@ func (s *eventFeedSession) singleEventFeed( regionID uint64, span regionspan.ComparableSpan, startTs uint64, + storeAddr string, receiverCh <-chan *regionEvent, -) (uint64, error) { +) (lastResolvedTs uint64, initialized bool, err error) { captureAddr := util.CaptureAddrFromCtx(ctx) changefeedID := util.ChangefeedIDFromCtx(ctx) metricEventSize := eventSize.WithLabelValues(captureAddr) @@ -1374,14 +1427,12 @@ func (s *eventFeedSession) singleEventFeed( metricSendEventCommitCounter := sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID) metricSendEventCommittedCounter := sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID) - initialized := false - matcher := newMatcher() advanceCheckTicker := time.NewTicker(time.Second * 5) defer advanceCheckTicker.Stop() lastReceivedEventTime := time.Now() startFeedTime := time.Now() - lastResolvedTs := startTs + lastResolvedTs = startTs handleResolvedTs := func(resolvedTs uint64) error { if !initialized { return nil @@ -1422,7 +1473,8 @@ func (s *eventFeedSession) singleEventFeed( }, }: case <-ctx.Done(): - return lastResolvedTs, errors.Trace(ctx.Err()) + err = errors.Trace(ctx.Err()) + return } resolveLockInterval := 20 * time.Second failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) { @@ -1437,7 +1489,8 @@ func (s *eventFeedSession) singleEventFeed( var ok bool select { case <-ctx.Done(): - return lastResolvedTs, ctx.Err() + err = errors.Trace(err) + return case <-advanceCheckTicker.C: if time.Since(startFeedTime) < resolveLockInterval { continue @@ -1453,7 +1506,8 @@ func (s *eventFeedSession) singleEventFeed( } if sinceLastEvent > reconnectInterval { log.Warn("kv client reconnect triggered", zap.Duration("duration", sinceLastEvent)) - return lastResolvedTs, errReconnect + err = errReconnect + return } version, err := s.kvStorage.(*StorageWithCurVersionCache).GetCachedCurrentVersion() if err != nil { @@ -1480,8 +1534,10 @@ func (s *eventFeedSession) singleEventFeed( if !ok || event == nil { log.Debug("singleEventFeed closed by error") - return lastResolvedTs, cerror.ErrEventFeedAborted.GenWithStackByArgs() + err = cerror.ErrEventFeedAborted.GenWithStackByArgs() + return } + var revent *model.RegionFeedEvent lastReceivedEventTime = time.Now() if event.changeEvent != nil { metricEventSize.Observe(float64(event.changeEvent.Event.Size())) @@ -1500,30 +1556,33 @@ func (s *eventFeedSession) singleEventFeed( switch entry.Type { case cdcpb.Event_INITIALIZED: if time.Since(startFeedTime) > 20*time.Second { - log.Warn("The time cost of initializing is too mush", + log.Warn("The time cost of initializing is too much", zap.Duration("timeCost", time.Since(startFeedTime)), zap.Uint64("regionID", regionID)) } metricPullEventInitializedCounter.Inc() initialized = true + s.regionRouter.Release(storeAddr) cachedEvents := matcher.matchCachedRow() for _, cachedEvent := range cachedEvents { - revent, err := assembleRowEvent(regionID, cachedEvent, s.enableOldValue) + revent, err = assembleRowEvent(regionID, cachedEvent, s.enableOldValue) if err != nil { - return lastResolvedTs, errors.Trace(err) + err = errors.Trace(err) + return } select { case s.eventCh <- revent: metricSendEventCommitCounter.Inc() case <-ctx.Done(): - return lastResolvedTs, errors.Trace(ctx.Err()) + err = errors.Trace(err) + return } } case cdcpb.Event_COMMITTED: metricPullEventCommittedCounter.Inc() - revent, err := assembleRowEvent(regionID, entry, s.enableOldValue) + revent, err = assembleRowEvent(regionID, entry, 
s.enableOldValue) if err != nil { - return lastResolvedTs, errors.Trace(err) + return } if entry.CommitTs <= lastResolvedTs { @@ -1532,13 +1591,15 @@ func (s *eventFeedSession) singleEventFeed( zap.Uint64("CommitTs", entry.CommitTs), zap.Uint64("resolvedTs", lastResolvedTs), zap.Uint64("regionID", regionID)) - return lastResolvedTs, errUnreachable + err = errUnreachable + return } select { case s.eventCh <- revent: metricSendEventCommittedCounter.Inc() case <-ctx.Done(): - return lastResolvedTs, errors.Trace(ctx.Err()) + err = errors.Trace(ctx.Err()) + return } case cdcpb.Event_PREWRITE: metricPullEventPrewriteCounter.Inc() @@ -1551,7 +1612,8 @@ func (s *eventFeedSession) singleEventFeed( zap.Uint64("CommitTs", entry.CommitTs), zap.Uint64("resolvedTs", lastResolvedTs), zap.Uint64("regionID", regionID)) - return lastResolvedTs, errUnreachable + err = errUnreachable + return } ok := matcher.matchRow(entry) if !ok { @@ -1559,19 +1621,21 @@ func (s *eventFeedSession) singleEventFeed( matcher.cacheCommitRow(entry) continue } - return lastResolvedTs, cerror.ErrPrewriteNotMatch.GenWithStackByArgs(entry.GetKey(), entry.GetStartTs()) + err = cerror.ErrPrewriteNotMatch.GenWithStackByArgs(entry.GetKey(), entry.GetStartTs()) + return } - revent, err := assembleRowEvent(regionID, entry, s.enableOldValue) + revent, err = assembleRowEvent(regionID, entry, s.enableOldValue) if err != nil { - return lastResolvedTs, errors.Trace(err) + return } select { case s.eventCh <- revent: metricSendEventCommitCounter.Inc() case <-ctx.Done(): - return lastResolvedTs, errors.Trace(ctx.Err()) + err = errors.Trace(ctx.Err()) + return } case cdcpb.Event_ROLLBACK: metricPullEventRollbackCounter.Inc() @@ -1581,17 +1645,18 @@ func (s *eventFeedSession) singleEventFeed( case *cdcpb.Event_Admin_: log.Info("receive admin event", zap.Stringer("event", event.changeEvent)) case *cdcpb.Event_Error: - return lastResolvedTs, cerror.WrapError(cerror.ErrEventFeedEventError, &eventError{err: x.Error}) + err = cerror.WrapError(cerror.ErrEventFeedEventError, &eventError{err: x.Error}) + return case *cdcpb.Event_ResolvedTs: - if err := handleResolvedTs(x.ResolvedTs); err != nil { - return lastResolvedTs, errors.Trace(err) + if err = handleResolvedTs(x.ResolvedTs); err != nil { + return } } } if event.resolvedTs != nil { - if err := handleResolvedTs(event.resolvedTs.Ts); err != nil { - return lastResolvedTs, errors.Trace(err) + if err = handleResolvedTs(event.resolvedTs.Ts); err != nil { + return } } } @@ -1675,6 +1740,14 @@ func (e *rpcCtxUnavailableErr) Error() string { e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer()) } +type connectToStoreErr struct{} + +func (e *connectToStoreErr) Error() string { return "connect to store error" } + +type sendRequestToStoreErr struct{} + +func (e *sendRequestToStoreErr) Error() string { return "send request to store error" } + func getStoreID(rpcCtx *tikv.RPCContext) uint64 { if rpcCtx != nil && rpcCtx.Peer != nil { return rpcCtx.Peer.GetStoreId() diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 3ad3a6d18c5..7cda292a153 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -1479,7 +1479,7 @@ ReceiveLoop: } } -// TestStreamSendWithErrorNormal mainly tests the scenario that the `Recv` call +// TestStreamRecvWithErrorNormal mainly tests the scenario that the `Recv` call // of a gPRC stream in kv client meets a **logical related** error, and kv client // logs the error and re-establish new request. 
func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) { @@ -1497,7 +1497,7 @@ func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) { s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")") } -// TestStreamSendWithErrorIOEOF mainly tests the scenario that the `Recv` call +// TestStreamRecvWithErrorIOEOF mainly tests the scenario that the `Recv` call // of a gPRC stream in kv client meets error io.EOF, and kv client logs the error // and re-establish new request func (s *etcdSuite) TestStreamRecvWithErrorIOEOF(c *check.C) { @@ -2422,7 +2422,7 @@ func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) { c.Assert(sri.span.String(), check.Equals, "[61, 63)") c.Assert(sri2.ts, check.Equals, uint64(2000)) c.Assert(sri2.span.String(), check.Equals, "[61, 62)") - c.Assert(sri2.rpcCtx, check.IsNil) + c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{}) } // TestResolveLockNoCandidate tests the resolved ts manager can work normally @@ -3138,7 +3138,9 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage)) isPullInit := &mockPullerInit{} cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{}) - eventCh := make(chan *model.RegionFeedEvent, 10) + // The buffer size of event channel must be large enough because in the test + // case we send events first, and then retrive all events from this channel. + eventCh := make(chan *model.RegionFeedEvent, 100) wg.Add(1) go func() { err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh) @@ -3159,10 +3161,29 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { } // wait for all regions requested from cdc kv client + // Since there exists incremental scan limit in kv client, we must wait for + // the ready region and send initialized event. 
+ sent := make(map[uint64]bool, regionNum) err = retry.Run(time.Millisecond*200, 20, func() error { count := 0 - requestIDs.Range(func(_, _ interface{}) bool { + // send initialized event and a resolved ts event to each region + requestIDs.Range(func(key, value interface{}) bool { count++ + regionID := key.(uint64) + requestID := value.(uint64) + if _, ok := sent[regionID]; !ok { + initialized := mockInitializedEvent(regionID, requestID) + ch1 <- initialized + sent[regionID] = true + resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: requestID, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, + }, + }} + ch1 <- resolved + } return true }) if count == regionNum { @@ -3172,23 +3193,6 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { }) c.Assert(err, check.IsNil) - // send initialized event and a resolved ts event to each region - requestIDs.Range(func(key, value interface{}) bool { - regionID := key.(uint64) - requestID := value.(uint64) - initialized := mockInitializedEvent(regionID, requestID) - ch1 <- initialized - resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ - { - RegionId: regionID, - RequestId: requestID, - Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, - }, - }} - ch1 <- resolved - return true - }) - resolvedCount := 0 checkEvent: for { diff --git a/cdc/kv/client_v2.go b/cdc/kv/client_v2.go index 4eb18d121fa..3a792581471 100644 --- a/cdc/kv/client_v2.go +++ b/cdc/kv/client_v2.go @@ -178,7 +178,7 @@ func (s *eventFeedSession) receiveFromStreamV2( err := s.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: state.sri, err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(), - }) + }, false /* initialized */) if err != nil { // The only possible is that the ctx is cancelled. Simply return. 
return diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go index bacfe8d0489..b9fe36c12c3 100644 --- a/cdc/kv/metrics.go +++ b/cdc/kv/metrics.go @@ -67,6 +67,13 @@ var ( Name: "channel_size", Help: "size of each channel in kv client", }, []string{"id", "channel"}) + clientRegionTokenSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_token", + Help: "size of region token in kv client", + }, []string{"store", "table", "changefeed"}) batchResolvedEventSize = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -93,6 +100,7 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(pullEventCounter) registry.MustRegister(sendEventCounter) registry.MustRegister(clientChannelSize) + registry.MustRegister(clientRegionTokenSize) registry.MustRegister(batchResolvedEventSize) registry.MustRegister(etcdRequestCounter) } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 8e2dd09c121..f71e0fb7bf1 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -199,10 +199,11 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s } }) + revokeToken := !state.initialized return w.session.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: state.sri, err: err, - }) + }, revokeToken) } func (w *regionWorker) checkUnInitRegions(ctx context.Context) error { @@ -452,7 +453,7 @@ func (w *regionWorker) handleEventEntry( switch entry.Type { case cdcpb.Event_INITIALIZED: if time.Since(state.startFeedTime) > 20*time.Second { - log.Warn("The time cost of initializing is too mush", + log.Warn("The time cost of initializing is too much", zap.Duration("timeCost", time.Since(state.startFeedTime)), zap.Uint64("regionID", regionID)) } @@ -468,7 +469,7 @@ func (w *regionWorker) handleEventEntry( metricPullEventInitializedCounter.Inc() state.initialized = true - + w.session.regionRouter.Release(state.sri.rpcCtx.Addr) cachedEvents := state.matcher.matchCachedRow() for _, cachedEvent := range cachedEvents { revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue) @@ -605,13 +606,14 @@ func (w *regionWorker) evictAllRegions(ctx context.Context) error { if state.lastResolvedTs > singleRegionInfo.ts { singleRegionInfo.ts = state.lastResolvedTs } + revokeToken := !state.initialized state.lock.Unlock() err = w.session.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: singleRegionInfo, err: &rpcCtxUnavailableErr{ verID: singleRegionInfo.verID, }, - }) + }, revokeToken) return err == nil }) } diff --git a/cdc/kv/token_region.go b/cdc/kv/token_region.go new file mode 100644 index 00000000000..fee545ce375 --- /dev/null +++ b/cdc/kv/token_region.go @@ -0,0 +1,163 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package kv + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/pkg/util" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + // buffer size for ranged region consumer + regionRouterChanSize = 16 + // sizedRegionRouter checks region buffer every 100ms + sizedRegionCheckInterval = 100 * time.Millisecond +) + +// LimitRegionRouter defines an interface that can buffer singleRegionInfo +// and provide token based consumption +type LimitRegionRouter interface { + // Chan returns a singleRegionInfo channel that can be consumed from + Chan() <-chan singleRegionInfo + // AddRegion adds an singleRegionInfo to buffer, this function is thread-safe + AddRegion(task singleRegionInfo) + // Acquire acquires one token + Acquire(id string) + // Release gives back one token, this function is thread-safe + Release(id string) + // Run runs in background and does some logic work + Run(ctx context.Context) error +} + +type srrMetrics struct { + changefeed string + table string + tokens map[string]prometheus.Gauge +} + +func newSrrMetrics(ctx context.Context) *srrMetrics { + changefeed := util.ChangefeedIDFromCtx(ctx) + _, table := util.TableIDFromCtx(ctx) + return &srrMetrics{ + changefeed: changefeed, + table: table, + tokens: make(map[string]prometheus.Gauge), + } +} + +type sizedRegionRouter struct { + buffer map[string][]singleRegionInfo + output chan singleRegionInfo + lock sync.Mutex + metrics *srrMetrics + tokens map[string]int + sizeLimit int +} + +// NewSizedRegionRouter creates a new sizedRegionRouter +func NewSizedRegionRouter(ctx context.Context, sizeLimit int) *sizedRegionRouter { + return &sizedRegionRouter{ + buffer: make(map[string][]singleRegionInfo), + output: make(chan singleRegionInfo, regionRouterChanSize), + sizeLimit: sizeLimit, + tokens: make(map[string]int), + metrics: newSrrMetrics(ctx), + } +} + +func (r *sizedRegionRouter) Chan() <-chan singleRegionInfo { + return r.output +} + +func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) { + r.lock.Lock() + var id string + // if rpcCtx is not provided, use the default "" bucket + if sri.rpcCtx != nil { + id = sri.rpcCtx.Addr + } + if r.sizeLimit > r.tokens[id] && len(r.output) < regionRouterChanSize { + r.output <- sri + } else { + r.buffer[id] = append(r.buffer[id], sri) + } + r.lock.Unlock() +} + +func (r *sizedRegionRouter) Acquire(id string) { + r.lock.Lock() + defer r.lock.Unlock() + r.tokens[id]++ + if _, ok := r.metrics.tokens[id]; !ok { + r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.table, r.metrics.changefeed) + } + r.metrics.tokens[id].Inc() +} + +func (r *sizedRegionRouter) Release(id string) { + r.lock.Lock() + defer r.lock.Unlock() + r.tokens[id]-- + if _, ok := r.metrics.tokens[id]; !ok { + r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.table, r.metrics.changefeed) + } + r.metrics.tokens[id].Dec() +} + +func (r *sizedRegionRouter) Run(ctx context.Context) error { + ticker := time.NewTicker(sizedRegionCheckInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case <-ticker.C: + r.lock.Lock() + for id, buf := range r.buffer { + available := r.sizeLimit - r.tokens[id] + // the tokens used could be more then size limit, since we have + // a sized channel as level1 cache + if available <= 0 { + continue + } + if available > len(buf) { + available = len(buf) + } + // to avoid deadlock because when consuming from the output channel. 
+ // onRegionFail could decrease tokens, which requires lock protection. + if available > regionRouterChanSize-len(r.output) { + available = regionRouterChanSize - len(r.output) + } + if available == 0 { + continue + } + for i := 0; i < available; i++ { + select { + case <-ctx.Done(): + r.lock.Unlock() + return errors.Trace(ctx.Err()) + case r.output <- buf[i]: + } + } + r.buffer[id] = r.buffer[id][available:] + } + r.lock.Unlock() + } + } +} diff --git a/cdc/kv/token_region_test.go b/cdc/kv/token_region_test.go new file mode 100644 index 00000000000..b26d9b02cd5 --- /dev/null +++ b/cdc/kv/token_region_test.go @@ -0,0 +1,181 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/pingcap/tidb/store/tikv" + "golang.org/x/sync/errgroup" +) + +type tokenRegionSuite struct { +} + +var _ = check.Suite(&tokenRegionSuite{}) + +func (s *tokenRegionSuite) TestRouter(c *check.C) { + defer testleak.AfterTest(c)() + store := "store-1" + limit := 10 + r := NewSizedRegionRouter(context.Background(), limit) + for i := 0; i < limit; i++ { + r.AddRegion(singleRegionInfo{ts: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}}) + } + regions := make([]singleRegionInfo, 0, limit) + // limit is less than regionScanLimitPerTable + for i := 0; i < limit; i++ { + select { + case sri := <-r.Chan(): + c.Assert(sri.ts, check.Equals, uint64(i)) + r.Acquire(store) + regions = append(regions, sri) + default: + c.Error("expect region info from router") + } + } + c.Assert(r.tokens[store], check.Equals, limit) + for range regions { + r.Release(store) + } + c.Assert(r.tokens[store], check.Equals, 0) +} + +func (s *tokenRegionSuite) TestRouterWithFastConsumer(c *check.C) { + defer testleak.AfterTest(c)() + s.testRouterWithConsumer(c, func() {}) +} + +func (s *tokenRegionSuite) TestRouterWithSlowConsumer(c *check.C) { + defer testleak.AfterTest(c)() + s.testRouterWithConsumer(c, func() { time.Sleep(time.Millisecond * 15) }) +} + +func (s *tokenRegionSuite) testRouterWithConsumer(c *check.C, funcDoSth func()) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := "store-1" + limit := 20 + r := NewSizedRegionRouter(context.Background(), limit) + for i := 0; i < limit*2; i++ { + r.AddRegion(singleRegionInfo{ts: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}}) + } + received := uint64(0) + for i := 0; i < regionRouterChanSize; i++ { + <-r.Chan() + atomic.AddUint64(&received, 1) + r.Acquire(store) + } + + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + return r.Run(ctx) + }) + + wg.Go(func() error { + for i := 0; i < regionRouterChanSize; i++ { + r.Release(store) + } + return nil + }) + + wg.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-r.Chan(): + r.Acquire(store) + atomic.AddUint64(&received, 1) + r.Release(store) + funcDoSth() + if atomic.LoadUint64(&received) == uint64(limit*4) { + 
cancel() + } + } + } + }) + + for i := 0; i < limit*2; i++ { + r.AddRegion(singleRegionInfo{ts: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}}) + } + + err := wg.Wait() + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + c.Assert(r.tokens[store], check.Equals, 0) +} + +func (s *tokenRegionSuite) TestRouterWithMultiStores(c *check.C) { + defer testleak.AfterTest(c)() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + storeN := 10 + stores := make([]string, 0, storeN) + for i := 0; i < storeN; i++ { + stores = append(stores, fmt.Sprintf("store-%d", i)) + } + limit := 20 + r := NewSizedRegionRouter(context.Background(), limit) + + for _, store := range stores { + for j := 0; j < limit*2; j++ { + r.AddRegion(singleRegionInfo{ts: uint64(j), rpcCtx: &tikv.RPCContext{Addr: store}}) + } + } + received := uint64(0) + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + return r.Run(ctx) + }) + + for _, store := range stores { + store := store + wg.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-r.Chan(): + r.Acquire(store) + atomic.AddUint64(&received, 1) + r.Release(store) + if atomic.LoadUint64(&received) == uint64(limit*4*storeN) { + cancel() + } + } + } + }) + } + + for _, store := range stores { + for i := 0; i < limit*2; i++ { + r.AddRegion(singleRegionInfo{ts: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}}) + } + } + + err := wg.Wait() + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + for _, store := range stores { + c.Assert(r.tokens[store], check.Equals, 0) + } +} diff --git a/tests/move_table/conf/workload b/tests/move_table/conf/workload index 5b9ca3189fc..128a6404b53 100644 --- a/tests/move_table/conf/workload +++ b/tests/move_table/conf/workload @@ -1,5 +1,5 @@ threadcount=10 -recordcount=60000 +recordcount=6000 operationcount=0 workload=core
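
Editor's note on the main change in this series: the core idea of the new cdc/kv/token_region.go above is a per-store token bucket. AddRegion forwards a region straight to the output channel while the store is under its token limit and otherwise buffers it; the consumer (requestRegionToStore) calls Acquire when it sends the ChangeDataRequest and Release once the region is initialized or fails before initialization, and a background Run loop drains the buffer as tokens come back. The standalone Go sketch below illustrates only that pattern; the names (limitRouter, task, Add) are hypothetical and deliberately simplified, and the real sizedRegionRouter additionally tracks per-store Prometheus gauges and routes singleRegionInfo values keyed by rpcCtx.Addr.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// task stands in for singleRegionInfo: something routed per store address.
type task struct {
	addr string
	id   int
}

type limitRouter struct {
	mu     sync.Mutex
	buffer map[string][]task // pending tasks per address
	tokens map[string]int    // tokens currently held per address
	output chan task
	limit  int // max in-flight (uninitialized) tasks per address
}

func newLimitRouter(limit int) *limitRouter {
	return &limitRouter{
		buffer: make(map[string][]task),
		tokens: make(map[string]int),
		output: make(chan task, 16),
		limit:  limit,
	}
}

// Add forwards the task directly while the address is under its limit and the
// output channel has room; otherwise it buffers the task for the Run loop.
func (r *limitRouter) Add(t task) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.tokens[t.addr] < r.limit && len(r.output) < cap(r.output) {
		r.output <- t
		return
	}
	r.buffer[t.addr] = append(r.buffer[t.addr], t)
}

func (r *limitRouter) Acquire(addr string) { r.mu.Lock(); r.tokens[addr]++; r.mu.Unlock() }
func (r *limitRouter) Release(addr string) { r.mu.Lock(); r.tokens[addr]--; r.mu.Unlock() }

// Run periodically flushes buffered tasks for addresses that have spare tokens,
// never sending more than the free space in the output channel so it cannot
// block while holding the lock (mirroring the deadlock-avoidance comment above).
func (r *limitRouter) Run(ctx context.Context) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			r.mu.Lock()
			for addr, buf := range r.buffer {
				n := r.limit - r.tokens[addr]
				if n <= 0 {
					continue
				}
				if n > len(buf) {
					n = len(buf)
				}
				if free := cap(r.output) - len(r.output); n > free {
					n = free
				}
				for i := 0; i < n; i++ {
					r.output <- buf[i]
				}
				r.buffer[addr] = buf[n:]
			}
			r.mu.Unlock()
		}
	}
}

func main() {
	r := newLimitRouter(2)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go func() { _ = r.Run(ctx) }()
	for i := 0; i < 4; i++ {
		r.Add(task{addr: "store-1", id: i})
	}
	for i := 0; i < 4; i++ {
		t := <-r.output
		r.Acquire(t.addr) // held until the region would be "initialized"
		fmt.Println("dispatched", t.id)
		r.Release(t.addr) // released here immediately to let buffered tasks through
	}
}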