kv (ticdc): fix kvClient reconnection downhill loop #10559

Merged · 14 commits · Jan 31, 2024
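The diff below replaces the per-address `streams`/`streamsCanceller` maps with a single `storeStreamsCache` keyed by store address, gives every `eventFeedStream` a unique `id` from `allocID()`, and makes `deleteStream` skip the removal when the cached stream's ID does not match the stream being deleted. The following is a minimal, self-contained sketch of that idea only; `stream`, `streamCache`, and the local `allocID` here are simplified stand-ins for illustration, not the actual tiflow types.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stream is a simplified stand-in for eventFeedStream: a per-store gRPC
// stream identified by a unique id, so stale delete requests can be detected.
type stream struct {
	addr string
	id   uint64
}

var nextID uint64

// allocID mirrors the idea of a monotonically increasing identifier
// assigned to every newly created stream.
func allocID() uint64 { return atomic.AddUint64(&nextID, 1) }

// streamCache is a simplified stand-in for storeStreamsCache: at most one
// active stream per store address, guarded by a lock.
type streamCache struct {
	mu sync.RWMutex
	m  map[string]*stream
}

func newStreamCache() *streamCache {
	return &streamCache{m: make(map[string]*stream)}
}

func (c *streamCache) add(s *stream) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[s.addr] = s
}

// delete removes a stream only if the cached stream for that address has the
// same id. A stale caller holding an old stream cannot evict a newer stream
// that has already been re-established for the same store.
func (c *streamCache) delete(s *stream) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	cached, ok := c.m[s.addr]
	if !ok || cached.id != s.id {
		return false // ignore: the cache holds a different (newer) stream
	}
	delete(c.m, s.addr)
	return true
}

func main() {
	cache := newStreamCache()

	old := &stream{addr: "tikv-1:20160", id: allocID()}
	cache.add(old)

	// The store reconnects: a new stream replaces the old one in the cache.
	fresh := &stream{addr: "tikv-1:20160", id: allocID()}
	cache.add(fresh)

	// A goroutine still holding the old stream tries to clean it up.
	fmt.Println("delete old accepted:", cache.delete(old))   // false: id mismatch
	fmt.Println("delete new accepted:", cache.delete(fresh)) // true
}
```

With the ID check, a receiver goroutine that exits late cannot evict or cancel the replacement stream for the same store, which appears to be the reconnection loop the PR title refers to.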
176 changes: 104 additions & 72 deletions cdc/kv/client.go
@@ -136,6 +136,14 @@ func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo {
type eventFeedStream struct {
client cdcpb.ChangeData_EventFeedClient
conn *sharedConn
// addr is the address of the TiKV store
addr string
// storeID is the ID of the TiKV store
storeID uint64
// id is the stream ID, which is used to identify the stream.
id uint64
// cancel is used to cancel the gRPC stream
cancel context.CancelFunc
}

// CDCKVClient is an interface to receive kv changed logs from TiKV
@@ -232,7 +240,12 @@ func NewCDCClient(
return c
}

func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
func (c *CDCClient) newStream(
ctx context.Context,
cancel context.CancelFunc,
addr string,
storeID uint64,
) (stream *eventFeedStream, newStreamErr error) {
streamFunc := func() (err error) {
var conn *sharedConn
defer func() {
@@ -255,8 +268,12 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
return cerror.WrapError(cerror.ErrTiKVEventFeed, err)
}
stream = &eventFeedStream{
client: streamClient,
conn: conn,
client: streamClient,
conn: conn,
addr: addr,
storeID: storeID,
id: allocID(),
cancel: cancel,
}
log.Info("created stream to store",
zap.String("namespace", c.changefeed.Namespace),
@@ -371,9 +388,11 @@ type eventFeedSession struct
errChSizeGauge prometheus.Gauge
rangeChSizeGauge prometheus.Gauge

streams map[string]*eventFeedStream
streamsLock sync.RWMutex
streamsCanceller map[string]context.CancelFunc
// storeStreamsCache is used to cache the established gRPC streams to TiKV stores.
storeStreamsCache map[string]*eventFeedStream
// storeStreamsCacheLock is used to protect storeStreamsCache, since the cache
// is shared by multiple goroutines.
storeStreamsCacheLock sync.RWMutex

// use sync.Pool to store resolved ts event only, because resolved ts event
// has the same size and generate cycle.
@@ -414,8 +433,7 @@ func newEventFeedSession(
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"),
streams: make(map[string]*eventFeedStream),
streamsCanceller: make(map[string]context.CancelFunc),
storeStreamsCache: make(map[string]*eventFeedStream),
resolvedTsPool: sync.Pool{
New: func() any {
return &regionStatefulEvent{
@@ -605,7 +623,7 @@ func (s *eventFeedSession) requestRegionToStore(
// Stores pending regions info for each stream. After sending a new request, the region info will be put into the map,
// and it will be loaded by the receiver thread when it receives the first response from that region. We need this
// to pass the region info to the receiver since the region info cannot be inferred from the response from TiKV.
storePendingRegions := make(map[string]*syncRegionFeedStateMap)
storePendingRegions := make(map[uint64]*syncRegionFeedStateMap)

header := &cdcpb.Header{
ClusterId: s.client.clusterID,
@@ -653,10 +671,8 @@
// regions map, the old map will be used in old `receiveFromStream`
// and won't be deleted until that goroutine exits.
pendingRegions := newSyncRegionFeedStateMap()
storePendingRegions[storeAddr] = pendingRegions
streamCtx, streamCancel := context.WithCancel(ctx)
_ = streamCancel // to avoid possible context leak warning from govet
stream, err = s.client.newStream(streamCtx, storeAddr, storeID)
stream, err = s.client.newStream(streamCtx, streamCancel, storeAddr, storeID)
if err != nil {
// get stream failed, maybe the store is down permanently, we should try to relocate the active store
log.Warn("get grpc stream client failed",
@@ -681,7 +697,9 @@
s.onRegionFail(ctx, errInfo)
continue
}
s.addStream(storeAddr, stream, streamCancel)

storePendingRegions[stream.id] = pendingRegions
s.addStream(stream)
log.Info("creating new stream to store to send request",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
@@ -690,15 +708,15 @@
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
zap.String("store", storeAddr))
zap.String("store", storeAddr),
zap.Uint64("streamID", stream.id))

g.Go(func() error {
defer s.deleteStream(storeAddr)
return s.receiveFromStream(ctx, storeAddr, storeID, stream.client, pendingRegions)
return s.receiveFromStream(ctx, stream, pendingRegions)
})
}

pendingRegions, ok := storePendingRegions[storeAddr]
pendingRegions, ok := storePendingRegions[stream.id]
if !ok {
// Should never happen
log.Error("pending regions is not found for store",
@@ -728,9 +746,8 @@
zap.Stringer("span", &sri.span))

err = stream.client.Send(req)

// If Send error, the receiver should have received error too or will receive error soon. So we don't need
// to do extra work here.
// If Send returns an error, stream.client.Recv (in s.receiveFromStream)
// will also return an error soon.
if err != nil {
log.Warn("send request to stream failed",
zap.String("namespace", s.changefeed.Namespace),
@@ -752,14 +769,16 @@
zap.String("store", storeAddr),
zap.Error(err))
}
// Delete the stream from the map so that the next time the store is accessed, the stream will be
// re-established.
s.deleteStream(storeAddr)
// TODO(dongmen): remove this line after testing.
time.Sleep(time.Second * 5)
// Delete the stream from the cache so that the next time the store is accessed,
// the stream can be re-established.
s.deleteStream(stream)
// Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is
// requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all
// pending regions, the new pending regions that are requested after reconnecting won't be stopped
// incorrectly.
delete(storePendingRegions, storeAddr)
delete(storePendingRegions, stream.id)

// Remove the region from pendingRegions. If it's already removed, it should be already retried by
// `receiveFromStream`, so no need to retry here.
@@ -1024,14 +1043,12 @@ func (s *eventFeedSession) getRPCContextForRegion(ctx context.Context, id tikv.R
// routine exits to establish these regions.
func (s *eventFeedSession) receiveFromStream(
parentCtx context.Context,
addr string,
storeID uint64,
stream cdcpb.ChangeData_EventFeedClient,
stream *eventFeedStream,
pendingRegions *syncRegionFeedStateMap,
) error {
var tsStat *tableStoreStat
s.client.tableStoreStats.Lock()
key := fmt.Sprintf("%d_%d", s.totalSpan.TableID, storeID)
key := fmt.Sprintf("%d_%d", s.totalSpan.TableID, stream.storeID)
if tsStat = s.client.tableStoreStats.v[key]; tsStat == nil {
tsStat = new(tableStoreStat)
s.client.tableStoreStats.v[key] = tsStat
@@ -1047,8 +1064,9 @@
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("storeID", storeID),
zap.String("store", addr))
zap.String("store", stream.addr),
zap.Uint64("storeID", stream.storeID),
zap.Uint64("streamID", stream.id))

failpoint.Inject("kvClientStreamCloseDelay", nil)

@@ -1069,13 +1087,19 @@
metricSendEventBatchResolvedSize := batchResolvedEventSize.
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID)
metricReceiveBusyRatio := workerBusyRatio.WithLabelValues(
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver")
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-receiver")
metricProcessBusyRatio := workerBusyRatio.WithLabelValues(
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-processor")
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-processor")

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outer code logic
worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions)
worker := newRegionWorker(
parentCtx,
stream.cancel,
s.changefeed,
s,
stream.addr,
pendingRegions)
defer worker.evictAllRegions()

ctx, cancel := context.WithCancel(parentCtx)
@@ -1098,8 +1122,9 @@
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("store", addr),
zap.Any("storeID", storeID),
zap.String("store", stream.addr),
zap.Uint64("storeID", stream.storeID),
zap.Uint64("streamID", stream.id),
zap.Error(err))
}
return err
@@ -1113,7 +1138,7 @@
maxCommitTs := model.Ts(0)
for {
startToReceive := time.Now()
cevent, err := stream.Recv()
cevent, err := stream.client.Recv()

if s.enableTableMonitor {
receiveTime += time.Since(startToReceive)
@@ -1151,17 +1176,18 @@
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("storeID", storeID),
zap.String("store", addr))
zap.String("store", stream.addr),
zap.Uint64("storeID", stream.storeID),
zap.Uint64("streamID", stream.id))
} else {
log.Warn("failed to receive from stream",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("storeID", storeID),
zap.String("store", addr),
zap.String("store", stream.addr),
zap.Uint64("storeID", stream.storeID),
zap.Uint64("streamID", stream.id),
zap.Error(err))
// Note that PD needs 10s+ to tag a kv node as disconnected if the kv node is down.
// tikv raft needs to wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new
@@ -1173,13 +1199,9 @@
// needs time to recover, kv client doesn't need to retry frequently.
// TODO: add a better retry backoff or rate limitter
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))

// TODO: better to closes the send direction of the stream to notify
// the other side, but it is not safe to call CloseSend concurrently
// with SendMsg, in future refactor we should refine the recv loop
s.deleteStream(addr)

// send nil regionStatefulEvent to signal worker exit
// worker.sendEvents will return an error if ctx is canceled.
// In this case, we should return the error to the caller to cancel the whole job.
err = worker.sendEvents(ctx, []*regionStatefulEvent{nil})
if err != nil {
return err
@@ -1386,37 +1408,47 @@ func (s *eventFeedSession) sendResolvedTs(
return nil
}

func (s *eventFeedSession) addStream(storeAddr string, stream *eventFeedStream, cancel context.CancelFunc) {
s.streamsLock.Lock()
defer s.streamsLock.Unlock()
s.streams[storeAddr] = stream
s.streamsCanceller[storeAddr] = cancel
func (s *eventFeedSession) addStream(stream *eventFeedStream) {
s.storeStreamsCacheLock.Lock()
defer s.storeStreamsCacheLock.Unlock()
s.storeStreamsCache[stream.addr] = stream
}

func (s *eventFeedSession) deleteStream(storeAddr string) {
s.streamsLock.Lock()
defer s.streamsLock.Unlock()
if stream, ok := s.streams[storeAddr]; ok {
s.client.grpcPool.ReleaseConn(stream.conn, storeAddr)
delete(s.streams, storeAddr)
}
if cancel, ok := s.streamsCanceller[storeAddr]; ok {
cancel()
delete(s.streamsCanceller, storeAddr)
// deleteStream deletes a stream from the session's storeStreamsCache.
// If the stream is not found, it will be ignored.
func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) {
s.storeStreamsCacheLock.Lock()
defer s.storeStreamsCacheLock.Unlock()

if streamInMap, ok := s.storeStreamsCache[streamToDelete.addr]; ok {
if streamInMap.id != streamToDelete.id {
log.Warn("delete stream failed, stream id mismatch, ignore it",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("streamID", streamToDelete.id),
zap.Uint64("streamIDInMap", streamInMap.id))
return
}
s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr)
delete(s.storeStreamsCache, streamToDelete.addr)
}
streamToDelete.cancel()
log.Info("A stream to store has been removed",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("store", streamToDelete.addr),
zap.Uint64("storeID", streamToDelete.storeID),
zap.Uint64("streamID", streamToDelete.id))
}

func (s *eventFeedSession) getStream(storeAddr string) (stream *eventFeedStream, ok bool) {
s.streamsLock.RLock()
defer s.streamsLock.RUnlock()
stream, ok = s.streams[storeAddr]
return
}

func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.CancelFunc, ok bool) {
s.streamsLock.RLock()
defer s.streamsLock.RUnlock()
cancel, ok = s.streamsCanceller[storeAddr]
s.storeStreamsCacheLock.RLock()
defer s.storeStreamsCacheLock.RUnlock()
stream, ok = s.storeStreamsCache[storeAddr]
return
}
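The diff also keys `storePendingRegions` by `stream.id` (a `map[uint64]*syncRegionFeedStateMap`) instead of the store address, so a late cleanup for an old stream cannot clear the pending regions registered by its replacement. Below is a toy sketch of that effect only, using plain maps rather than the real `syncRegionFeedStateMap`; the names are illustrative, not tiflow APIs.

```go
package main

import "fmt"

// pendingRegions is a simplified stand-in for syncRegionFeedStateMap: the set
// of region requests that have been sent on a stream but not yet acknowledged.
type pendingRegions map[uint64]string // regionID -> description

func main() {
	// Keyed by stream ID rather than by store address. When a stream is torn
	// down and a new one is created for the same store, the new stream gets
	// its own entry, so cleanup for the old stream only touches the regions
	// that belonged to it.
	storePendingRegions := make(map[uint64]pendingRegions)

	oldStreamID, newStreamID := uint64(1), uint64(2)

	storePendingRegions[oldStreamID] = pendingRegions{100: "sent on old stream"}

	// Reconnect: drop the old entry and register a fresh one under the new
	// stream's ID.
	delete(storePendingRegions, oldStreamID)
	storePendingRegions[newStreamID] = pendingRegions{100: "re-sent on new stream"}

	// A late cleanup keyed by the old stream ID is now a harmless no-op,
	// whereas a cleanup keyed by the store address would have wiped the new
	// stream's pending regions as well.
	delete(storePendingRegions, oldStreamID)

	fmt.Println(storePendingRegions[newStreamID]) // map[100:re-sent on new stream]
}
```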
