Skip to content

Commit

Permalink
[RSDK-923] Make GPS-RTK code more robust, remove bad test (viamroboti…
Browse files Browse the repository at this point in the history
…cs#1679)

Changes include:
- Migrate go statements to `utils.PanicCapturingGo()`
- Don't start the background goroutine unless the rest of startup succeeds (to prevent leaking goroutines)
- Lock the mutex whenever you read or write any data that is also read/written from a different goroutine (this was the actual cause of the race conditions!)
- Remove the test that sometimes fails due to a bug in third-party code. The intermittent failure is unfortunate, but every problem traces back into https://github.com/de-bkg/gognss, not into code we own.

Things I kinda wanted to change but didn't: 
 - `RTKMovementSensor.GetStream()` is nearly identical to `ntripCorrectionSource.GetStream()`, and every call to the RTK version passes in the same state, which is stored in an ntrip object. Unfortunately, it's a _different_ ntrip object, defined in the same file as `ntripCorrectionSource`. Should the function be moved to that class instead, or removed entirely? I'm not sure. Something in here is redundant, but I couldn't remove the duplication without giving it a lot more thought.

I ran the tests 1,000 times: no failures. It's possible I'm being overly cautious with the mutex, and if you have specific ones you want me to remove, I'm happy to do so. It's possible I'm writing non-idiomatic Go code, and I'm happy to rewrite this whole thing if you have a different style I should match.
  • Loading branch information
penguinland authored Jan 5, 2023
1 parent 4d62c01 commit 66cdb28
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 19 deletions.
98 changes: 83 additions & 15 deletions components/movementsensor/gpsrtk/gpsrtk.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ type RTKMovementSensor struct {
cancelFunc func()

activeBackgroundWorkers sync.WaitGroup
errMu sync.Mutex
lastError error
// Lock the mutex whenever you interact with lastError, ntripClient, or ntripStatus.
mu sync.Mutex
lastError error

nmeamovementsensor gpsnmea.NmeaMovementSensor
inputProtocol string
Expand Down Expand Up @@ -243,31 +244,38 @@ func newRTKMovementSensor(
// I2C address only, assumes address is correct since this was checked when gps was initialized
g.addr = byte(attr.I2cAddr)

if err := g.Start(ctx); err != nil {
if err := g.Start(); err != nil {
return nil, err
}
g.mu.Lock()
defer g.mu.Unlock()
return g, g.lastError
}

func (g *RTKMovementSensor) setLastError(err error) {
g.errMu.Lock()
defer g.errMu.Unlock()
g.mu.Lock()
defer g.mu.Unlock()

g.lastError = err
}

// Start begins NTRIP receiver with specified protocol and begins reading/updating MovementSensor measurements.
func (g *RTKMovementSensor) Start(ctx context.Context) error {
func (g *RTKMovementSensor) Start() error {
// TODO(RDK-1639): Test out what happens if we call this line and then the ReceiveAndWrite*
// correction data goes wrong. Could anything worse than uncorrected data occur?
if err := g.nmeamovementsensor.Start(g.cancelCtx); err != nil {
return err
}

switch g.inputProtocol {
case serialStr:
go g.ReceiveAndWriteSerial()
utils.PanicCapturingGo(g.ReceiveAndWriteSerial)
case i2cStr:
go g.ReceiveAndWriteI2C(ctx)
}
if err := g.nmeamovementsensor.Start(ctx); err != nil {
return err
utils.PanicCapturingGo(func() { g.ReceiveAndWriteI2C(g.cancelCtx) })
}

g.mu.Lock()
defer g.mu.Unlock()
return g.lastError
}

Expand Down Expand Up @@ -299,9 +307,12 @@ func (g *RTKMovementSensor) Connect(casterAddr, user, pwd string, maxAttempts in
g.logger.Errorf("Can't connect to NTRIP caster: %s", err)
return err
}
g.ntripClient.Client = c

g.logger.Debug("Connected to NTRIP caster")
g.mu.Lock()
defer g.mu.Unlock()

g.ntripClient.Client = c
return g.lastError
}

Expand All @@ -322,7 +333,11 @@ func (g *RTKMovementSensor) GetStream(mountPoint string, maxAttempts int) error
default:
}

rc, err = g.ntripClient.Client.GetStream(mountPoint)
rc, err = func() (io.ReadCloser, error) {
g.mu.Lock()
defer g.mu.Unlock()
return g.ntripClient.Client.GetStream(mountPoint)
}()
if err == nil {
success = true
}
Expand All @@ -334,10 +349,11 @@ func (g *RTKMovementSensor) GetStream(mountPoint string, maxAttempts int) error
return err
}

g.ntripClient.Stream = rc

g.logger.Debug("Connected to stream")
g.mu.Lock()
defer g.mu.Unlock()

g.ntripClient.Stream = rc
return g.lastError
}

