diff --git a/pkg/pdutil/clock.go b/pkg/pdutil/clock.go index d9610fd3745..c114b47e97c 100644 --- a/pkg/pdutil/clock.go +++ b/pkg/pdutil/clock.go @@ -30,7 +30,7 @@ const pdTimeUpdateInterval = 200 * time.Millisecond // Clock is a time source of PD cluster. type Clock interface { - // CurrentTime returns current time from PD. + // CurrentTime returns approximate current time from pd. CurrentTime() (time.Time, error) Run(ctx context.Context) Stop() @@ -41,32 +41,40 @@ type clock struct { pdClient pd.Client mu struct { sync.RWMutex - timeCache time.Time - err error + // The time encoded in PD ts. + tsEventTime time.Time + // The time we receives PD ts. + tsProcessingTime time.Time + err error } - cancel context.CancelFunc - stopCh chan struct{} + updateInterval time.Duration + cancel context.CancelFunc + stopCh chan struct{} } // NewClock return a new clock func NewClock(ctx context.Context, pdClient pd.Client) (*clock, error) { ret := &clock{ - pdClient: pdClient, - stopCh: make(chan struct{}, 1), + pdClient: pdClient, + stopCh: make(chan struct{}, 1), + updateInterval: pdTimeUpdateInterval, } physical, _, err := pdClient.GetTS(ctx) if err != nil { return nil, errors.Trace(err) } - ret.mu.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + ret.mu.tsEventTime = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + ret.mu.tsProcessingTime = time.Now() return ret, nil } -// Run will get time from pd periodically to cache in timeCache +// Run gets time from pd periodically. func (c *clock) Run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) + c.mu.Lock() c.cancel = cancel - ticker := time.NewTicker(pdTimeUpdateInterval) + c.mu.Unlock() + ticker := time.NewTicker(c.updateInterval) defer func() { c.stopCh <- struct{}{} }() for { select { @@ -81,7 +89,8 @@ func (c *clock) Run(ctx context.Context) { return err } c.mu.Lock() - c.mu.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + c.mu.tsEventTime = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + c.mu.tsProcessingTime = time.Now() c.mu.err = nil c.mu.Unlock() return nil @@ -89,7 +98,9 @@ func (c *clock) Run(ctx context.Context) { if err != nil { log.Warn("get time from pd failed, will use local time as pd time") c.mu.Lock() - c.mu.timeCache = time.Now() + now := time.Now() + c.mu.tsEventTime = now + c.mu.tsProcessingTime = now c.mu.err = err c.mu.Unlock() } @@ -97,17 +108,19 @@ func (c *clock) Run(ctx context.Context) { } } -// CurrentTime returns current time from timeCache +// CurrentTime returns approximate current time from pd. func (c *clock) CurrentTime() (time.Time, error) { c.mu.RLock() - err := c.mu.err - cacheTime := c.mu.timeCache - c.mu.RUnlock() - return cacheTime, errors.Trace(err) + defer c.mu.RUnlock() + tsEventTime := c.mu.tsEventTime + current := tsEventTime.Add(time.Since(c.mu.tsProcessingTime)) + return current, errors.Trace(c.mu.err) } // Stop clock. func (c *clock) Stop() { + c.mu.Lock() + defer c.mu.Unlock() c.cancel() <-c.stopCh } diff --git a/pkg/pdutil/clock_test.go b/pkg/pdutil/clock_test.go index 01be7eb2fb6..e96f0477e1b 100644 --- a/pkg/pdutil/clock_test.go +++ b/pkg/pdutil/clock_test.go @@ -53,3 +53,25 @@ func TestTimeFromPD(t *testing.T) { // should return new time require.NotEqual(t, t1, t2) } + +func TestEventTimeAndProcessingTime(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockPDClient := &MockPDClient{} + clock, err := NewClock(ctx, mockPDClient) + require.NoError(t, err) + + // Disable update in test by setting a very long update interval. + clock.updateInterval = time.Hour + go clock.Run(ctx) + defer clock.Stop() + + sleep := time.Second + time.Sleep(sleep) + t1, err := clock.CurrentTime() + now := time.Now() + require.Nil(t, err) + require.Less(t, now.Sub(t1), sleep/2) +}