Commit

package reorg
amyangfei committed Feb 5, 2021
1 parent 7c12c2b commit 9d7ce1c
Showing 2 changed files with 144 additions and 133 deletions.
144 changes: 144 additions & 0 deletions cdc/kv/client_v2.go
@@ -15,14 +15,22 @@ package kv

import (
"context"
"io"
"math/rand"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type regionStatefulEvent struct {
@@ -141,3 +149,139 @@ func (s *eventFeedSession) sendResolvedTsV2(
}
return nil
}

// receiveFromStreamV2 continuously receives gRPC messages from a stream and
// sends them to the region worker. If `stream.Recv` returns an error, this
// routine exits silently. As for the regions managed by this routine, there
// are two situations:
// 1. established regions: a `nil` event is sent to the region worker, and the
// region worker calls `s.onRegionFail` to re-establish these regions.
// 2. pending regions: `s.onRegionFail` is called for each pending region
// before this routine exits, so that these regions can be re-established.
func (s *eventFeedSession) receiveFromStreamV2(
ctx context.Context,
g *errgroup.Group,
addr string,
storeID uint64,
stream cdcpb.ChangeData_EventFeedClient,
pendingRegions *syncRegionFeedStateMap,
limiter *rate.Limiter,
) error {
// Cancel the pending regions if the stream fails. Otherwise they will remain
// unhandled in the pendingRegions list but never be registered with the newly
// reconnected stream.
defer func() {
log.Info("stream to store closed", zap.String("addr", addr), zap.Uint64("storeID", storeID))

remainingRegions := pendingRegions.takeAll()
for _, state := range remainingRegions {
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
})
if err != nil {
// The only possibility is that the ctx has been cancelled. Simply return.
return
}
}
s.workersLock.Lock()
delete(s.workers, addr)
s.workersLock.Unlock()
}()

captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(captureAddr, changefeedID)

s.workersLock.Lock()
worker, ok := s.workers[addr]
if !ok {
worker = &regionWorker{
session: s,
limiter: limiter,
inputCh: make(chan *regionStatefulEvent, 1024),
outputCh: s.eventCh,
regionStates: make(map[uint64]*regionFeedState),
enableOldValue: s.enableOldValue,
}
s.workers[addr] = worker
}
s.workersLock.Unlock()

g.Go(func() error {
return worker.run(ctx)
})

for {
cevent, err := stream.Recv()

failpoint.Inject("kvClientStreamRecvError", func() {
err = errors.New("injected stream recv error")
})
if err == io.EOF {
close(worker.inputCh)
return nil
}
if err != nil {
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
"receive from stream canceled",
zap.String("addr", addr),
zap.Uint64("storeID", storeID),
)
} else {
log.Error(
"failed to receive from stream",
zap.String("addr", addr),
zap.Uint64("storeID", storeID),
zap.Error(err),
)
}

// Use the same delay mechanism as the `stream.Send` error handling, since
// these two errors often mean the upstream store has suffered an accident
// and needs time to recover, so the kv client shouldn't retry too frequently.
// TODO: add a better retry backoff or rate limiter
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))

// TODO: it would be better to close the send direction of the stream to
// notify the other side, but it is not safe to call CloseSend concurrently
// with SendMsg; the recv loop should be refined in a future refactor.
s.deleteStream(addr)

// send a nil regionStatefulEvent to signal the worker to exit
select {
case worker.inputCh <- nil:
case <-ctx.Done():
return ctx.Err()
}

// Do not return an error but gracefully stop the goroutine here. Then the
// whole job will not be canceled and the connection will be retried.
return nil
}

size := cevent.Size()
if size > warnRecvMsgSizeThreshold {
regionCount := 0
if cevent.ResolvedTs != nil {
regionCount = len(cevent.ResolvedTs.Regions)
}
log.Warn("change data event size too large",
zap.Int("size", size), zap.Int("event length", len(cevent.Events)),
zap.Int("resolved region count", regionCount))
}

for _, event := range cevent.Events {
err = s.sendRegionChangeEventV2(ctx, g, event, worker, pendingRegions, addr, limiter)
if err != nil {
return err
}
}
if cevent.ResolvedTs != nil {
metricSendEventBatchResolvedSize.Observe(float64(len(cevent.ResolvedTs.Regions)))
err = s.sendResolvedTsV2(ctx, g, cevent.ResolvedTs, worker, addr)
if err != nil {
return err
}
}
}
}
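
Note on the shutdown protocol above: the recv loop closes worker.inputCh on io.EOF but sends a nil event on other stream errors, so the consumer has to handle both signals. The worker's run loop is not part of this diff, so the following is a minimal, self-contained sketch of that consumer-side contract under the assumption that a closed channel and a nil sentinel are both treated as exit signals; the names are hypothetical, not TiCDC code.

// Sketch: consumer side of the inputCh shutdown protocol (hypothetical,
// simplified; the real counterpart is regionWorker.run).
package main

import (
	"context"
	"fmt"
)

type regionEvent struct{ regionID uint64 }

// run exits on three signals, mirroring receiveFromStreamV2's producer side:
// a cancelled context, a closed channel (the io.EOF path), or a nil event
// (the stream-error path).
func run(ctx context.Context, inputCh <-chan *regionEvent) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok := <-inputCh:
			if !ok {
				// producer closed the channel (io.EOF case)
				fmt.Println("input channel closed, exiting gracefully")
				return nil
			}
			if ev == nil {
				// nil sentinel: producer hit a non-EOF stream error
				fmt.Println("nil event received, exiting so regions can be re-established")
				return nil
			}
			fmt.Println("processing event for region", ev.regionID)
		}
	}
}

func main() {
	ch := make(chan *regionEvent, 4)
	ch <- &regionEvent{regionID: 42}
	ch <- nil // what the recv loop sends after a non-EOF stream error
	_ = run(context.Background(), ch)
}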
133 changes: 0 additions & 133 deletions cdc/kv/region_worker.go
100755 → 100644
@@ -15,8 +15,6 @@ package kv

import (
"context"
"io"
"math/rand"
"sync"
"time"

@@ -32,8 +30,6 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type regionWorker struct {
@@ -398,132 +394,3 @@ func (w *regionWorker) evitAllRegions(ctx context.Context) error {
}
return nil
}

func (s *eventFeedSession) receiveFromStreamV2(
ctx context.Context,
g *errgroup.Group,
addr string,
storeID uint64,
stream cdcpb.ChangeData_EventFeedClient,
pendingRegions *syncRegionFeedStateMap,
limiter *rate.Limiter,
) error {
// Cancel the pending regions if the stream fails. Otherwise they will remain
// unhandled in the pendingRegions list but never be registered with the newly
// reconnected stream.
defer func() {
log.Info("stream to store closed", zap.String("addr", addr), zap.Uint64("storeID", storeID))

remainingRegions := pendingRegions.takeAll()
for _, state := range remainingRegions {
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
})
if err != nil {
// The only possibility is that the ctx has been cancelled. Simply return.
return
}
}
s.workersLock.Lock()
delete(s.workers, addr)
s.workersLock.Unlock()
}()

captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(captureAddr, changefeedID)

s.workersLock.Lock()
worker, ok := s.workers[addr]
if !ok {
worker = &regionWorker{
session: s,
limiter: limiter,
inputCh: make(chan *regionStatefulEvent, 1024),
outputCh: s.eventCh,
regionStates: make(map[uint64]*regionFeedState),
enableOldValue: s.enableOldValue,
}
s.workers[addr] = worker
}
s.workersLock.Unlock()

g.Go(func() error {
return worker.run(ctx)
})

for {
cevent, err := stream.Recv()

failpoint.Inject("kvClientStreamRecvError", func() {
err = errors.New("injected stream recv error")
})
if err == io.EOF {
close(worker.inputCh)
return nil
}
if err != nil {
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
"receive from stream canceled",
zap.String("addr", addr),
zap.Uint64("storeID", storeID),
)
} else {
log.Error(
"failed to receive from stream",
zap.String("addr", addr),
zap.Uint64("storeID", storeID),
zap.Error(err),
)
}

// Use the same delay mechanism as the `stream.Send` error handling, since
// these two errors often mean the upstream store has suffered an accident
// and needs time to recover, so the kv client shouldn't retry too frequently.
// TODO: add a better retry backoff or rate limiter
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))

// TODO: it would be better to close the send direction of the stream to
// notify the other side, but it is not safe to call CloseSend concurrently
// with SendMsg; the recv loop should be refined in a future refactor.
s.deleteStream(addr)

// send a nil regionStatefulEvent to signal the worker to exit
select {
case worker.inputCh <- nil:
case <-ctx.Done():
return ctx.Err()
}

// Do not return an error but gracefully stop the goroutine here. Then the
// whole job will not be canceled and the connection will be retried.
return nil
}

size := cevent.Size()
if size > warnRecvMsgSizeThreshold {
regionCount := 0
if cevent.ResolvedTs != nil {
regionCount = len(cevent.ResolvedTs.Regions)
}
log.Warn("change data event size too large",
zap.Int("size", size), zap.Int("event length", len(cevent.Events)),
zap.Int("resolved region count", regionCount))
}

for _, event := range cevent.Events {
err = s.sendRegionChangeEventV2(ctx, g, event, worker, pendingRegions, addr, limiter)
if err != nil {
return err
}
}
if cevent.ResolvedTs != nil {
metricSendEventBatchResolvedSize.Observe(float64(len(cevent.ResolvedTs.Regions)))
err = s.sendResolvedTsV2(ctx, g, cevent.ResolvedTs, worker, addr)
if err != nil {
return err
}
}
}
}
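
For context on the error classification in the recv loop, where `status.Code(errors.Cause(err)) == codes.Canceled` separates expected cancellation from genuine failures before the jittered pre-reconnect delay: below is a minimal sketch, assuming plain gRPC status errors without the pingcap/errors wrapping that the real code unwraps via `errors.Cause`; illustration only, not TiCDC code.

// Sketch: classifying stream errors and applying the same jittered delay
// as the recv loop above (hypothetical function name; the real code also
// unwraps with github.com/pingcap/errors.Cause before checking the code).
package main

import (
	"fmt"
	"math/rand"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func handleStreamError(err error) {
	// status.Code returns codes.Unknown for non-gRPC errors, so this
	// branch only fires for genuine gRPC cancellation.
	if status.Code(err) == codes.Canceled {
		// Cancellation is expected during shutdown; log quietly and return.
		fmt.Println("stream canceled:", err)
		return
	}
	fmt.Println("stream failed:", err)
	// Random 0-99ms sleep, same shape as in the recv loop: it spreads out
	// reconnect attempts when many streams fail at once.
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
}

func main() {
	handleStreamError(status.Error(codes.Canceled, "context canceled"))
	handleStreamError(status.Error(codes.Unavailable, "store unreachable"))
}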
