Add cache memory limit for syncer of drainer #715

Closed
wants to merge 12 commits into from
70 changes: 70 additions & 0 deletions drainer/binlog_item.go
@@ -15,9 +15,12 @@ package drainer

import (
"fmt"
"sync"

"github.com/pingcap/log"
"github.com/pingcap/parser/model"
pb "github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
)

type binlogItem struct {
@@ -41,6 +44,13 @@ func (b *binlogItem) String() string {
return fmt.Sprintf("{startTS: %d, commitTS: %d, node: %s}", b.binlog.StartTs, b.binlog.CommitTs, b.nodeID)
}

// Size returns the approximate memory footprint of the binlog payload in bytes.
func (b *binlogItem) Size() int64 {
	if b.binlog == nil {
		return 0
	}
	return int64(len(b.binlog.DdlQuery) + len(b.binlog.PrewriteKey) + len(b.binlog.PrewriteValue) + len(b.binlog.XXX_unrecognized))
}

func newBinlogItem(b *pb.Binlog, nodeID string) *binlogItem {
itemp := &binlogItem{
binlog: b,
@@ -54,3 +64,63 @@ func newBinlogItem(b *pb.Binlog, nodeID string) *binlogItem {
func (b *binlogItem) SetJob(job *model.Job) {
b.job = job
}

// binlogItemCache buffers binlog items between the collector and the syncer.
// The buffered channel bounds the number of cached items, while cachedSize,
// guarded by cond, bounds the total memory used by the cached binlogs.
type binlogItemCache struct {
	cachedChan         chan *binlogItem
	cachedSize         int64
	maxBinlogCacheSize int64
	cond               *sync.Cond
	quitting           bool
}

func newBinlogItemCache(maxBinlogItemCount int, maxBinlogCacheSize int64) (bc *binlogItemCache) {
	return &binlogItemCache{
		cachedChan:         make(chan *binlogItem, maxBinlogItemCount),
		maxBinlogCacheSize: maxBinlogCacheSize,
		cond:               sync.NewCond(new(sync.Mutex)),
	}
}

// Push blocks until the item fits into the memory budget, then enqueues it.
// An item larger than the whole budget is admitted only once the cache is
// empty. The shutdown channel aborts the enqueue when the syncer is closing.
func (bc *binlogItemCache) Push(b *binlogItem, shutdown chan struct{}) {
	bc.cond.L.Lock()
	sz := b.Size()
	if sz >= bc.maxBinlogCacheSize {
		for bc.cachedSize != 0 && !bc.quitting {
			bc.cond.Wait()
		}
	} else {
		for bc.cachedSize+sz > bc.maxBinlogCacheSize && !bc.quitting {
			bc.cond.Wait()
		}
	}
	bc.cond.L.Unlock()
	select {
	case <-shutdown:
	case bc.cachedChan <- b:
		bc.cond.L.Lock()
		bc.cachedSize += sz
		bc.cond.L.Unlock()
		log.Debug("receive publish binlog item", zap.Stringer("item", b))
	}
}

// Pop exposes the underlying channel; after receiving an item from it the
// consumer must call MinusSize with the item's size.
func (bc *binlogItemCache) Pop() chan *binlogItem {
	return bc.cachedChan
}

// MinusSize releases size bytes of the memory budget after an item has been
// popped and wakes a producer blocked in Push.
func (bc *binlogItemCache) MinusSize(size int64) {
	bc.cond.L.Lock()
	bc.cachedSize -= size
	bc.cond.Signal()
	bc.cond.L.Unlock()
}

// Close wakes all producers blocked in Push so that they can observe quitting.
func (bc *binlogItemCache) Close() {
	bc.cond.L.Lock()
	bc.quitting = true
	bc.cond.Broadcast()
	bc.cond.L.Unlock()
}

// Len returns the number of currently cached items.
func (bc *binlogItemCache) Len() int {
	return len(bc.cachedChan)
}
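
For reference, the sketch below is a self-contained, simplified analogue of binlogItemCache: plain byte slices stand in for binlog items, the shutdown channel is omitted, and the two wait conditions of Push are folded into one. It only illustrates the bounded-by-bytes pattern introduced in this file and is not part of the change.

package main

import (
	"fmt"
	"sync"
)

// sizeBoundedQueue mirrors binlogItemCache: a buffered channel bounds the
// number of queued items, and a condition variable bounds their total bytes.
type sizeBoundedQueue struct {
	ch       chan []byte
	size     int64
	maxSize  int64
	cond     *sync.Cond
	quitting bool
}

func newSizeBoundedQueue(maxCount int, maxSize int64) *sizeBoundedQueue {
	return &sizeBoundedQueue{
		ch:      make(chan []byte, maxCount),
		maxSize: maxSize,
		cond:    sync.NewCond(new(sync.Mutex)),
	}
}

// Push waits until the payload fits under maxSize (an oversized payload is
// admitted only when the queue is empty), then enqueues it.
func (q *sizeBoundedQueue) Push(p []byte) {
	sz := int64(len(p))
	q.cond.L.Lock()
	for q.size != 0 && q.size+sz > q.maxSize && !q.quitting {
		q.cond.Wait()
	}
	q.size += sz
	q.cond.L.Unlock()
	q.ch <- p
}

// Pop dequeues one payload and releases its bytes so a blocked Push can proceed.
func (q *sizeBoundedQueue) Pop() []byte {
	p := <-q.ch
	q.cond.L.Lock()
	q.size -= int64(len(p))
	q.cond.Signal()
	q.cond.L.Unlock()
	return p
}

// Close wakes any producer blocked in Push.
func (q *sizeBoundedQueue) Close() {
	q.cond.L.Lock()
	q.quitting = true
	q.cond.Broadcast()
	q.cond.L.Unlock()
}

func main() {
	q := newSizeBoundedQueue(4, 8) // at most 4 items and roughly 8 bytes buffered
	go func() {
		for i := 0; i < 5; i++ {
			q.Push([]byte("abcd")) // 4 bytes each; the third Push waits for a Pop
		}
	}()
	for i := 0; i < 5; i++ {
		fmt.Printf("popped %d bytes\n", len(q.Pop()))
	}
	q.Close()
}

The key property is that Push blocks once the buffered payload would exceed the byte budget, and every Pop releases exactly the bytes it consumed, mirroring the Push/Pop/MinusSize protocol between the collector and the syncer.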
8 changes: 4 additions & 4 deletions drainer/collector_test.go
@@ -153,7 +153,7 @@ var _ = Suite(&syncBinlogSuite{})

func (s *syncBinlogSuite) TestShouldAddToSyncer(c *C) {
syncer := Syncer{
input: make(chan *binlogItem, 1),
input: newBinlogItemCache(1, defaultBinlogCacheSize),
}
col := Collector{syncer: &syncer}

@@ -164,8 +164,8 @@ func (s *syncBinlogSuite) TestShouldAddToSyncer(c *C) {
err := col.syncBinlog(&item)

c.Assert(err, IsNil)
c.Assert(len(syncer.input), Equals, 1)
get := <-syncer.input
c.Assert(syncer.input.Len(), Equals, 1)
get := <-syncer.input.Pop()
c.Assert(get.binlog.CommitTs, Equals, item.binlog.CommitTs)
}

@@ -203,7 +203,7 @@ func (s *syncBinlogSuite) TestShouldSetJob(c *C) {
defer func() { fDDLJobGetter = origDDLGetter }()

syncer := Syncer{
input: make(chan *binlogItem, 1),
input: newBinlogItemCache(1, defaultBinlogCacheSize),
}
col := Collector{syncer: &syncer}

32 changes: 18 additions & 14 deletions drainer/config.go
@@ -51,6 +51,8 @@ const (
)

var (
defaultBinlogCacheSize int64 = 4 << 30 // 4GB

maxBinlogItemCount int
defaultBinlogItemCount = 512
supportedCompressors = [...]string{"gzip"}
@@ -59,20 +61,21 @@ var (

// SyncerConfig is the Syncer's configuration.
type SyncerConfig struct {
StrSQLMode *string `toml:"sql-mode" json:"sql-mode"`
SQLMode mysql.SQLMode `toml:"-" json:"-"`
IgnoreTxnCommitTS []int64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"`
IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"`
IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
To *dsync.DBConfig `toml:"to" json:"to"`
DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"`
DestDBType string `toml:"db-type" json:"db-type"`
DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"`
SafeMode bool `toml:"safe-mode" json:"safe-mode"`
DisableCausality bool `toml:"disable-detect" json:"disable-detect"`
StrSQLMode *string `toml:"sql-mode" json:"sql-mode"`
SQLMode mysql.SQLMode `toml:"-" json:"-"`
IgnoreTxnCommitTS []int64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"`
IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"`
IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"`
MaxCacheBinlogSize int64 `toml:"cache-binlog-size" json:"cache-binlog-size"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
To *dsync.DBConfig `toml:"to" json:"to"`
DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"`
DestDBType string `toml:"db-type" json:"db-type"`
DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"`
SafeMode bool `toml:"safe-mode" json:"safe-mode"`
DisableCausality bool `toml:"disable-detect" json:"disable-detect"`
}

// Config holds the configuration of drainer
@@ -133,6 +136,7 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.BoolVar(&cfg.SyncerCfg.DisableCausality, "disable-detect", false, "disable detect causality")
fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
fs.Int64Var(&cfg.SyncerCfg.MaxCacheBinlogSize, "cache-binlog-size", defaultBinlogCacheSize, "approximate memory usage in bytes of binlogs in cache, limits cached memory usage")
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")

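With the defaults above, the syncer's input cache is bounded both by item count (cache-binlog-count, default 512) and by total payload size (cache-binlog-size, default 4 GiB); whichever limit is hit first causes Add to block. A purely illustrative check of the units follows (not part of the change; the 8 MiB average binlog size is a made-up figure):

package main

import "fmt"

func main() {
	const defaultBinlogCacheSize int64 = 4 << 30 // 4 GiB, the default for cache-binlog-size
	fmt.Println(defaultBinlogCacheSize) // 4294967296 bytes
	// With a hypothetical 8 MiB average binlog, the byte budget admits
	// 4 GiB / 8 MiB = 512 items, the same as the default item count;
	// larger binlogs hit the byte limit before the count limit.
	fmt.Println(defaultBinlogCacheSize / (8 << 20)) // 512
}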
18 changes: 9 additions & 9 deletions drainer/syncer.go
@@ -42,7 +42,7 @@ type Syncer struct {

cfg *SyncerConfig

input chan *binlogItem
input *binlogItemCache

filter *filter.Filter

@@ -60,7 +60,7 @@ func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (
syncer := new(Syncer)
syncer.cfg = cfg
syncer.cp = cp
syncer.input = make(chan *binlogItem, maxBinlogItemCount)
syncer.input = newBinlogItemCache(maxBinlogItemCount, cfg.MaxCacheBinlogSize)
syncer.lastSyncTime = time.Now()
syncer.shutdown = make(chan struct{})
syncer.closed = make(chan struct{})
@@ -282,6 +282,7 @@ func (s *Syncer) run() error {

var lastAddComitTS int64
dsyncError := s.dsyncer.Error()
inputChan := s.input.Pop()
ForLoop:
for {
// check if we can safely push a fake binlog
@@ -303,8 +304,9 @@
case pushFakeBinlog <- fakeBinlog:
pushFakeBinlog = nil
continue
case b = <-s.input:
queueSizeGauge.WithLabelValues("syncer_input").Set(float64(len(s.input)))
case b = <-inputChan:
s.input.MinusSize(b.Size())
queueSizeGauge.WithLabelValues("syncer_input").Set(float64(s.input.Len()))
log.Debug("consume binlog item", zap.Stringer("item", b))
}

@@ -408,6 +410,8 @@
}
}

s.input.Close()

close(fakeBinlogCh)
cerr := s.dsyncer.Close()
if cerr != nil {
@@ -467,11 +471,7 @@ func isIgnoreTxnCommitTS(ignoreTxnCommitTS []int64, ts int64) bool {

// Add adds a binlogItem to the syncer's input cache
func (s *Syncer) Add(b *binlogItem) {
select {
case <-s.shutdown:
case s.input <- b:
log.Debug("receive publish binlog item", zap.Stringer("item", b))
}
s.input.Push(b, s.shutdown)
}

// Close closes syncer.
140 changes: 139 additions & 1 deletion drainer/syncer_test.go
Expand Up @@ -60,7 +60,8 @@ func (s *syncerSuite) TestFilterTable(c *check.C) {

func (s *syncerSuite) TestNewSyncer(c *check.C) {
cfg := &SyncerConfig{
DestDBType: "_intercept",
DestDBType: "_intercept",
MaxCacheBinlogSize: defaultBinlogCacheSize,
}

cpFile := c.MkDir() + "/checkpoint"
@@ -173,6 +174,143 @@ func (s *syncerSuite) TestNewSyncer(c *check.C) {
c.Assert(syncer.GetLatestCommitTS(), check.Greater, lastNoneFakeTS)
}

func (s *syncerSuite) TestSyncerCachedSize(c *check.C) {
cfg := &SyncerConfig{
DestDBType: "_intercept",
MaxCacheBinlogSize: 30, // the max tested binlog size is 33
}

cpFile := c.MkDir() + "/checkpoint"
cp, err := checkpoint.NewPb(&checkpoint.Config{CheckPointFile: cpFile})
c.Assert(err, check.IsNil)

syncer, err := NewSyncer(cp, cfg, nil)
c.Assert(err, check.IsNil)

// run syncer
go func() {
err := syncer.Start()
c.Assert(err, check.IsNil, check.Commentf(errors.ErrorStack(err)))
}()

flag := false
endSig := make(chan struct{})

defer func() {
close(endSig)
c.Assert(flag, check.IsFalse)
syncer.Close()
}()
// check whether cached size will exceed maxBinlogCacheSize
go func() {
ticker := time.NewTicker(50 * time.Microsecond)
for {
select {
case <-endSig:
return
case <-ticker.C:
if syncer.input.cachedSize > 33 {
flag = true
return
}
}
}
}()

var commitTS, jobID int64
// create database test
commitTS++
jobID++
binlog := &pb.Binlog{
Tp: pb.BinlogType_Commit,
CommitTs: commitTS,
DdlQuery: []byte("create database test"),
DdlJobId: jobID,
}
job := &model.Job{
ID: jobID,
Type: model.ActionCreateSchema,
State: model.JobStateSynced,
Query: "create database test",
BinlogInfo: &model.HistoryInfo{
SchemaVersion: 1,
DBInfo: &model.DBInfo{
ID: 1,
Name: model.CIStr{O: "test", L: "test"},
},
},
}
syncer.Add(&binlogItem{
binlog: binlog,
job: job,
})

// create table test.test
commitTS++
jobID++
var testTableID int64 = 2
binlog = &pb.Binlog{
Tp: pb.BinlogType_Commit,
CommitTs: commitTS,
DdlQuery: []byte("create table test.test(id int)"),
DdlJobId: jobID,
}
job = &model.Job{
ID: jobID,
SchemaID: 1, // must be the previous create schema id of `test`
Type: model.ActionCreateTable,
State: model.JobStateSynced,
Query: "create table test.test(id int)",
BinlogInfo: &model.HistoryInfo{
SchemaVersion: 2,
TableInfo: &model.TableInfo{
ID: testTableID,
Name: model.CIStr{O: "test", L: "test"},
},
},
}
syncer.Add(&binlogItem{
binlog: binlog,
job: job,
})

finished := make(chan struct{})
go func() {
for commitTS++; commitTS <= 10; commitTS++ {
binlog := &pb.Binlog{
Tp: pb.BinlogType_Commit,
CommitTs: commitTS,
PrewriteValue: getEmptyPrewriteValue(commitTS+1, testTableID),
}
syncer.Add(&binlogItem{
binlog: binlog,
})
}
close(finished)
}()

select {
case <-finished:
case <-time.After(3 * time.Second):
c.Fatal("binlogItems haven't been added in 3s")
}

// Add fake binlog
time.Sleep(time.Second)
commitTS++
binlog = &pb.Binlog{
StartTs: commitTS,
CommitTs: commitTS,
}
syncer.Add(&binlogItem{
binlog: binlog,
})

// should get 10 binlog items
interceptSyncer := syncer.dsyncer.(*interceptSyncer)
c.Assert(interceptSyncer.items, check.HasLen, 10)
}

func (s *syncerSuite) TestIsIgnoreTxnCommitTS(c *check.C) {
c.Assert(isIgnoreTxnCommitTS(nil, 1), check.IsFalse)
c.Assert(isIgnoreTxnCommitTS([]int64{1, 3}, 1), check.IsTrue)