Skip to content

Commit

Permalink
kv/client: add global grpc connection pool (pingcap#2511) (pingcap#2531)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and amyangfei committed Sep 30, 2021
1 parent 0824a1f commit ec2aabf
Show file tree
Hide file tree
Showing 22 changed files with 744 additions and 224 deletions.
16 changes: 12 additions & 4 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
Expand All @@ -49,7 +48,7 @@ type Capture struct {
etcdClient kv.CDCEtcdClient
pdCli pd.Client
kvStorage tidbkv.Storage
credential *security.Credential
grpcPool kv.GrpcPool

processorManager *processor.Manager

Expand Down Expand Up @@ -121,12 +120,13 @@ func NewCapture(
Version: version.ReleaseVersion,
}
processorManager := processor.NewManager()
grpcPool := kv.NewGrpcPoolImpl(stdCtx, credential)
log.Info("creating capture", zap.String("capture-id", id), util.ZapFieldCapture(stdCtx))

c = &Capture{
processors: make(map[string]*oldProcessor),
etcdClient: cli,
credential: credential,
grpcPool: grpcPool,
session: sess,
election: elec,
info: info,
Expand Down Expand Up @@ -183,6 +183,10 @@ func (c *Capture) Run(ctx context.Context) (err error) {
return errors.Trace(err)
}
} else {
defer c.grpcPool.Close()
go func() {
c.grpcPool.RecycleConn(ctx)
}()
taskWatcher := NewTaskWatcher(c, &TaskWatcherConfig{
Prefix: kv.TaskStatusKeyPrefix + "/" + c.info.ID,
ChannelSize: 128,
Expand Down Expand Up @@ -266,6 +270,10 @@ func (c *Capture) Close(ctx context.Context) error {
case <-c.closed:
case <-ctx.Done():
}
} else {
if c.grpcPool != nil {
c.grpcPool.Close()
}
}
return errors.Trace(c.etcdClient.DeleteCaptureInfo(ctx, c.info.ID))
}
Expand Down Expand Up @@ -310,7 +318,7 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*oldProcessor, er
zap.String("changefeed", task.ChangeFeedID))
conf := config.GetGlobalServerConfig()
p, err := runProcessorImpl(
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, time.Duration(conf.ProcessorFlushInterval))
ctx, c.pdCli, c.grpcPool, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, time.Duration(conf.ProcessorFlushInterval))
if err != nil {
log.Error("run processor failed",
zap.String("changefeed", task.ChangeFeedID),
Expand Down
19 changes: 16 additions & 3 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Capture struct {
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool

cancel context.CancelFunc

Expand All @@ -77,7 +78,7 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *kv.CDC
}
}

func (c *Capture) reset() error {
func (c *Capture) reset(ctx context.Context) error {
c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
Expand All @@ -97,6 +98,10 @@ func (c *Capture) reset() error {
}
c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)
if c.grpcPool != nil {
c.grpcPool.Close()
}
c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr))
return nil
}
Expand All @@ -121,7 +126,7 @@ func (c *Capture) Run(ctx context.Context) error {
}
return errors.Trace(err)
}
err = c.reset()
err = c.reset(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -145,6 +150,7 @@ func (c *Capture) run(stdCtx context.Context) error {
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
})
err := c.register(ctx)
if err != nil {
Expand All @@ -158,7 +164,7 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(2)
wg.Add(3)
var ownerErr, processorErr error
go func() {
defer wg.Done()
Expand All @@ -180,6 +186,10 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
}()
wg.Wait()
if ownerErr != nil {
return errors.Annotate(ownerErr, "owner exited with error")
Expand Down Expand Up @@ -336,6 +346,9 @@ func (c *Capture) AsyncClose() {
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
if c.grpcPool != nil {
c.grpcPool.Close()
}
}

// WriteDebugInfo writes the debug info into writer.
Expand Down
3 changes: 1 addition & 2 deletions cdc/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/util/testleak"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -121,7 +120,7 @@ func (s *captureSuite) TestCaptureSessionDoneDuringHandleTask(c *check.C) {
}()
runProcessorBackup := runProcessorImpl
runProcessorImpl = func(
ctx context.Context, _ pd.Client, _ *security.Credential,
ctx context.Context, _ pd.Client, grpcPool kv.GrpcPool,
session *concurrency.Session, info model.ChangeFeedInfo, changefeedID string,
captureInfo model.CaptureInfo, checkpointTs uint64, flushCheckpointInterval time.Duration,
) (*oldProcessor, error) {
Expand Down
Loading

0 comments on commit ec2aabf

Please sign in to comment.