Skip to content

Commit

Permalink
fix: update the way NTP sync uses adjtimex syscall
Browse files Browse the repository at this point in the history
Fixes #3582

Time adjustment code was rewritten after reviewing other time sync
implementations. It looks like `adjtimex` was used incorrectly before, which
led to huge time oscillations and `STA_UNSYNC` being set by the
kernel. Instead of setting the time via `settimeofday`, `adjtimex` is now
also used to set the time on a big jump.

With this change, the time adjustments oscillate around zero and stay in the
microsecond range (polling interval lowered for testing):

```
172.20.0.2: 2021/05/06 18:51:28  time.SyncController: adjusting time (slew) by -11.375µs via 192.36.143.130, state TIME_OK, status STA_PLL | STA_NANO
172.20.0.2: 2021/05/06 18:51:37  time.SyncController: adjusting time (slew) by 426.276µs via 192.36.143.130, state TIME_OK, status STA_PLL | STA_NANO
172.20.0.2: 2021/05/06 18:51:50  time.SyncController: adjusting time (slew) by -622.037µs via 192.36.143.130, state TIME_OK, status STA_PLL | STA_NANO
172.20.0.2: 2021/05/06 18:51:58  time.SyncController: adjusting time (slew) by 59.822µs via 192.36.143.130, state TIME_OK, status STA_PLL | STA_NANO
172.20.0.2: 2021/05/06 18:52:11  time.SyncController: adjusting time (slew) by 126.855µs via 192.36.143.130, state TIME_OK, status STA_NANO | STA_PLL
172.20.0.2: 2021/05/06 18:52:20  time.SyncController: adjusting time (slew) by 17.334µs via 192.36.143.130, state TIME_OK, status STA_NANO | STA_PLL
172.20.0.2: 2021/05/06 18:52:28  time.SyncController: adjusting time (slew) by -108.787µs via 192.36.143.130, state TIME_OK, status STA_NANO | STA_PLL
172.20.0.2: 2021/05/06 18:52:34  time.SyncController: adjusting time (slew) by -71.687µs via 192.36.143.130, state TIME_OK, status STA_PLL | STA_NANO
172.20.0.2: 2021/05/06 18:52:40  time.SyncController: adjusting time (slew) by 114.759µs via 192.36.143.130, state TIME_OK, status STA_PLL | STA_NANO
172.20.0.2: 2021/05/06 18:52:47  time.SyncController: adjusting time (slew) by 46.716µs via 192.36.143.130, state TIME_OK, status STA_PLL | STA_NANO
```

Also, one should pick a time server close to the node to get lower RTT
and dispersion.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed May 7, 2021
1 parent 1a85c14 commit 4d50a4e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 67 deletions.
124 changes: 73 additions & 51 deletions internal/pkg/ntp/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"fmt"
"log"
"math/bits"
"math/rand"
"net"
"reflect"
Expand Down Expand Up @@ -42,7 +43,6 @@ type Syncer struct {
// these functions are overridden in tests for mocking support
CurrentTime CurrentTimeFunc
NTPQuery QueryFunc
SetTime SetTimeFunc
AdjustTime AdjustTimeFunc
}

Expand All @@ -62,7 +62,6 @@ func NewSyncer(logger *log.Logger, timeServers []string) *Syncer {

CurrentTime: time.Now,
NTPQuery: ntp.Query,
SetTime: syscall.Settimeofday,
AdjustTime: timex.Adjtimex,
}

Expand Down Expand Up @@ -144,8 +143,18 @@ func (syncer *Syncer) Run(ctx context.Context) {
return
}

// Set some variance with how frequently we poll ntp servers.
// This is based on rand(MaxPoll) + MinPoll so we wait at least
// MinPoll.
nextPollInterval := time.Duration(rand.Intn(int(syncer.MaxPoll.Seconds())))*time.Second + syncer.MinPoll

if resp == nil {
// if no response was ever received, consider doing short sleep to retry sooner as it's not Kiss-o-Death response
nextPollInterval = syncer.MinPoll / 2
}

if resp != nil && resp.Validate() == nil {
err = syncer.adjustTime(resp.ClockOffset, lastSyncServer)
err = syncer.adjustTime(resp.ClockOffset, resp.Leap, lastSyncServer, nextPollInterval)

if err == nil {
if !syncer.timeSyncNotified {
Expand All @@ -159,22 +168,12 @@ func (syncer *Syncer) Run(ctx context.Context) {
}
}

// Set some variance with how frequently we poll ntp servers.
// This is based on rand(MaxPoll) + MinPoll so we wait at least
// MinPoll.
sleepInterval := time.Duration(rand.Intn(int(syncer.MaxPoll.Seconds())))*time.Second + syncer.MinPoll

if resp == nil {
// if no response was ever received, consider doing short sleep to retry sooner as it's not Kiss-o-Death response
sleepInterval = syncer.MinPoll / 2
}

select {
case <-ctx.Done():
return
case <-syncer.restartSyncCh:
// time servers got changed, restart the loop immediately
case <-time.After(sleepInterval):
case <-time.After(nextPollInterval):
}
}
}
Expand Down Expand Up @@ -268,52 +267,55 @@ func (syncer *Syncer) queryServer(server string) (*ntp.Response, error) {
return resp, err
}

