Skip to content

Commit

Permalink
kv/client: add incremental scan region count limit (pingcap#1899)
Browse files Browse the repository at this point in the history
Note pingcap#1926 has picked part of pingcap#1899, this PR picks the remaining change
  • Loading branch information
amyangfei committed Jul 7, 2021
1 parent 7cb8fc3 commit 8a05bfe
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 7 deletions.
7 changes: 3 additions & 4 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/retry"
Expand Down Expand Up @@ -68,9 +69,6 @@ const (
// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
// don't need to force reload region any more.
regionScheduleReload = false

// defines the scan region limit for each table
regionScanLimitPerTable = 6
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -559,13 +557,14 @@ func newEventFeedSession(
eventCh chan<- *model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
kvClientCfg := config.GetGlobalServerConfig().KVClient
return &eventFeedSession{
client: client,
regionCache: regionCache,
kvStorage: kvStorage,
totalSpan: totalSpan,
eventCh: eventCh,
regionRouter: NewSizedRegionRouter(ctx, regionScanLimitPerTable),
regionRouter: NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit),
regionCh: make(chan singleRegionInfo, 16),
errCh: make(chan regionErrorInfo, 16),
requestRangeCh: make(chan rangeRequestTask, 16),
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
config := ctx.ChangefeedVars().Info.Config
ctxC, cancel := context.WithCancel(ctx)
ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID)
plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage,
n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, config.EnableOldValue)
n.wg.Go(func() error {
Expand Down
3 changes: 3 additions & 0 deletions cmd/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) {
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 6,
},
})

Expand Down Expand Up @@ -198,6 +199,7 @@ sort-dir = "/tmp/just_a_test"
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 6,
},
})

Expand Down Expand Up @@ -263,6 +265,7 @@ cert-allowed-cn = ["dd","ee"]
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 6,
},
})
}
11 changes: 11 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ var defaultServerConfig = &ServerConfig{
KVClient: &KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0, // 0 will use NumCPU() * 2
RegionScanLimit: 6,
},
}

Expand Down Expand Up @@ -324,6 +325,16 @@ func (c *ServerConfig) ValidateAndAdjust() error {
return cerror.ErrInvalidServerOption.GenWithStackByArgs("per-table-memory-quota should be at least 6MB")
}

if c.KVClient == nil {
c.KVClient = defaultServerConfig.KVClient
}
if c.KVClient.WorkerConcurrent <= 0 {
return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1")
}
if c.KVClient.RegionScanLimit <= 0 {
return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1")
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) {
conf.Sorter.ChunkSizeLimit = 999
b, err := conf.Marshal()
c.Assert(err, check.IsNil)
c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`)
c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":6}}`)
conf2 := new(ServerConfig)
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`))
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":6}}`))
c.Assert(err, check.IsNil)
c.Assert(conf2, check.DeepEquals, conf)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/config/kvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ package config

// KVClientConfig represents config for kv client
type KVClientConfig struct {
// how many workers will be used for a single region worker
WorkerConcurrent int `toml:"worker-concurrent" json:"worker-concurrent"`
WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
// background workerpool size, the workrpool is shared by all goroutines in cdc server
WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
// region incremental scan limit for one table in a single store
RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
}

0 comments on commit 8a05bfe

Please sign in to comment.