Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: fix in-memory tps count (#286) #294

Merged
merged 1 commit into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ func (s *Syncer) getTable(schema string, table string) (*table, []string, error)
func (s *Syncer) addCount(isFinished bool, queueBucket string, tp opType, n int64) {
m := addedJobsTotal
if isFinished {
s.count.Add(n)
m = finishedJobsTotal
}

Expand All @@ -708,8 +709,6 @@ func (s *Syncer) addCount(isFinished bool, queueBucket string, tp opType, n int6
default:
s.tctx.L().Warn("unknown job operation type", zap.Stringer("type", tp))
}

s.count.Add(n)
}

func (s *Syncer) checkWait(job *job) bool {
Expand Down
17 changes: 14 additions & 3 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,8 @@ func (s *testSyncerSuite) TestRun(c *C) {
}

executeSQLAndWait(len(expectJobs1))
c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(0))
syncer.mockFinishJob(expectJobs1)

testJobs.Lock()
checkJobs(c, testJobs.jobs, expectJobs1)
Expand Down Expand Up @@ -1534,14 +1536,14 @@ func (s *testSyncerSuite) TestRun(c *C) {
}

executeSQLAndWait(len(expectJobs2))
c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(len(expectJobs1)))
syncer.mockFinishJob(expectJobs2)
c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(len(expectJobs1)+len(expectJobs2)))

testJobs.RLock()
checkJobs(c, testJobs.jobs, expectJobs2)
testJobs.RUnlock()

status := syncer.Status().(*pb.SyncStatus)
c.Assert(status.TotalEvents, Equals, int64(len(expectJobs1)+len(expectJobs2)))

cancel()
syncer.Close()
c.Assert(syncer.isClosed(), IsTrue)
Expand Down Expand Up @@ -1595,6 +1597,15 @@ var testJobs struct {
jobs []*job
}

func (s *Syncer) mockFinishJob(jobs []*expectJob) {
for _, job := range jobs {
switch job.tp {
case ddl, insert, update, del, flush:
s.addCount(true, "test", job.tp, 1)
}
}
}

func (s *Syncer) addJobToMemory(job *job) error {
log.L().Info("add job to memory", zap.Stringer("job", job))

Expand Down