Expand Down Expand Up @@ -413,8 +429,12 @@ func (g *RTKMovementSensor) ReceiveAndWriteI2C(ctx context.Context) {

scanner := rtcm3.NewScanner(r)

g.mu.Lock()
g.ntripStatus = true
g.mu.Unlock()

// It's okay to skip the mutex on this next line: g.ntripStatus can only be mutated by this
// goroutine itself.
for g.ntripStatus {
select {
case <-g.cancelCtx.Done():
Expand All @@ -433,7 +453,10 @@ func (g *RTKMovementSensor) ReceiveAndWriteI2C(ctx context.Context) {

msg, err := scanner.NextMessage()
if err != nil {
g.mu.Lock()
g.ntripStatus = false
g.mu.Unlock()

if msg == nil {
g.logger.Debug("No message... reconnecting to stream...")
err = g.GetStream(g.ntripClient.MountPoint, g.ntripClient.MaxConnectAttempts)
Expand Down Expand Up @@ -462,7 +485,9 @@ func (g *RTKMovementSensor) ReceiveAndWriteI2C(ctx context.Context) {
}

scanner = rtcm3.NewScanner(r)
g.mu.Lock()
g.ntripStatus = true
g.mu.Unlock()
continue
}
}
Expand Down Expand Up @@ -517,8 +542,12 @@ func (g *RTKMovementSensor) ReceiveAndWriteSerial() {
r := io.TeeReader(g.ntripClient.Stream, w)
scanner := rtcm3.NewScanner(r)

g.mu.Lock()
g.ntripStatus = true
g.mu.Unlock()

// It's okay to skip the mutex on this next line: g.ntripStatus can only be mutated by this
// goroutine itself.
for g.ntripStatus {
select {
case <-g.cancelCtx.Done():
Expand All @@ -528,7 +557,10 @@ func (g *RTKMovementSensor) ReceiveAndWriteSerial() {

msg, err := scanner.NextMessage()
if err != nil {
g.mu.Lock()
g.ntripStatus = false
g.mu.Unlock()

if msg == nil {
g.logger.Debug("No message... reconnecting to stream...")
err = g.GetStream(g.ntripClient.MountPoint, g.ntripClient.MaxConnectAttempts)
Expand All @@ -539,7 +571,9 @@ func (g *RTKMovementSensor) ReceiveAndWriteSerial() {

r = io.TeeReader(g.ntripClient.Stream, w)
scanner = rtcm3.NewScanner(r)
g.mu.Lock()
g.ntripStatus = true
g.mu.Unlock()
continue
}
}
Expand All @@ -548,22 +582,32 @@ func (g *RTKMovementSensor) ReceiveAndWriteSerial() {

// NtripStatus reports whether the connection to the NTRIP stream is currently OK, along with
// the most recently recorded error. Both fields are copied while g.mu is held so the caller
// receives a consistent pair.
func (g *RTKMovementSensor) NtripStatus() (bool, error) {
	g.mu.Lock()
	connected, lastErr := g.ntripStatus, g.lastError
	g.mu.Unlock()

	return connected, lastErr
}

// Position returns the current geographic location of the movement sensor.
// A recorded lastError short-circuits the call: an empty point, 0, and that error are
// returned. Otherwise the request is forwarded to the wrapped NMEA movement sensor.
func (g *RTKMovementSensor) Position(ctx context.Context, extra map[string]interface{}) (*geo.Point, float64, error) {
	// Copy the error under the mutex; the lock is not held during the passthrough call.
	g.mu.Lock()
	lastErr := g.lastError
	g.mu.Unlock()

	if lastErr != nil {
		return &geo.Point{}, 0, lastErr
	}
	return g.nmeamovementsensor.Position(ctx, extra)
}

// LinearVelocity forwards to the wrapped NMEA movement sensor, unless an error has been
// recorded in lastError, in which case a zero vector and that error are returned.
func (g *RTKMovementSensor) LinearVelocity(ctx context.Context, extra map[string]interface{}) (r3.Vector, error) {
	// Copy the error under the mutex; the lock is not held during the passthrough call.
	g.mu.Lock()
	lastErr := g.lastError
	g.mu.Unlock()

	if lastErr != nil {
		return r3.Vector{}, lastErr
	}
	return g.nmeamovementsensor.LinearVelocity(ctx, extra)
}

Expand All @@ -577,49 +621,73 @@ func (g *RTKMovementSensor) LinearAcceleration(ctx context.Context, extra map[st

// AngularVelocity forwards to the wrapped NMEA movement sensor, unless an error has been
// recorded in lastError, in which case a zero AngularVelocity and that error are returned.
func (g *RTKMovementSensor) AngularVelocity(ctx context.Context, extra map[string]interface{}) (spatialmath.AngularVelocity, error) {
	// Copy the error under the mutex; the lock is not held during the passthrough call.
	g.mu.Lock()
	lastErr := g.lastError
	g.mu.Unlock()

	if lastErr != nil {
		return spatialmath.AngularVelocity{}, lastErr
	}
	return g.nmeamovementsensor.AngularVelocity(ctx, extra)
}

// CompassHeading forwards to the wrapped NMEA movement sensor, unless an error has been
// recorded in lastError, in which case 0 and that error are returned.
func (g *RTKMovementSensor) CompassHeading(ctx context.Context, extra map[string]interface{}) (float64, error) {
	// Copy the error under the mutex; the lock is not held during the passthrough call.
	g.mu.Lock()
	lastErr := g.lastError
	g.mu.Unlock()

	if lastErr != nil {
		return 0, lastErr
	}
	return g.nmeamovementsensor.CompassHeading(ctx, extra)
}

// Orientation forwards to the wrapped NMEA movement sensor, unless an error has been
// recorded in lastError, in which case a zero orientation and that error are returned.
func (g *RTKMovementSensor) Orientation(ctx context.Context, extra map[string]interface{}) (spatialmath.Orientation, error) {
	// Copy the error under the mutex; the lock is not held during the passthrough call.
	g.mu.Lock()
	lastErr := g.lastError
	g.mu.Unlock()

	if lastErr != nil {
		return spatialmath.NewZeroOrientation(), lastErr
	}
	return g.nmeamovementsensor.Orientation(ctx, extra)
}

// ReadFix forwards to the wrapped NMEA movement sensor, unless an error has been recorded
// in lastError, in which case 0 and that error are returned.
func (g *RTKMovementSensor) ReadFix(ctx context.Context) (int, error) {
	// Copy the error under the mutex; the lock is not held during the passthrough call.
	g.mu.Lock()
	lastErr := g.lastError
	g.mu.Unlock()

	if lastErr != nil {
		return 0, lastErr
	}
	return g.nmeamovementsensor.ReadFix(ctx)
}

// Properties forwards to the wrapped NMEA movement sensor, unless an error has been
// recorded in lastError, in which case an empty Properties value and that error are returned.
func (g *RTKMovementSensor) Properties(ctx context.Context, extra map[string]interface{}) (*movementsensor.Properties, error) {
	// Copy the error under the mutex; the lock is not held during the passthrough call.
	g.mu.Lock()
	lastErr := g.lastError
	g.mu.Unlock()

	if lastErr != nil {
		return &movementsensor.Properties{}, lastErr
	}
	return g.nmeamovementsensor.Properties(ctx, extra)
}

// Accuracy forwards to the wrapped NMEA movement sensor, unless an error has been recorded
// in lastError, in which case an empty map and that error are returned.
func (g *RTKMovementSensor) Accuracy(ctx context.Context, extra map[string]interface{}) (map[string]float32, error) {
	// Copy the error under the mutex; the lock is not held during the passthrough call.
	g.mu.Lock()
	lastErr := g.lastError
	g.mu.Unlock()

	if lastErr != nil {
		return map[string]float32{}, lastErr
	}
	return g.nmeamovementsensor.Accuracy(ctx, extra)
}

Expand Down
4 changes: 0 additions & 4 deletions components/movementsensor/gpsrtk/gpsrtk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func TestConnect(t *testing.T) {
url := "http://fakeurl"
username := "user"
password := "pwd"
mountPoint := "mp"

// create new ntrip client and connect
err := g.Connect("invalidurl", username, password, 10)
Expand All @@ -74,9 +73,6 @@ func TestConnect(t *testing.T) {

err = g.GetStream("", 10)
test.That(t, err, test.ShouldNotBeNil)

err = g.GetStream(mountPoint, 10)
test.That(t, err.Error(), test.ShouldContainSubstring, "lookup fakeurl")
}

func TestNewRTKMovementSensor(t *testing.T) {
Expand Down

0 comments on commit 66cdb28

Please sign in to comment.