Add cache memory limit for syncer of drainer #715

Closed
wants to merge 12 commits
89 changes: 89 additions & 0 deletions drainer/binlog_item.go
@@ -15,9 +15,12 @@ package drainer

 import (
 	"fmt"
+	"sync"
 
+	"github.com/pingcap/log"
 	"github.com/pingcap/parser/model"
 	pb "github.com/pingcap/tipb/go-binlog"
+	"go.uber.org/zap"
 )
 
 type binlogItem struct {
@@ -41,6 +44,13 @@ func (b *binlogItem) String() string {
 	return fmt.Sprintf("{startTS: %d, commitTS: %d, node: %s}", b.binlog.StartTs, b.binlog.CommitTs, b.nodeID)
 }
 
+func (b *binlogItem) Size() int64 {
+	if b.binlog == nil {
+		return 0
+	}
+	return int64(len(b.binlog.DdlQuery) + len(b.binlog.PrewriteKey) + len(b.binlog.PrewriteValue) + len(b.binlog.XXX_unrecognized))
+}
+
 func newBinlogItem(b *pb.Binlog, nodeID string) *binlogItem {
 	itemp := &binlogItem{
 		binlog: b,
@@ -54,3 +64,82 @@ func newBinlogItem(b *pb.Binlog, nodeID string) *binlogItem {
 func (b *binlogItem) SetJob(job *model.Job) {
 	b.job = job
 }
+
+type binlogItemCache struct {
+	cachedChan         chan *binlogItem
+	cachedSize         int64
+	maxBinlogCacheSize int64
+	cond               *sync.Cond
+	quiting            bool
+
+	finished chan struct{}
+}
+
+func newBinlogItemCache(maxBinlogItemCount int, maxBinlogCacheSize int64) (bc *binlogItemCache) {
+	return &binlogItemCache{
+		cachedChan:         make(chan *binlogItem, maxBinlogItemCount),
+		maxBinlogCacheSize: maxBinlogCacheSize,
+		cond:               sync.NewCond(new(sync.Mutex)),
+		finished:           make(chan struct{}),
+	}
+}
+
+func (bc *binlogItemCache) Push(b *binlogItem, shutdown chan struct{}) chan struct{} {
+	go func() {
+		bc.cond.L.Lock()
+		sz := b.Size()
+		if sz >= bc.maxBinlogCacheSize {
+			for bc.cachedSize != 0 && !bc.quiting {
+				bc.cond.Wait()
+			}
+		} else {
+			for bc.cachedSize+sz > bc.maxBinlogCacheSize && !bc.quiting {
+				bc.cond.Wait()
+			}
+		}
+		bc.cond.L.Unlock()
+		select {
+		case <-shutdown:
+		case bc.cachedChan <- b:
+			bc.cond.L.Lock()
+			bc.cachedSize += sz
+			bc.cond.L.Unlock()
+			log.Debug("receive publish binlog item", zap.Stringer("item", b))
+		}
+		bc.finished <- struct{}{}
+	}()
+	return bc.finished
+}
+
+func (bc *binlogItemCache) Pop(shutdown chan struct{}) chan *binlogItem {
+	result := make(chan *binlogItem)
+	go func() {
+		for !bc.quiting {
+			select {
+			case <-shutdown:
+				return
+			case b := <-bc.cachedChan:
+				select {
+				case <-shutdown:
+					return
+				case result <- b:
+					bc.cond.L.Lock()
+					// a binlog item has been popped, so subtract its size from cachedSize
+					bc.cachedSize -= b.Size()
+					bc.cond.Signal()
+					bc.cond.L.Unlock()
+				}
+			}
+		}
+	}()
+	return result
+}
+
+func (bc *binlogItemCache) Close() {
+	bc.quiting = true
+	bc.cond.Signal()
+}
+
+func (bc *binlogItemCache) Len() int {
+	return len(bc.cachedChan)
+}
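The binlogItemCache above combines two bounds: the buffered channel caps how many items sit in the cache, while the sync.Cond-guarded cachedSize counter caps their summed payload bytes as reported by binlogItem.Size. Push waits on the condition variable until the incoming item fits under the byte budget (an item at least as large as the whole budget waits for the cache to drain entirely), and Pop signals the condition variable after subtracting the popped item's size. Below is a minimal, self-contained sketch of the same pattern; sizeBoundedQueue and every name in it are illustrative, not part of this PR, and the sketch accounts for an item's size before the channel send rather than after it, which the PR's version defers until the send succeeds.

package main

import (
	"fmt"
	"sync"
)

// sizeBoundedQueue is a reduced, hypothetical version of binlogItemCache:
// a buffered channel caps the item count, and a cond-guarded byte counter
// caps the summed payload size.
type sizeBoundedQueue struct {
	items    chan string
	mu       sync.Mutex
	cond     *sync.Cond
	cached   int64 // bytes currently buffered
	maxBytes int64
}

func newSizeBoundedQueue(maxCount int, maxBytes int64) *sizeBoundedQueue {
	q := &sizeBoundedQueue{items: make(chan string, maxCount), maxBytes: maxBytes}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Push blocks until the item fits under the byte budget, then enqueues it.
// An oversized item (>= maxBytes) waits for the queue to drain completely,
// mirroring the special case in binlogItemCache.Push.
func (q *sizeBoundedQueue) Push(s string) {
	sz := int64(len(s))
	q.mu.Lock()
	if sz >= q.maxBytes {
		for q.cached != 0 {
			q.cond.Wait()
		}
	} else {
		for q.cached+sz > q.maxBytes {
			q.cond.Wait()
		}
	}
	q.cached += sz
	q.mu.Unlock()
	q.items <- s
}

// Pop dequeues one item and wakes a blocked Push.
func (q *sizeBoundedQueue) Pop() string {
	s := <-q.items
	q.mu.Lock()
	q.cached -= int64(len(s))
	q.cond.Signal()
	q.mu.Unlock()
	return s
}

func main() {
	q := newSizeBoundedQueue(16, 10) // at most 16 items or 10 bytes buffered
	go func() {
		for _, s := range []string{"aaaa", "bbbb", "cccc"} { // 12 bytes total
			q.Push(s) // the third Push blocks until a Pop frees budget
		}
	}()
	for i := 0; i < 3; i++ {
		fmt.Println(q.Pop())
	}
}

One point worth flagging in review: Close writes quiting and calls Signal without holding the lock, and Signal wakes only a single waiter, so a cond.Broadcast performed under the lock would be the more conventional shutdown.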
8 changes: 4 additions & 4 deletions drainer/collector_test.go
@@ -153,7 +153,7 @@ var _ = Suite(&syncBinlogSuite{})

 func (s *syncBinlogSuite) TestShouldAddToSyncer(c *C) {
 	syncer := Syncer{
-		input: make(chan *binlogItem, 1),
+		input: newBinlogItemCache(1, defaultBinlogCacheSize),
 	}
 	col := Collector{syncer: &syncer}

@@ -164,8 +164,8 @@ func (s *syncBinlogSuite) TestShouldAddToSyncer(c *C) {
 	err := col.syncBinlog(&item)
 
 	c.Assert(err, IsNil)
-	c.Assert(len(syncer.input), Equals, 1)
-	get := <-syncer.input
+	c.Assert(syncer.input.Len(), Equals, 1)
+	get := <-syncer.input.Pop(make(chan struct{}))
 	c.Assert(get.binlog.CommitTs, Equals, item.binlog.CommitTs)
 }

@@ -203,7 +203,7 @@ func (s *syncBinlogSuite) TestShouldSetJob(c *C) {
 	defer func() { fDDLJobGetter = origDDLGetter }()
 
 	syncer := Syncer{
-		input: make(chan *binlogItem, 1),
+		input: newBinlogItemCache(1, defaultBinlogCacheSize),
 	}
 	col := Collector{syncer: &syncer}

36 changes: 20 additions & 16 deletions drainer/config.go
@@ -51,28 +51,31 @@ const (
 )
 
 var (
-	maxBinlogItemCount int
+	defaultBinlogCacheSize int64 = 4 << 30 // 4GB
+
 	defaultBinlogItemCount    = 16 << 12
 	supportedCompressors      = [...]string{"gzip"}
 	newZKFromConnectionString = zk.NewFromConnectionString
 )
 
 // SyncerConfig is the Syncer's configuration.
 type SyncerConfig struct {
-	StrSQLMode        *string            `toml:"sql-mode" json:"sql-mode"`
-	SQLMode           mysql.SQLMode      `toml:"-" json:"-"`
-	IgnoreTxnCommitTS []int64            `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"`
-	IgnoreSchemas     string             `toml:"ignore-schemas" json:"ignore-schemas"`
-	IgnoreTables      []filter.TableName `toml:"ignore-table" json:"ignore-table"`
-	TxnBatch          int                `toml:"txn-batch" json:"txn-batch"`
-	WorkerCount       int                `toml:"worker-count" json:"worker-count"`
-	To                *dsync.DBConfig    `toml:"to" json:"to"`
-	DoTables          []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
-	DoDBs             []string           `toml:"replicate-do-db" json:"replicate-do-db"`
-	DestDBType        string             `toml:"db-type" json:"db-type"`
-	DisableDispatch   bool               `toml:"disable-dispatch" json:"disable-dispatch"`
-	SafeMode          bool               `toml:"safe-mode" json:"safe-mode"`
-	DisableCausality  bool               `toml:"disable-detect" json:"disable-detect"`
+	StrSQLMode          *string            `toml:"sql-mode" json:"sql-mode"`
+	SQLMode             mysql.SQLMode      `toml:"-" json:"-"`
+	IgnoreTxnCommitTS   []int64            `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"`
+	IgnoreSchemas       string             `toml:"ignore-schemas" json:"ignore-schemas"`
+	IgnoreTables        []filter.TableName `toml:"ignore-table" json:"ignore-table"`
+	MaxCacheBinlogCount int                `toml:"cache-binlog-count" json:"cache-binlog-count"`
+	MaxCacheBinlogSize  int64              `toml:"cache-binlog-size" json:"cache-binlog-size"`
+	TxnBatch            int                `toml:"txn-batch" json:"txn-batch"`
+	WorkerCount         int                `toml:"worker-count" json:"worker-count"`
+	To                  *dsync.DBConfig    `toml:"to" json:"to"`
+	DoTables            []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
+	DoDBs               []string           `toml:"replicate-do-db" json:"replicate-do-db"`
+	DestDBType          string             `toml:"db-type" json:"db-type"`
+	DisableDispatch     bool               `toml:"disable-dispatch" json:"disable-dispatch"`
+	SafeMode            bool               `toml:"safe-mode" json:"safe-mode"`
+	DisableCausality    bool               `toml:"disable-detect" json:"disable-detect"`
 }
 
 // Config holds the configuration of drainer
@@ -132,7 +135,8 @@ func NewConfig() *Config {
 	fs.BoolVar(&cfg.SyncerCfg.DisableDispatch, "disable-dispatch", false, "disable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless")
 	fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
 	fs.BoolVar(&cfg.SyncerCfg.DisableCausality, "disable-detect", false, "disable detect causality")
-	fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
+	fs.IntVar(&cfg.SyncerCfg.MaxCacheBinlogCount, "cache-binlog-count", defaultBinlogItemCount, "approximate number of binlogs held in the cache; bounds the cache by item count")
+	fs.Int64Var(&cfg.SyncerCfg.MaxCacheBinlogSize, "cache-binlog-size", defaultBinlogCacheSize, "approximate memory usage, in bytes, of binlogs held in the cache; bounds the cache by memory")
 	fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
 	fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")

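Two knobs now govern the syncer's input cache: cache-binlog-count caps how many items the buffered channel may hold (default 16 << 12, i.e. 65536), and cache-binlog-size caps their summed payload bytes (default 4 << 30 bytes, i.e. 4 GiB). A minimal sketch of how the new SyncerConfig fields reach the cache, using only names from this diff; the literal values merely restate the defaults:

	// Sketch: the two limits travel from flags/TOML into SyncerConfig,
	// and NewSyncer hands them to the cache constructor.
	cfg := &SyncerConfig{
		MaxCacheBinlogCount: 16 << 12, // 65536 items
		MaxCacheBinlogSize:  4 << 30,  // 4 GiB of cached payload
	}
	input := newBinlogItemCache(cfg.MaxCacheBinlogCount, cfg.MaxCacheBinlogSize)
	_ = input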
14 changes: 8 additions & 6 deletions drainer/syncer.go
@@ -42,7 +42,7 @@ type Syncer struct {

 	cfg *SyncerConfig
 
-	input chan *binlogItem
+	input *binlogItemCache
 
 	filter *filter.Filter

@@ -60,7 +60,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
 	syncer := new(Syncer)
 	syncer.cfg = cfg
 	syncer.cp = cp
-	syncer.input = make(chan *binlogItem, maxBinlogItemCount)
+	syncer.input = newBinlogItemCache(cfg.MaxCacheBinlogCount, cfg.MaxCacheBinlogSize)
 	syncer.lastSyncTime = time.Now()
 	syncer.shutdown = make(chan struct{})
 	syncer.closed = make(chan struct{})
@@ -282,6 +282,7 @@ func (s *Syncer) run() error {

 	var lastAddComitTS int64
 	dsyncError := s.dsyncer.Error()
+	inputChan := s.input.Pop(s.shutdown)
 ForLoop:
 	for {
 		// check if we can safely push a fake binlog
@@ -303,8 +304,8 @@
 		case pushFakeBinlog <- fakeBinlog:
 			pushFakeBinlog = nil
 			continue
-		case b = <-s.input:
-			queueSizeGauge.WithLabelValues("syncer_input").Set(float64(len(s.input)))
+		case b = <-inputChan:
+			queueSizeGauge.WithLabelValues("syncer_input").Set(float64(s.input.Len()))
 			log.Debug("consume binlog item", zap.Stringer("item", b))
 		}

@@ -408,6 +409,8 @@
 		}
 	}
 
+	s.input.Close()
+
 	close(fakeBinlogCh)
 	cerr := s.dsyncer.Close()
 	if cerr != nil {
@@ -469,8 +472,7 @@ func isIgnoreTxnCommitTS(ignoreTxnCommitTS []int64, ts int64) bool {
 func (s *Syncer) Add(b *binlogItem) {
 	select {
 	case <-s.shutdown:
-	case s.input <- b:
-		log.Debug("receive publish binlog item", zap.Stringer("item", b))
+	case <-s.input.Push(b, s.shutdown):
 	}
 }

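Add's semantics change here: instead of writing directly into a channel, it blocks on the completion channel returned by Push, so a producer stalls until its binlog is admitted under the memory budget or shutdown fires. The standalone sketch below shows just that completion-channel idiom; submit and its names are illustrative, not part of the PR.

package main

import (
	"fmt"
	"time"
)

// submit runs work asynchronously and returns a channel that receives one
// value when the work has finished — the idiom Push uses to let Syncer.Add
// block until its item is admitted.
func submit(work func()) chan struct{} {
	done := make(chan struct{})
	go func() {
		work()
		done <- struct{}{}
	}()
	return done
}

func main() {
	shutdown := make(chan struct{})
	select {
	case <-shutdown: // drainer is closing; abandon the wait
	case <-submit(func() { time.Sleep(10 * time.Millisecond) }):
		fmt.Println("item admitted")
	}
}

Since Push always returns the cache's single shared finished channel, the code appears to assume at most one in-flight Push at a time; Syncer.Add being the sole caller satisfies that assumption.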