diff --git a/pump/server.go b/pump/server.go index e80577f01..91540daa3 100644 --- a/pump/server.go +++ b/pump/server.go @@ -216,7 +216,7 @@ func (s *Server) writeBinlog(ctx context.Context, in *binlog.WriteBinlogReq, isF goto errHandle } - if !isFakeBinlog { + if !isFakeBinlog && blog.Tp == binlog.BinlogType_Prewrite { state := s.node.NodeStatus().State if state != node.Online { err = errors.Errorf("no online: %v", state) @@ -788,6 +788,7 @@ func (s *Server) waitSafeToOffline(ctx context.Context) error { log.Debug("Start waiting until all drainers have consumed the last fake binlog") + maxCommitTS := s.storage.MaxCommitTS() for { select { case <-time.After(time.Second): @@ -796,12 +797,12 @@ func (s *Server) waitSafeToOffline(ctx context.Context) error { log.Error("Failed to get safe GCTS", zap.Error(err)) break } - if safeTSO >= fakeBinlog.CommitTs { + if safeTSO >= maxCommitTS { return nil } log.Warn("Waiting for drainer to consume binlog", zap.Int64("Minimum Drainer MaxCommitTS", safeTSO), - zap.Int64("FakeBinlog CommiTS", fakeBinlog.CommitTs)) + zap.Int64("Need to reach maxCommitTS", maxCommitTS)) if _, err = s.writeFakeBinlog(); err != nil { log.Error("write fake binlog failed", zap.Error(err)) } @@ -883,14 +884,18 @@ func (s *Server) waitUntilCommitTSSaved(ctx context.Context, ts int64, checkInte log.Info("The max commit ts saved is less than expected commit ts", zap.Int64("max commit ts", maxCommitTS), zap.Int64("expected commit ts", ts)) - select { - case <-time.After(checkInterval): - continue - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - } + } else if !s.storage.AllMatched() { + log.Info("wait all P-binlog to be matched") + } else { + return nil + } + + select { + case <-time.After(checkInterval): + continue + case <-ctx.Done(): + return errors.Trace(ctx.Err()) } - return nil } } diff --git a/pump/server_test.go b/pump/server_test.go index d4153df56..6a2a77fd0 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -26,7 +26,6 @@ import ( 
"github.com/pingcap/tidb-binlog/pkg/etcd" "github.com/pingcap/tidb-binlog/pkg/node" "github.com/pingcap/tidb-binlog/pkg/util" - "github.com/pingcap/tidb-binlog/pump/storage" "github.com/pingcap/tidb/store/tikv/oracle" binlog "github.com/pingcap/tipb/go-binlog" pb "github.com/pingcap/tipb/go-binlog" @@ -122,6 +121,7 @@ func (s *pullBinlogsSuite) TestReturnErrIfClusterIDMismatched(c *C) { type noOpStorage struct{} +func (s *noOpStorage) AllMatched() bool { return true } func (s *noOpStorage) WriteBinlog(binlog *pb.Binlog) error { return nil } func (s *noOpStorage) GetGCTS() int64 { return 0 } func (s *noOpStorage) GC(ts int64) {} @@ -272,7 +272,7 @@ type printServerInfoSuite struct{} var _ = Suite(&printServerInfoSuite{}) type dummyStorage struct { - storage.Storage + noOpStorage gcTS int64 maxCommitTS int64 } diff --git a/pump/storage/sorter.go b/pump/storage/sorter.go index 2e4ec19e4..f830e4447 100644 --- a/pump/storage/sorter.go +++ b/pump/storage/sorter.go @@ -124,6 +124,13 @@ func (s *sorter) setResolver(resolver func(startTS int64) bool) { s.resolver = resolver } +func (s *sorter) allMatched() bool { + s.lock.Lock() + defer s.lock.Unlock() + + return len(s.waitStartTS) == 0 +} + func (s *sorter) pushTSItem(item sortItem) { if s.isClosed() { // i think we can just panic @@ -149,6 +156,18 @@ func (s *sorter) pushTSItem(item sortItem) { func (s *sorter) run() { defer s.wg.Done() + go func() { + // Signal s.cond every second so that run() does not stay blocked at s.cond.Wait(), waiting for the matching C-binlog, when there are no more pushTSItem calls + tick := time.NewTicker(1 * time.Second) + defer tick.Stop() + for range tick.C { + s.cond.Signal() + if s.isClosed() { + return + } + } + }() + var maxTSItem sortItem for { s.cond.L.Lock() @@ -176,6 +195,7 @@ func (s *sorter) run() { // we may get the C binlog soon at start up time if time.Since(getTime) > time.Second { if s.resolver != nil && s.resolver(item.start) { + delete(s.waitStartTS, item.start) break } } diff --git a/pump/storage/sorter_test.go 
b/pump/storage/sorter_test.go index 2c2f02aed..cfc05b4d5 100644 --- a/pump/storage/sorter_test.go +++ b/pump/storage/sorter_test.go @@ -41,6 +41,8 @@ func testSorter(c *check.C, items []sortItem, expectMaxCommitTS []int64) { // we should never push item with commit ts less than lastGetSortItemTS, or something go wrong if item.tp == pb.BinlogType_Commit { c.Assert(item.commit, check.Greater, atomic.LoadInt64(&lastGetSortItemTS)) + } else if item.tp == pb.BinlogType_Prewrite { + c.Assert(sorter.allMatched(), check.IsFalse) } if item.commit > maxTS { @@ -61,6 +63,7 @@ func testSorter(c *check.C, items []sortItem, expectMaxCommitTS []int64) { c.Assert(maxCommitTS[i], check.Greater, maxCommitTS[i-1]) } c.Assert(maxTS, check.Equals, maxCommitTS[len(maxCommitTS)-1]) + c.Assert(sorter.allMatched(), check.IsTrue) } func (s *SorterSuite) TestSorter(c *check.C) { diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 8ad005158..9360b89ed 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -73,6 +73,9 @@ type Storage interface { GetGCTS() int64 + // AllMatched returns whether every P-binlog has a matching C-binlog + AllMatched() bool + MaxCommitTS() int64 // GetBinlog return the binlog of ts @@ -1359,3 +1362,8 @@ func (a *Append) writeBatchToKV(bufReqs []*request) error { continue } } + +// AllMatched implements Storage.AllMatched +func (a *Append) AllMatched() bool { + return a.sorter.allMatched() +}