Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Implement Sensor Classes #60

Merged
merged 23 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3bcfdea
Sensor: Create Sensor parent class and GPS and WindSensor child classes
chrischang5 Nov 4, 2023
6bbdb72
Sensor: Refactor code
chrischang5 Nov 4, 2023
83b105a
Sensor: Implement WindSensor and GPS classes
chrischang5 Nov 18, 2023
ed70153
Sensor: Add tests and code cleanup
chrischang5 Nov 18, 2023
9e53e3e
Bump UBCSailbot/sailbot_workspace from 1.4.2 to 1.4.7 (#55)
dependabot[bot] Nov 9, 2023
c9b9971
Make ubcsailbotsoftware a code owner (#57)
patrick-5546 Nov 11, 2023
6f65c06
Bump UBCSailbot/sailbot_workspace from 1.4.7 to 1.4.8 (#56)
dependabot[bot] Nov 11, 2023
091154a
Make ubcsailbotsoftware code owner only for test.yml (#58)
patrick-5546 Nov 12, 2023
5d999f0
Bump UBCSailbot/sailbot_workspace from 1.4.8 to 1.5.1 (#59)
dependabot[bot] Nov 13, 2023
a57a9c2
Sensor: Ignore redef mypy error
chrischang5 Nov 19, 2023
ba9cd83
Sensor: Create custom type to have cleaner type hinting
chrischang5 Nov 19, 2023
31f458c
Sensors: Complete tests and resolve typing errors
chrischang5 Nov 25, 2023
3dae5ba
Merge branch 'main' into user/chrischang5/53-implement-sensor-class
chrischang5 Nov 25, 2023
65c42e2
Try to make CI happy
chrischang5 Nov 25, 2023
96c9405
Try to make CI happy
chrischang5 Nov 25, 2023
6a63fac
Fix bugs :(
chrischang5 Nov 25, 2023
f8f6a61
Sensor: make syntax consistent
chrischang5 Nov 25, 2023
b11f176
Fix bugs
chrischang5 Nov 26, 2023
893d4b0
Sensors: add docstrings, add support to implicitly or explicitly set …
chrischang5 Dec 7, 2023
a8688fb
Sensors: finish docstrings
chrischang5 Dec 7, 2023
7c92df1
Merge branch 'main' into user/chrischang5/53-implement-sensor-class
chrischang5 Dec 7, 2023
2f6cc27
fixed an oops
chrischang5 Dec 7, 2023
2609bcc
Sensor: Add todo for dimensionality checking
chrischang5 Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions boat_simulator/common/sensors.py
chrischang5 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from dataclasses import dataclass

from typing import Optional, Any
from numpy.typing import NDArray


from boat_simulator.common.types import Scalar, ScalarOrArray

from boat_simulator.common.generators import (
ConstantGenerator,
MVGaussianGenerator,
GaussianGenerator,
)

WindSensorGenerators = Optional[MVGaussianGenerator | ConstantGenerator]
chrischang5 marked this conversation as resolved.
Show resolved Hide resolved
GPSGenerators = Optional[GaussianGenerator | ConstantGenerator]


@dataclass
class Sensor:
"""Interface for sensors in the Boat Simulation."""

def update(self, **kwargs):
"""
Update attributes in Sensor using keyword arguments.

Usage: Sensor.update(attr1=val1, attr2=val2, ...)

Raises:
ValueError: If kwarg is not a defined attribute in Sensor
"""
for attr_name, attr_val in kwargs.items():
if attr_name in self.__annotations__:
setattr(self, attr_name, attr_val)
else:
raise ValueError(
f"{attr_name} not a property in {self.__class__.__name__} \
expected one of {self.__annotations__}"
)

def read(self, key: str) -> Any:
"""
Read the value from an attribute in Sensor.

Args:
key (str): Attribute name to read from

Raises:
ValueError: If key is not an a defined attribute in Sensor

Returns:
Any: Value stored in attribute with name supplied in "key" argument
"""
if key in self.__annotations__:
return getattr(self, key)
else:
raise ValueError(
f"{key} not a property in {self.__class__.__name__}. \
Available keys: {self.__annotations__}"
)


@dataclass
class WindSensor(Sensor):
"""
Abstraction for wind sensor.

# TODO: Add delay functions.

Properties:
wind (ScalarOrArray): Wind x, y components or single value
wind_noisemaker (Optional[MVGaussianGenerator | ConstantGenerator]):
Noise function to emulate sensor noise in wind data reading
"""

wind: ScalarOrArray
wind_noisemaker: WindSensorGenerators = None

@property # type: ignore
def wind(self) -> ScalarOrArray:
# TODO: Ensure attribute value and noisemakers are using the same value shape.
# - wind scalars should add with noise scalars.
# - wind vectors should add with noise vectors.
# Could consider using a __post_init__ function for this
return (
self._wind + self.wind_noisemaker.next() # type: ignore
if self.wind_noisemaker is not None
else self._wind
)

@wind.setter
def wind(self, wind: ScalarOrArray):
self._wind = wind


@dataclass
class GPS(Sensor):
"""
Abstraction for GPS.

# TODO: Add delay functions.

Properties:
lat_lon (NDArray): Boat latitude and longitude (2x1 array)
speed (Scalar): Boat speed
heading (Scalar): Boat heading
lat_lon_noisemaker (Optional[GaussianGenerator | ConstantGenerator]):
Noise function to emulate sensor noise in latitude and longitude readings
speed_noisemaker (Optional[GaussianGenerator | ConstantGenerator]):
Noise function to emulate sensor noise in speed readings
heading_noisemaker (Optional[GaussianGenerator | ConstantGenerator]):
Noise function to emulate sensor noise in heading readings
"""

lat_lon: NDArray
speed: Scalar
heading: Scalar

lat_lon_noisemaker: GPSGenerators = None
speed_noisemaker: GPSGenerators = None
heading_noisemaker: GPSGenerators = None

@property # type: ignore
def lat_lon(self) -> NDArray:
return (
self._lat_lon + self.lat_lon_noisemaker.next()
if self.lat_lon_noisemaker is not None
else self._lat_lon
)

@lat_lon.setter
def lat_lon(self, lat_lon: NDArray):
self._lat_lon = lat_lon

@property # type: ignore
def speed(self) -> Scalar:
return (
self._speed + self.speed_noisemaker.next() # type: ignore
if self.speed_noisemaker is not None
else self._speed
)

@speed.setter
def speed(self, speed: Scalar):
self._speed = speed

@property # type: ignore
def heading(self) -> Scalar:
return (
self._heading + self.heading_noisemaker.next() # type: ignore
if self.heading_noisemaker is not None
else self._heading
)

@heading.setter
def heading(self, heading: Scalar):
self._heading = heading
145 changes: 145 additions & 0 deletions tests/unit/common/test_gps_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from boat_simulator.common.sensors import GPS
import numpy as np
from boat_simulator.common.generators import (
ConstantGenerator,
GaussianGenerator,
)


class TestGPS:
def test_gps_init(self):
lat_lon = np.array([1, 0])
speed = 100
heading = 1.09
error_fn = None

gps = GPS(
lat_lon=lat_lon,
speed=speed,
heading=heading,
lat_lon_noisemaker=error_fn,
speed_noisemaker=error_fn,
heading_noisemaker=error_fn,
)

assert (gps.lat_lon == lat_lon).all()
assert gps.speed == speed
assert gps.heading == heading
assert gps.lat_lon_noisemaker is error_fn
assert gps.speed_noisemaker is error_fn
assert gps.heading_noisemaker is error_fn

def test_gps_init_implicit_error_fn(self):
lat_lon = np.array([1, 0])
speed = 100
heading = 1.09

gps = GPS(
lat_lon=lat_lon,
speed=speed,
heading=heading,
)

assert (gps.lat_lon == lat_lon).all()
assert gps.speed == speed
assert gps.heading == heading
for noisemaker in [
gps.lat_lon_noisemaker,
gps.speed_noisemaker,
gps.heading_noisemaker,
]:
assert noisemaker is None

def test_gps_read_no_error(self):
lat_lon = np.array([1, 0])
speed = np.random.randint(0, 100)
heading = np.random.rand()

gps = GPS(
lat_lon=lat_lon,
speed=speed,
heading=heading,
)

assert (gps.read("lat_lon") == lat_lon).all()
assert gps.read("speed") == speed
assert gps.read("heading") == heading

def test_gps_read_constant_error(self):
lat_lon = np.array([1, 0])
speed = np.random.randint(0, 100)
heading = np.random.rand()
constant = 3.01
error_fn = ConstantGenerator(constant=constant)

gps = GPS(
lat_lon=lat_lon,
speed=speed,
heading=heading,
lat_lon_noisemaker=error_fn,
speed_noisemaker=error_fn,
heading_noisemaker=error_fn,
)

assert (gps.read("lat_lon") == lat_lon + constant).all()
assert gps.read("speed") == speed + constant
assert gps.read("heading") == heading + constant

def test_gps_gaussian_error(self):
lat_lon = np.array([1, 0])
speed = np.random.randint(0, 100)
heading = np.random.rand()
mean = 0
stdev = 1

error_fn = GaussianGenerator(mean=mean, stdev=stdev)

gps = GPS(
lat_lon=lat_lon,
speed=speed,
heading=heading,
lat_lon_noisemaker=error_fn,
speed_noisemaker=error_fn,
heading_noisemaker=error_fn,
)

NUM_READINGS = 10000
speed_readings = np.zeros(NUM_READINGS)
heading_readings = np.zeros(NUM_READINGS)
lat_lon_readings = np.zeros(shape=(NUM_READINGS, 2))
for i in range(NUM_READINGS):
speed_readings[i] = gps.read("speed")
heading_readings[i] = gps.read("heading")
lat_lon_readings[i, :] = gps.read("lat_lon")

for reading, init_data in zip(
[speed_readings, heading_readings, lat_lon_readings],
[speed, heading, lat_lon],
):
sample_mean = np.mean(reading, axis=0)
assert np.isclose(sample_mean, mean + init_data, atol=0.1).all()

def test_wind_sensor_update(self):
lat_lon = np.array([0, 0])
speed = 0
heading = 0

gps = GPS(
lat_lon=lat_lon,
speed=speed,
heading=heading,
)

NUM_READINGS = 100
for i in range(NUM_READINGS):
speed_reading = gps.read("speed")
assert speed_reading == i
gps.update(speed=i + 1)

heading_reading = gps.read("heading")
assert heading_reading == i
gps.update(heading=i + 1)

lat_lon_reading = gps.read("lat_lon")
assert (lat_lon_reading == np.array([i, i])).all()
gps.update(lat_lon=(lat_lon_reading + 1))
77 changes: 77 additions & 0 deletions tests/unit/common/test_wind_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from boat_simulator.common.sensors import WindSensor
import numpy as np
from boat_simulator.common.generators import (
MVGaussianGenerator,
ConstantGenerator,
)


class TestWindSensor:
def test_wind_sensor_init(self):
init_data = np.array([1, 0])
error_fn = None
ws = WindSensor(
wind=init_data,
wind_noisemaker=error_fn,
)

assert ws.wind_noisemaker == error_fn
assert np.all(ws.wind == init_data)

def test_wind_sensor_init_implicit_error_fn(self):
init_data = np.array([1, 0])
ws = WindSensor(wind=init_data)

assert ws.wind_noisemaker is None
assert np.all(ws.wind == init_data)

def test_wind_sensor_read_no_error(self):
init_data = np.array([1, 0])
ws = WindSensor(
wind=init_data,
)
read_data = ws.read("wind")
assert (init_data == read_data).all()

def test_wind_sensor_read_constant_error(self):
init_data = np.array([1, 0])
const_err = 0.1
error_fn = ConstantGenerator(constant=0.1)
ws = WindSensor(
wind=init_data,
wind_noisemaker=error_fn,
)

read_data = ws.read("wind")
assert ((init_data + const_err) == read_data).all()

def test_wind_sensor_read_mv_gaussian_error(self):
init_data = np.array([1, 0])
mean = np.array([1, 1])
cov = np.eye(2)
error_fn = MVGaussianGenerator(mean=mean, cov=cov)
ws = WindSensor(
wind=init_data,
wind_noisemaker=error_fn,
)

NUM_READINGS = 10000
reading = np.zeros(shape=(NUM_READINGS, mean.size))
for i in range(NUM_READINGS):
reading[i, :] = ws.read("wind")

sample_mean = np.mean(reading, axis=0)
sample_cov = np.cov(reading, rowvar=False)

assert np.allclose(sample_cov, cov, atol=0.2)
assert np.isclose(sample_mean, mean + init_data, 0.1).all()

def test_wind_sensor_update(self):
init_data = np.zeros(2)
ws = WindSensor(wind=init_data)

NUM_READINGS = 100
for i in range(NUM_READINGS):
wind = ws.read("wind")
assert (wind == np.array([i, i])).all()
ws.update(wind=(wind + 1))