Skip to content

Commit

Permalink
Allow for multiple SHM reference clocks and SHM providers in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
marcfrei committed Mar 14, 2024
1 parent 6a2312d commit 928b0da
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 108 deletions.
22 changes: 15 additions & 7 deletions driver/mbg/refclk.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const (
ioctlDirShift = ioctlSizeShift + ioctlSizeBits
)

type ReferenceClock struct {
dev string
}

func ioctlRequest(d, s, t, n int) uint {
// See https://man7.org/linux/man-pages/man2/ioctl.2.html#NOTES

Expand All @@ -56,18 +60,22 @@ func nanoseconds(frac uint32) int64 {
return int64((uint64(frac) * uint64(time.Second)) / (1 << 32))
}

func MeasureClockOffset(ctx context.Context, log *zap.Logger, dev string) (time.Duration, error) {
fd, err := unix.Open(dev, unix.O_RDWR, 0)
func NewReferenceClock(dev string) *ReferenceClock {
return &ReferenceClock{dev: dev}
}

func (c *ReferenceClock) MeasureClockOffset(ctx context.Context, log *zap.Logger) (time.Duration, error) {
fd, err := unix.Open(c.dev, unix.O_RDWR, 0)
if err != nil {
log.Error("unix.Open failed", zap.String("dev", dev), zap.Error(err))
log.Error("unix.Open failed", zap.String("dev", c.dev), zap.Error(err))
return 0, err
}
defer func(log *zap.Logger, dev string) {
err = unix.Close(fd)
if err != nil {
log.Info("unix.Close failed", zap.String("dev", dev), zap.Error(err))
}
}(log, dev)
}(log, c.dev)

featureType := uint32(2 /* PCPS */)
featureNumber := uint32(6 /* HAS_HR_TIME */)
Expand All @@ -80,7 +88,7 @@ func MeasureClockOffset(ctx context.Context, log *zap.Logger, dev string) (time.
uintptr(ioctlRequest(ioctlWrite, len(featureData), 'M', 0xa4)),
uintptr(unsafe.Pointer(&featureData[0])))
if errno != 0 {
log.Error("ioctl failed (features) or HR time not supported", zap.String("dev", dev), zap.Error(errno))
log.Error("ioctl failed (features) or HR time not supported", zap.String("dev", c.dev), zap.Error(errno))
return 0, errno
}

Expand All @@ -89,7 +97,7 @@ func MeasureClockOffset(ctx context.Context, log *zap.Logger, dev string) (time.
uintptr(ioctlRequest(ioctlRead, len(cycleFrequencyData), 'M', 0x68)),
uintptr(unsafe.Pointer(&cycleFrequencyData[0])))
if errno != 0 {
log.Error("ioctl failed (cycle frequency)", zap.String("dev", dev), zap.Error(errno))
log.Error("ioctl failed (cycle frequency)", zap.String("dev", c.dev), zap.Error(errno))
return 0, errno
}

Expand All @@ -101,7 +109,7 @@ func MeasureClockOffset(ctx context.Context, log *zap.Logger, dev string) (time.
uintptr(ioctlRequest(ioctlRead, len(timeData), 'M', 0x80)),
uintptr(unsafe.Pointer(&timeData[0])))
if errno != 0 {
log.Error("ioctl failed (time)", zap.String("dev", dev), zap.Error(errno))
log.Error("ioctl failed (time)", zap.String("dev", c.dev), zap.Error(errno))
return 0, errno
}

Expand Down
66 changes: 31 additions & 35 deletions driver/shm/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,28 @@ package shm
import (
"unsafe"

"go.uber.org/zap"

"golang.org/x/sys/unix"
)

var (
shmInitialized bool
type segment struct {
initialized bool

shmTimeMode *int32
shmTimeCount *int32
shmTimeClockTimeStampSec *int64
shmTimeClockTimeStampUSec *int32
shmTimeReceiveTimeStampSec *int64
shmTimeReceiveTimeStampUSec *int32
shmTimeLeap *int32
shmTimePrecision *int32
shmTimeNSamples *int32
shmTimeValid *int32
shmTimeClockTimeStampNSec *uint32
shmTimeReceiveTimeStampNSec *uint32
)
timeMode *int32
timeCount *int32
timeClockTimeStampSec *int64
timeClockTimeStampUSec *int32
timeReceiveTimeStampSec *int64
timeReceiveTimeStampUSec *int32
timeLeap *int32
timePrecision *int32
timeNSamples *int32
timeValid *int32
timeClockTimeStampNSec *uint32
timeReceiveTimeStampNSec *uint32
}

func initSHM(log *zap.Logger, unit int) error {
if shmInitialized {
func initSegment(shm *segment, unit int) error {
if shm.initialized {
panic("SHM already initialized")
}

Expand All @@ -38,14 +36,12 @@ func initSHM(log *zap.Logger, unit int) error {
id, _, errno := unix.Syscall(unix.SYS_SHMGET, uintptr(key), uintptr(size), uintptr(flags))
if int(id) < 0 {
if int(id) != -1 {
log.Fatal("shmget returned invalid value", zap.Uintptr("id", id))
panic("shmget returned invalid value")
}
log.Error("shmget failed", zap.Error(errno))
return errno
}
addr, _, errno := unix.Syscall(unix.SYS_SHMAT, id, uintptr(0), uintptr(0))
if int(addr) == -1 {
log.Error("shmat failed", zap.Error(errno))
return errno
}

Expand All @@ -55,32 +51,32 @@ func initSHM(log *zap.Logger, unit int) error {
// will help here: https://github.com/golang/go/issues/58625
// Another possible workourund would be to use expressions of the form
// (unsafe.Add(*(*unsafe.Pointer)(unsafe.Pointer(&addr)), ...))
shmTimeMode = (*int32)(unsafe.Pointer(addr +
shm.timeMode = (*int32)(unsafe.Pointer(addr +
0 /* offsetof(struct shmTime, mode) */))
shmTimeCount = (*int32)(unsafe.Pointer(addr +
shm.timeCount = (*int32)(unsafe.Pointer(addr +
4 /* offsetof(struct shmTime, count) */))
shmTimeClockTimeStampSec = (*int64)(unsafe.Pointer(addr +
shm.timeClockTimeStampSec = (*int64)(unsafe.Pointer(addr +
8 /* offsetof(struct shmTime, clockTimeStampSec) */))
shmTimeClockTimeStampUSec = (*int32)(unsafe.Pointer(addr +
shm.timeClockTimeStampUSec = (*int32)(unsafe.Pointer(addr +
16 /* offsetof(struct shmTime, clockTimeStampUSec) */))
shmTimeReceiveTimeStampSec = (*int64)(unsafe.Pointer(addr +
shm.timeReceiveTimeStampSec = (*int64)(unsafe.Pointer(addr +
24 /* offsetof(struct shmTime, receiveTimeStampSec) */))
shmTimeReceiveTimeStampUSec = (*int32)(unsafe.Pointer(addr +
shm.timeReceiveTimeStampUSec = (*int32)(unsafe.Pointer(addr +
32 /* offsetof(struct shmTime, receiveTimeStampUSec) */))
shmTimeLeap = (*int32)(unsafe.Pointer(addr +
shm.timeLeap = (*int32)(unsafe.Pointer(addr +
36 /* offsetof(struct shmTime, leap) */))
shmTimePrecision = (*int32)(unsafe.Pointer(addr +
shm.timePrecision = (*int32)(unsafe.Pointer(addr +
40 /* offsetof(struct shmTime, precision) */))
shmTimeNSamples = (*int32)(unsafe.Pointer(addr +
shm.timeNSamples = (*int32)(unsafe.Pointer(addr +
44 /* offsetof(struct shmTime, nsamples) */))
shmTimeValid = (*int32)(unsafe.Pointer(addr +
shm.timeValid = (*int32)(unsafe.Pointer(addr +
48 /* offsetof(struct shmTime, valid) */))
shmTimeClockTimeStampNSec = (*uint32)(unsafe.Pointer(addr +
shm.timeClockTimeStampNSec = (*uint32)(unsafe.Pointer(addr +
52 /* offsetof(struct shmTime, clockTimeStampNSec) */))
shmTimeReceiveTimeStampNSec = (*uint32)(unsafe.Pointer(addr +
shm.timeReceiveTimeStampNSec = (*uint32)(unsafe.Pointer(addr +
56 /* offsetof(struct shmTime, receiveTimeStampNSec) */))

shmInitialized = true
shm.initialized = true

return nil
}
41 changes: 25 additions & 16 deletions driver/shm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,36 @@ import (
"go.uber.org/zap"
)

func StoreClockSamples(log *zap.Logger, refTime, sysTime time.Time) error {
if !shmInitialized {
err := initSHM(log, 0 /* unit */)
type Provider struct {
unit int
shm segment
}

func NewProvider(unit int) *Provider {
return &Provider{unit: unit}
}

func (p *Provider) StoreClockSamples(log *zap.Logger, refTime, sysTime time.Time) error {
if !p.shm.initialized {
err := initSegment(&p.shm, p.unit)
if err != nil {
return err
}
}

*shmTimeMode = 0
*shmTimeClockTimeStampSec = refTime.Unix()
*shmTimeClockTimeStampUSec = int32(refTime.Nanosecond() / 1e3)
*shmTimeReceiveTimeStampSec = sysTime.Unix()
*shmTimeReceiveTimeStampUSec = int32(sysTime.Nanosecond() / 1e3)
*shmTimeLeap = 0
*shmTimePrecision = 0
*shmTimeNSamples = 0
*shmTimeClockTimeStampNSec = uint32(refTime.Nanosecond())
*shmTimeReceiveTimeStampNSec = uint32(sysTime.Nanosecond())

*shmTimeCount++
*shmTimeValid = 1
*p.shm.timeMode = 0
*p.shm.timeClockTimeStampSec = refTime.Unix()
*p.shm.timeClockTimeStampUSec = int32(refTime.Nanosecond() / 1e3)
*p.shm.timeReceiveTimeStampSec = sysTime.Unix()
*p.shm.timeReceiveTimeStampUSec = int32(sysTime.Nanosecond() / 1e3)
*p.shm.timeLeap = 0
*p.shm.timePrecision = 0
*p.shm.timeNSamples = 0
*p.shm.timeClockTimeStampNSec = uint32(refTime.Nanosecond())
*p.shm.timeReceiveTimeStampNSec = uint32(sysTime.Nanosecond())

*p.shm.timeCount++
*p.shm.timeValid = 1

return nil
}
61 changes: 35 additions & 26 deletions driver/shm/refclk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,24 @@ import (

const ReferenceClockType = "ntpshm"

type ReferenceClock struct {
unit int
shm segment
}

var errNoSample = errors.New("SHM sample temporarily unavailable")

func MeasureClockOffset(ctx context.Context, log *zap.Logger, unit int) (time.Duration, error) {
func NewReferenceClock(unit int) *ReferenceClock {
return &ReferenceClock{unit: unit}
}

func (c *ReferenceClock) MeasureClockOffset(ctx context.Context, log *zap.Logger) (time.Duration, error) {
deadline, deadlineIsSet := ctx.Deadline()
const maxNumRetries = 8
numRetries := 0
for {
if !shmInitialized {
err := initSHM(log, unit)
if !c.shm.initialized {
err := initSegment(&c.shm, c.unit)
if err != nil {
if numRetries != maxNumRetries && deadlineIsSet && time.Now().Before(deadline) {
time.Sleep(0)
Expand All @@ -29,23 +38,23 @@ func MeasureClockOffset(ctx context.Context, log *zap.Logger, unit int) (time.Du
}
}

tTimeMode := *shmTimeMode
tTimeCount := *shmTimeCount
tTimeValid := *shmTimeValid
tTimeClockTimeStampSec := *shmTimeClockTimeStampSec
tTimeClockTimeStampUSec := *shmTimeClockTimeStampUSec
tTimeReceiveTimeStampSec := *shmTimeReceiveTimeStampSec
tTimeReceiveTimeStampUSec := *shmTimeReceiveTimeStampUSec
tTimeLeap := *shmTimeLeap
tTimeClockTimeStampNSec := *shmTimeClockTimeStampNSec
tTimeReceiveTimeStampNSec := *shmTimeReceiveTimeStampNSec
timeMode := *c.shm.timeMode
timeCount := *c.shm.timeCount
timeValid := *c.shm.timeValid
timeClockTimeStampSec := *c.shm.timeClockTimeStampSec
timeClockTimeStampUSec := *c.shm.timeClockTimeStampUSec
timeReceiveTimeStampSec := *c.shm.timeReceiveTimeStampSec
timeReceiveTimeStampUSec := *c.shm.timeReceiveTimeStampUSec
timeLeap := *c.shm.timeLeap
timeClockTimeStampNSec := *c.shm.timeClockTimeStampNSec
timeReceiveTimeStampNSec := *c.shm.timeReceiveTimeStampNSec

// SHM client logic based on analogous code in chrony

if (tTimeMode == 1 && tTimeCount != *shmTimeCount) ||
!(tTimeMode == 0 || tTimeMode == 1) || tTimeValid == 0 {
if (timeMode == 1 && timeCount != *c.shm.timeCount) ||
!(timeMode == 0 || timeMode == 1) || timeValid == 0 {
log.Error("SHM sample temporarily unavailable",
zap.Int32("mode", tTimeMode), zap.Int32("count", tTimeCount), zap.Int32("valid", tTimeValid))
zap.Int32("mode", timeMode), zap.Int32("count", timeCount), zap.Int32("valid", timeValid))
if numRetries != maxNumRetries && deadlineIsSet && time.Now().Before(deadline) {
time.Sleep(0)
numRetries++
Expand All @@ -54,19 +63,19 @@ func MeasureClockOffset(ctx context.Context, log *zap.Logger, unit int) (time.Du
return 0, errNoSample
}

*shmTimeValid = 0
*c.shm.timeValid = 0

receiveTimeSeconds := tTimeReceiveTimeStampSec
clockTimeSeconds := tTimeClockTimeStampSec
receiveTimeSeconds := timeReceiveTimeStampSec
clockTimeSeconds := timeClockTimeStampSec

var receiveTimeNanoseconds, clockTimeNanoseconds int64
if tTimeClockTimeStampNSec/1000 == uint32(tTimeClockTimeStampUSec) &&
tTimeReceiveTimeStampNSec/1000 == uint32(tTimeReceiveTimeStampUSec) {
receiveTimeNanoseconds = int64(tTimeReceiveTimeStampNSec)
clockTimeNanoseconds = int64(tTimeClockTimeStampNSec)
if timeClockTimeStampNSec/1000 == uint32(timeClockTimeStampUSec) &&
timeReceiveTimeStampNSec/1000 == uint32(timeReceiveTimeStampUSec) {
receiveTimeNanoseconds = int64(timeReceiveTimeStampNSec)
clockTimeNanoseconds = int64(timeClockTimeStampNSec)
} else {
receiveTimeNanoseconds = 1000 * int64(tTimeReceiveTimeStampUSec)
clockTimeNanoseconds = 1000 * int64(tTimeClockTimeStampUSec)
receiveTimeNanoseconds = 1000 * int64(timeReceiveTimeStampUSec)
clockTimeNanoseconds = 1000 * int64(timeClockTimeStampUSec)
}

receiveTime := time.Unix(receiveTimeSeconds, receiveTimeNanoseconds).UTC()
Expand All @@ -75,7 +84,7 @@ func MeasureClockOffset(ctx context.Context, log *zap.Logger, unit int) (time.Du
log.Debug("SHM clock sample",
zap.Time("receiveTime", receiveTime),
zap.Time("clockTime", clockTime),
zap.Int32("leap", tTimeLeap),
zap.Int32("leap", timeLeap),
)

offset := clockTime.Sub(receiveTime)
Expand Down
26 changes: 2 additions & 24 deletions timeservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,6 @@ type svcConfig struct {
DSCP uint8 `toml:"dscp,omitempty"` // must be in range [0, 63]
}

type mbgReferenceClock struct {
dev string
}

type shmReferenceClock struct {
unit int
}

type ntpReferenceClockIP struct {
ntpc *client.IPClient
localAddr *net.UDPAddr
Expand Down Expand Up @@ -163,16 +155,6 @@ func (c *tlsCertCache) loadCert(chi *tls.ClientHelloInfo) (*tls.Certificate, err
return c.cert, nil
}

func (c *mbgReferenceClock) MeasureClockOffset(ctx context.Context, log *zap.Logger) (
time.Duration, error) {
return mbg.MeasureClockOffset(ctx, log, c.dev)
}

func (c *shmReferenceClock) MeasureClockOffset(ctx context.Context, log *zap.Logger) (
time.Duration, error) {
return shm.MeasureClockOffset(ctx, log, c.unit)
}

func configureIPClientNTS(c *client.IPClient, ntskeServer string, ntskeInsecureSkipVerify bool) {
ntskeHost, ntskePort, err := net.SplitHostPort(ntskeServer)
if err != nil {
Expand Down Expand Up @@ -324,9 +306,7 @@ func createClocks(cfg svcConfig, localAddr *snet.UDPAddr) (
dscp := dscp(cfg)

for _, s := range cfg.MBGReferenceClocks {
refClocks = append(refClocks, &mbgReferenceClock{
dev: s,
})
refClocks = append(refClocks, mbg.NewReferenceClock(s))
}

for _, s := range cfg.SHMReferenceClocks {
Expand All @@ -344,9 +324,7 @@ func createClocks(cfg svcConfig, localAddr *snet.UDPAddr) (
zap.String("id", s), zap.Error(err))
}
}
refClocks = append(refClocks, &shmReferenceClock{
unit: u,
})
refClocks = append(refClocks, shm.NewReferenceClock(u))
}

var dstIAs []addr.IA
Expand Down

0 comments on commit 928b0da

Please sign in to comment.