Skip to content

Commit

Permalink
add debug kvclient log
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Jan 26, 2024
1 parent e29f81d commit c78377f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
24 changes: 20 additions & 4 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"fmt"
"io"
"math/rand"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -669,7 +668,15 @@ func (s *eventFeedSession) requestRegionToStore(
zap.String("addr", storeAddr))

g.Go(func() error {
defer s.deleteStream(storeAddr)
defer func() {
log.Info("fizz-1:receiveFromStream exited, delete stream to store",
zap.Stringer("changefeed", s.changefeed),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("store", storeAddr))
time.Sleep(1 * time.Second)
s.deleteStream(storeAddr)
}()
return s.receiveFromStream(ctx, storeAddr, storeID, stream.client, pendingRegions)
})
}
Expand Down Expand Up @@ -725,6 +732,11 @@ func (s *eventFeedSession) requestRegionToStore(
}
// Delete the stream from the map so that the next time the store is accessed, the stream will be
// re-established.
log.Info("fizz-2:stream client send error, delete stream to store",
zap.Stringer("changefeed", s.changefeed),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("store", storeAddr))
s.deleteStream(storeAddr)
// Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is
// requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all
Expand Down Expand Up @@ -1082,8 +1094,12 @@ func (s *eventFeedSession) receiveFromStream(
// these two errors often mean upstream store suffers an accident, which
// needs time to recover, kv client doesn't need to retry frequently.
// TODO: add a better retry backoff or rate limitter
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))

time.Sleep(time.Millisecond * time.Duration(1000))
log.Info("fizz-3:stream client recv error, delete stream to store",
zap.Stringer("changefeed", s.changefeed),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("store", addr))
// TODO: better to closes the send direction of the stream to notify
// the other side, but it is not safe to call CloseSend concurrently
// with SendMsg, in future refactor we should refine the recv loop
Expand Down
5 changes: 5 additions & 0 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ func (w *regionWorker) checkShouldExit() error {
// If there is no region maintained by this region worker, exit it and
// cancel the gRPC stream.
if empty && w.pendingRegions.len() == 0 {
log.Info("fizz-4:region state empty, cancel stream to store",
zap.Stringer("changefeed", w.session.changefeed),
zap.Int64("tableID", w.session.tableID),
zap.String("tableName", w.session.id),
zap.String("store", w.storeAddr))
w.cancelStream(time.Duration(0))
return cerror.ErrRegionWorkerExit.GenWithStackByArgs()
}
Expand Down

0 comments on commit c78377f

Please sign in to comment.