Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/client: add global grpc connection pool #2511

Merged
merged 11 commits into from
Aug 15, 2021
Merged
16 changes: 12 additions & 4 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
Expand All @@ -49,7 +48,7 @@ type Capture struct {
etcdClient kv.CDCEtcdClient
pdCli pd.Client
kvStorage tidbkv.Storage
credential *security.Credential
grpcPool kv.GrpcPool

processorManager *processor.Manager

Expand Down Expand Up @@ -121,12 +120,13 @@ func NewCapture(
Version: version.ReleaseVersion,
}
processorManager := processor.NewManager()
grpcPool := kv.NewGrpcPoolImpl(stdCtx, credential)
log.Info("creating capture", zap.String("capture-id", id), util.ZapFieldCapture(stdCtx))

c = &Capture{
processors: make(map[string]*oldProcessor),
etcdClient: cli,
credential: credential,
grpcPool: grpcPool,
session: sess,
election: elec,
info: info,
Expand Down Expand Up @@ -183,6 +183,10 @@ func (c *Capture) Run(ctx context.Context) (err error) {
return errors.Trace(err)
}
} else {
defer c.grpcPool.Close()
go func() {
c.grpcPool.RecycleConn(ctx)
}()
taskWatcher := NewTaskWatcher(c, &TaskWatcherConfig{
Prefix: kv.TaskStatusKeyPrefix + "/" + c.info.ID,
ChannelSize: 128,
Expand Down Expand Up @@ -266,6 +270,10 @@ func (c *Capture) Close(ctx context.Context) error {
case <-c.closed:
case <-ctx.Done():
}
} else {
if c.grpcPool != nil {
c.grpcPool.Close()
}
}
return errors.Trace(c.etcdClient.DeleteCaptureInfo(ctx, c.info.ID))
}
Expand Down Expand Up @@ -310,7 +318,7 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*oldProcessor, er
zap.String("changefeed", task.ChangeFeedID))
conf := config.GetGlobalServerConfig()
p, err := runProcessorImpl(
ctx, c.pdCli, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, time.Duration(conf.ProcessorFlushInterval))
ctx, c.pdCli, c.grpcPool, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, time.Duration(conf.ProcessorFlushInterval))
if err != nil {
log.Error("run processor failed",
zap.String("changefeed", task.ChangeFeedID),
Expand Down
19 changes: 16 additions & 3 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Capture struct {
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool

cancel context.CancelFunc

Expand All @@ -77,7 +78,7 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *kv.CDC
}
}

func (c *Capture) reset() error {
func (c *Capture) reset(ctx context.Context) error {
c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
Expand All @@ -97,6 +98,10 @@ func (c *Capture) reset() error {
}
c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)
if c.grpcPool != nil {
c.grpcPool.Close()
}
c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr))
return nil
}
Expand All @@ -121,7 +126,7 @@ func (c *Capture) Run(ctx context.Context) error {
}
return errors.Trace(err)
}
err = c.reset()
err = c.reset(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -145,6 +150,7 @@ func (c *Capture) run(stdCtx context.Context) error {
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
})
err := c.register(ctx)
if err != nil {
Expand All @@ -158,7 +164,7 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(2)
wg.Add(3)
var ownerErr, processorErr error
go func() {
defer wg.Done()
Expand All @@ -180,6 +186,10 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
}()
wg.Wait()
if ownerErr != nil {
return errors.Annotate(ownerErr, "owner exited with error")
Expand Down Expand Up @@ -336,6 +346,9 @@ func (c *Capture) AsyncClose() {
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
if c.grpcPool != nil {
c.grpcPool.Close()
}
}

// WriteDebugInfo writes the debug info into writer.
Expand Down
3 changes: 1 addition & 2 deletions cdc/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/util/testleak"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -121,7 +120,7 @@ func (s *captureSuite) TestCaptureSessionDoneDuringHandleTask(c *check.C) {
}()
runProcessorBackup := runProcessorImpl
runProcessorImpl = func(
ctx context.Context, _ pd.Client, _ *security.Credential,
ctx context.Context, _ pd.Client, grpcPool kv.GrpcPool,
session *concurrency.Session, info model.ChangeFeedInfo, changefeedID string,
captureInfo model.CaptureInfo, checkpointTs uint64, flushCheckpointInterval time.Duration,
) (*oldProcessor, error) {
Expand Down
Loading