Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-923] Make GPS-RTK code more robust, remove bad test #1679

Merged
merged 11 commits into from
Jan 5, 2023
98 changes: 83 additions & 15 deletions components/movementsensor/gpsrtk/gpsrtk.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ type RTKMovementSensor struct {
cancelFunc func()

activeBackgroundWorkers sync.WaitGroup
errMu sync.Mutex
lastError error
// Lock the mutex whenever you interact with lastError, ntripClient, or ntripStatus.
mu sync.Mutex
lastError error

nmeamovementsensor gpsnmea.NmeaMovementSensor
inputProtocol string
Expand Down Expand Up @@ -242,31 +243,38 @@ func newRTKMovementSensor(
// I2C address only, assumes address is correct since this was checked when gps was initialized
g.addr = byte(attr.I2cAddr)

if err := g.Start(ctx); err != nil {
if err := g.Start(); err != nil {
return nil, err
}
g.mu.Lock()
defer g.mu.Unlock()
return g, g.lastError
}

func (g *RTKMovementSensor) setLastError(err error) {
g.errMu.Lock()
defer g.errMu.Unlock()
g.mu.Lock()
defer g.mu.Unlock()

g.lastError = err
}

// Start begins NTRIP receiver with specified protocol and begins reading/updating MovementSensor measurements.
func (g *RTKMovementSensor) Start(ctx context.Context) error {
func (g *RTKMovementSensor) Start() error {
// TODO(RDK-1639): Test out what happens if we call this line and then the ReceiveAndWrite*
// correction data goes wrong. Could anything worse than uncorrected data occur?
if err := g.nmeamovementsensor.Start(g.cancelCtx); err != nil {
return err
}

switch g.inputProtocol {
case serialStr:
go g.ReceiveAndWriteSerial()
utils.PanicCapturingGo(g.ReceiveAndWriteSerial)
case i2cStr:
go g.ReceiveAndWriteI2C(ctx)
}
if err := g.nmeamovementsensor.Start(ctx); err != nil {
return err
utils.PanicCapturingGo(func() { g.ReceiveAndWriteI2C(g.cancelCtx) })
}

g.mu.Lock()
defer g.mu.Unlock()
return g.lastError
}

Expand Down Expand Up @@ -298,9 +306,12 @@ func (g *RTKMovementSensor) Connect(casterAddr, user, pwd string, maxAttempts in
g.logger.Errorf("Can't connect to NTRIP caster: %s", err)
return err
}
g.ntripClient.Client = c

g.logger.Debug("Connected to NTRIP caster")
g.mu.Lock()
defer g.mu.Unlock()

g.ntripClient.Client = c
penguinland marked this conversation as resolved.
Show resolved Hide resolved
return g.lastError
}

Expand All @@ -321,7 +332,11 @@ func (g *RTKMovementSensor) GetStream(mountPoint string, maxAttempts int) error
default:
}

rc, err = g.ntripClient.Client.GetStream(mountPoint)
rc, err = func() (io.ReadCloser, error) {
g.mu.Lock()
defer g.mu.Unlock()
return g.ntripClient.Client.GetStream(mountPoint)
}()
if err == nil {
success = true
}
Expand All @@ -333,10 +348,11 @@ func (g *RTKMovementSensor) GetStream(mountPoint string, maxAttempts int) error
return err
}

g.ntripClient.Stream = rc

g.logger.Debug("Connected to stream")
g.mu.Lock()
defer g.mu.Unlock()

g.ntripClient.Stream = rc
return g.lastError
}