func (syncer *Syncer) setTime(adjustedTime time.Time, server string) error {
syncer.logger.Printf("setting time to %s via %s", adjustedTime, server)
// adjustTime adds an offset to the current time.
//
//nolint:gocyclo
func (syncer *Syncer) adjustTime(offset time.Duration, leapSecond ntp.LeapIndicator, server string, nextPollInterval time.Duration) error {
var (
buf bytes.Buffer
req syscall.Timex
jump bool
)

timeval := syscall.NsecToTimeval(adjustedTime.UnixNano())
if offset < -AdjustTimeLimit || offset > AdjustTimeLimit {
jump = true

if err := syncer.SetTime(&timeval); err != nil {
return err
}
fmt.Fprintf(&buf, "adjusting time (jump) by %s via %s", offset, server)

if RTCClock != nil {
if err := RTCClock.Set(adjustedTime); err != nil {
syncer.logger.Printf("error syncing RTC: %s", err)
} else {
syncer.logger.Printf("synchronized RTC with system clock")
req = syscall.Timex{
Modes: timex.ADJ_SETOFFSET | timex.ADJ_NANO | timex.ADJ_STATUS,
Time: syscall.Timeval{
Sec: int64(offset / time.Second),
Usec: int64(offset / time.Nanosecond % time.Second),
},
}
}

return nil
}

// adjustTime adds an offset to the current time.
func (syncer *Syncer) adjustTime(offset time.Duration, server string) error {
if offset < -AdjustTimeLimit || offset > AdjustTimeLimit {
if err := syncer.setTime(syncer.CurrentTime().Add(offset), server); err != nil {
return err
// kernel wants tv_usec to be positive
if req.Time.Usec < 0 {
req.Time.Sec--
req.Time.Usec += int64(time.Second / time.Nanosecond)
}

if offset < -EpochLimit || offset > EpochLimit {
// notify about epoch change
select {
case syncer.epochChangeCh <- struct{}{}:
default:
}
} else {
fmt.Fprintf(&buf, "adjusting time (slew) by %s via %s", offset, server)

pollSeconds := uint64(nextPollInterval / time.Second)
log2iPollSeconds := 64 - bits.LeadingZeros64(pollSeconds)

req = syscall.Timex{
Modes: timex.ADJ_OFFSET | timex.ADJ_NANO | timex.ADJ_STATUS | timex.ADJ_TIMECONST | timex.ADJ_MAXERROR | timex.ADJ_ESTERROR,
Offset: int64(offset / time.Nanosecond),
Status: timex.STA_PLL,
Maxerror: 0,
Esterror: 0,
Constant: int64(log2iPollSeconds) - 4,
}

return nil
}

var buf bytes.Buffer

fmt.Fprintf(&buf, "adjusting time by %s via %s", offset, server)

req := syscall.Timex{
Modes: timex.ADJ_OFFSET | timex.ADJ_NANO | timex.ADJ_STATUS,
Offset: int64(offset / time.Nanosecond),
Status: timex.STA_PLL,
switch leapSecond { //nolint:exhaustive
case ntp.LeapAddSecond:
req.Status |= timex.STA_INS
case ntp.LeapDelSecond:
req.Status |= timex.STA_DEL
}

state, err := syncer.AdjustTime(&req)
Expand All @@ -326,5 +328,25 @@ func (syncer *Syncer) adjustTime(offset time.Duration, server string) error {

syncer.logger.Println(buf.String())

if err == nil {
if offset < -EpochLimit || offset > EpochLimit {
// notify about epoch change
select {
case syncer.epochChangeCh <- struct{}{}:
default:
}
}

if jump {
if RTCClock != nil {
if rtcErr := RTCClock.Set(time.Now().Add(offset)); rtcErr != nil {
syncer.logger.Printf("error syncing RTC: %s", rtcErr)
} else {
syncer.logger.Printf("synchronized RTC with system clock")
}
}
}
}

return err
}
23 changes: 7 additions & 16 deletions internal/pkg/ntp/ntp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ func (suite *NTPSuite) SetupTest() {
suite.failingServer = 0
}

func (suite *NTPSuite) setSystemClock(timeval *syscall.Timeval) error {
suite.clockLock.Lock()
defer suite.clockLock.Unlock()

suite.systemClock = time.Unix(timeval.Sec, timeval.Usec)

return nil
}

func (suite *NTPSuite) getSystemClock() time.Time {
suite.clockLock.Lock()
defer suite.clockLock.Unlock()
Expand All @@ -67,7 +58,13 @@ func (suite *NTPSuite) adjustSystemClock(val *syscall.Timex) (status timex.State
suite.clockLock.Lock()
defer suite.clockLock.Unlock()

suite.clockAdjustments = append(suite.clockAdjustments, time.Duration(val.Offset)*time.Nanosecond)
if val.Modes&timex.ADJ_OFFSET == timex.ADJ_OFFSET {
suite.T().Logf("adjustment by %s", time.Duration(val.Offset)*time.Nanosecond)
suite.clockAdjustments = append(suite.clockAdjustments, time.Duration(val.Offset)*time.Nanosecond)
} else {
suite.T().Logf("set clock by %s", time.Duration(val.Time.Sec)*time.Second+time.Duration(val.Time.Usec)*time.Nanosecond)
suite.systemClock = suite.systemClock.Add(time.Duration(val.Time.Sec)*time.Second + time.Duration(val.Time.Usec)*time.Nanosecond)
}

return
}
Expand Down Expand Up @@ -140,7 +137,6 @@ func (suite *NTPSuite) fakeQuery(host string) (resp *beevikntp.Response, err err
func (suite *NTPSuite) TestSync() {
syncer := ntp.NewSyncer(log.New(log.Writer(), "ntp ", log.LstdFlags), []string{constants.DefaultNTPServer})

syncer.SetTime = suite.setSystemClock
syncer.AdjustTime = suite.adjustSystemClock
syncer.CurrentTime = suite.getSystemClock

Expand Down Expand Up @@ -171,7 +167,6 @@ func (suite *NTPSuite) TestSync() {
func (suite *NTPSuite) TestSyncContinuous() {
syncer := ntp.NewSyncer(log.New(log.Writer(), "ntp ", log.LstdFlags), []string{"127.0.0.3"})

syncer.SetTime = suite.setSystemClock
syncer.AdjustTime = suite.adjustSystemClock
syncer.CurrentTime = suite.getSystemClock
syncer.NTPQuery = suite.fakeQuery
Expand Down Expand Up @@ -218,7 +213,6 @@ func (suite *NTPSuite) TestSyncContinuous() {
func (suite *NTPSuite) TestSyncChangeTimeservers() {
syncer := ntp.NewSyncer(log.New(log.Writer(), "ntp ", log.LstdFlags), []string{"127.0.0.1"})

syncer.SetTime = suite.setSystemClock
syncer.AdjustTime = suite.adjustSystemClock
syncer.CurrentTime = suite.getSystemClock

Expand Down Expand Up @@ -257,7 +251,6 @@ func (suite *NTPSuite) TestSyncChangeTimeservers() {
func (suite *NTPSuite) TestSyncIterateTimeservers() {
syncer := ntp.NewSyncer(log.New(log.Writer(), "ntp ", log.LstdFlags), []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"})

syncer.SetTime = suite.setSystemClock
syncer.AdjustTime = suite.adjustSystemClock
syncer.CurrentTime = suite.getSystemClock
syncer.NTPQuery = suite.fakeQuery
Expand Down Expand Up @@ -309,7 +302,6 @@ func (suite *NTPSuite) TestSyncIterateTimeservers() {
func (suite *NTPSuite) TestSyncEpochChange() {
syncer := ntp.NewSyncer(log.New(log.Writer(), "ntp ", log.LstdFlags), []string{"127.0.0.5"})

syncer.SetTime = suite.setSystemClock
syncer.AdjustTime = suite.adjustSystemClock
syncer.CurrentTime = suite.getSystemClock
syncer.NTPQuery = suite.fakeQuery
Expand Down Expand Up @@ -349,7 +341,6 @@ func (suite *NTPSuite) TestSyncEpochChange() {
func (suite *NTPSuite) TestSyncSwitchTimeservers() {
syncer := ntp.NewSyncer(log.New(log.Writer(), "ntp ", log.LstdFlags), []string{"127.0.0.6", "127.0.0.4"})

syncer.SetTime = suite.setSystemClock
syncer.AdjustTime = suite.adjustSystemClock
syncer.CurrentTime = suite.getSystemClock
syncer.NTPQuery = suite.fakeQuery
Expand Down

0 comments on commit 4d50a4e

Please sign in to comment.