Expand Down Expand Up @@ -412,8 +428,12 @@ func (g *RTKMovementSensor) ReceiveAndWriteI2C(ctx context.Context) {

scanner := rtcm3.NewScanner(r)

g.mu.Lock()
g.ntripStatus = true
g.mu.Unlock()

// It's okay to skip the mutex on this next line: g.ntripStatus can only be mutated by this
// goroutine itself.
for g.ntripStatus {
select {
case <-g.cancelCtx.Done():
Expand All @@ -432,7 +452,10 @@ func (g *RTKMovementSensor) ReceiveAndWriteI2C(ctx context.Context) {

msg, err := scanner.NextMessage()
if err != nil {
g.mu.Lock()
g.ntripStatus = false
penguinland marked this conversation as resolved.
Show resolved Hide resolved
g.mu.Unlock()

if msg == nil {
g.logger.Debug("No message... reconnecting to stream...")
err = g.GetStream(g.ntripClient.MountPoint, g.ntripClient.MaxConnectAttempts)
Expand Down Expand Up @@ -461,7 +484,9 @@ func (g *RTKMovementSensor) ReceiveAndWriteI2C(ctx context.Context) {
}

scanner = rtcm3.NewScanner(r)
g.mu.Lock()
g.ntripStatus = true
g.mu.Unlock()
continue
}
}
Expand Down Expand Up @@ -516,8 +541,12 @@ func (g *RTKMovementSensor) ReceiveAndWriteSerial() {
r := io.TeeReader(g.ntripClient.Stream, w)
scanner := rtcm3.NewScanner(r)

g.mu.Lock()
g.ntripStatus = true
g.mu.Unlock()

// It's okay to skip the mutex on this next line: g.ntripStatus can only be mutated by this
// goroutine itself.
for g.ntripStatus {
select {
case <-g.cancelCtx.Done():
Expand All @@ -527,7 +556,10 @@ func (g *RTKMovementSensor) ReceiveAndWriteSerial() {

msg, err := scanner.NextMessage()
if err != nil {
g.mu.Lock()
g.ntripStatus = false
g.mu.Unlock()

if msg == nil {
g.logger.Debug("No message... reconnecting to stream...")
err = g.GetStream(g.ntripClient.MountPoint, g.ntripClient.MaxConnectAttempts)
Expand All @@ -538,7 +570,9 @@ func (g *RTKMovementSensor) ReceiveAndWriteSerial() {

r = io.TeeReader(g.ntripClient.Stream, w)
scanner = rtcm3.NewScanner(r)
g.mu.Lock()
g.ntripStatus = true
g.mu.Unlock()
continue
}
}
Expand All @@ -547,70 +581,104 @@ func (g *RTKMovementSensor) ReceiveAndWriteSerial() {

// NtripStatus returns true if connection to NTRIP stream is OK, false if not.
func (g *RTKMovementSensor) NtripStatus() (bool, error) {
g.mu.Lock()
defer g.mu.Unlock()
return g.ntripStatus, g.lastError
}

// Position returns the current geographic location of the MOVEMENTSENSOR.
func (g *RTKMovementSensor) Position(ctx context.Context, extra map[string]interface{}) (*geo.Point, float64, error) {
g.mu.Lock()
if g.lastError != nil {
defer g.mu.Unlock()
return &geo.Point{}, 0, g.lastError
}
g.mu.Unlock()

return g.nmeamovementsensor.Position(ctx, extra)
}

// LinearVelocity passthrough.
func (g *RTKMovementSensor) LinearVelocity(ctx context.Context, extra map[string]interface{}) (r3.Vector, error) {
g.mu.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe if we have mu defined as a RWMutex instead, we can just use a read lock here (and in other places where we're just checking the lastError property). We might otherwise have some latency issues

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gautham and I discussed: if the lock was held for a long time by multiple reading coroutines, we both agree that changing it from a Mutex to a RWMutex would be a great idea, and we also both agree that given how quickly the lock is released and how few coroutines will be using it, this isn't obviously an improvement. Neither of us feel strongly either way, so I'm going to leave it as-is until it becomes important.

if g.lastError != nil {
defer g.mu.Unlock()
return r3.Vector{}, g.lastError
}
g.mu.Unlock()

return g.nmeamovementsensor.LinearVelocity(ctx, extra)
}

// AngularVelocity passthrough.
func (g *RTKMovementSensor) AngularVelocity(ctx context.Context, extra map[string]interface{}) (spatialmath.AngularVelocity, error) {
g.mu.Lock()
if g.lastError != nil {
defer g.mu.Unlock()
return spatialmath.AngularVelocity{}, g.lastError
}
g.mu.Unlock()

return g.nmeamovementsensor.AngularVelocity(ctx, extra)
}

// CompassHeading passthrough.
func (g *RTKMovementSensor) CompassHeading(ctx context.Context, extra map[string]interface{}) (float64, error) {
g.mu.Lock()
if g.lastError != nil {
defer g.mu.Unlock()
return 0, g.lastError
}
g.mu.Unlock()

return g.nmeamovementsensor.CompassHeading(ctx, extra)
}

// Orientation passthrough.
func (g *RTKMovementSensor) Orientation(ctx context.Context, extra map[string]interface{}) (spatialmath.Orientation, error) {
g.mu.Lock()
if g.lastError != nil {
defer g.mu.Unlock()
return spatialmath.NewZeroOrientation(), g.lastError
}
g.mu.Unlock()

return g.nmeamovementsensor.Orientation(ctx, extra)
}

// ReadFix passthrough.
func (g *RTKMovementSensor) ReadFix(ctx context.Context) (int, error) {
g.mu.Lock()
if g.lastError != nil {
defer g.mu.Unlock()
return 0, g.lastError
}
g.mu.Unlock()

return g.nmeamovementsensor.ReadFix(ctx)
}

// Properties passthrough.
func (g *RTKMovementSensor) Properties(ctx context.Context, extra map[string]interface{}) (*movementsensor.Properties, error) {
g.mu.Lock()
if g.lastError != nil {
defer g.mu.Unlock()
return &movementsensor.Properties{}, g.lastError
}
g.mu.Unlock()

return g.nmeamovementsensor.Properties(ctx, extra)
}

// Accuracy passthrough.
func (g *RTKMovementSensor) Accuracy(ctx context.Context, extra map[string]interface{}) (map[string]float32, error) {
g.mu.Lock()
if g.lastError != nil {
defer g.mu.Unlock()
return map[string]float32{}, g.lastError
}
g.mu.Unlock()

return g.nmeamovementsensor.Accuracy(ctx, extra)
}

Expand Down
4 changes: 0 additions & 4 deletions components/movementsensor/gpsrtk/gpsrtk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func TestConnect(t *testing.T) {
url := "http://fakeurl"
username := "user"
password := "pwd"
mountPoint := "mp"

// create new ntrip client and connect
err := g.Connect("invalidurl", username, password, 10)
Expand All @@ -73,9 +72,6 @@ func TestConnect(t *testing.T) {

err = g.GetStream("", 10)
test.That(t, err, test.ShouldNotBeNil)

err = g.GetStream(mountPoint, 10)
test.That(t, err.Error(), test.ShouldContainSubstring, "lookup fakeurl")
}

func TestNewRTKMovementSensor(t *testing.T) {
Expand Down