Skip to content

Commit

Permalink
Updates to enable multi-channel redis
Browse files Browse the repository at this point in the history
Updated CONFIG for use by redis-experiment-publisher
  shows a simple way to see data move through redis to OnAIR
Added custom error to DataSource for key config errors
Fixed columnization of redis_adapter
Redis adapter now checks multiple channels
  detects when time is missing from received message
  helps detect unused keys
  helps detect missing expected keys
  warns if listener exits
Updated and added necessary unit tests for coverage and code verification
  • Loading branch information
asgibson committed Mar 22, 2024
1 parent f6f73ac commit c4b947b
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 123 deletions.
98 changes: 25 additions & 73 deletions onair/data/telemetry_configs/redis_example_CONFIG.json
Original file line number Diff line number Diff line change
@@ -1,89 +1,41 @@
{
"subsystems": {
"NONE": {
"Time": {
"conversion": "",
"tests": {
"NOOP": "[]"
},
"description": "No description"
"STATES": {
"time": {
"description": "Time of latest receipt of values"
},
"THRUST": {
"conversion": "",
"tests": {
"FEASIBILITY": "[-999999999999999, 999999999999999]"
},
"description": "No description"
"state_0.x": {
"description": "Vehicle 0's current state of x"
},
"ALTITUDE": {
"conversion": "",
"tests": {
"FEASIBILITY": "[-999999999999999, 999999999999999]"
},
"description": "No description"
"state_0.y": {
"description": "Vehicle 0's current state of y"
},
"ACCELERATION": {
"conversion": "",
"tests": {
"FEASIBILITY": "[-20.1111, 20.1111]"
},
"description": "No description"
"state_1.x": {
"description": "Vehicle 1's current state of x"
},
"SCIENCE_COLLECTION": {
"conversion": "",
"tests": {
"FEASIBILITY": "[-0.9999, 1.1111]"
},
"description": "No description"
"state_1.y": {
"description": "Vehicle 1's current state of y"
},
"LABEL_ERROR_STATE": {
"conversion": "",
"tests": {
"NOOP": "[]"
},
"description": "No description"
}
},
"POWER": {
"VOLTAGE": {
"conversion": "",
"tests": {
"FEASIBILITY": "[12.9999, 18.1111]"
},
"description": "No description"
"state_2.x": {
"description": "Vehicle 2's current state of x"
},
"CURRENT": {
"conversion": "",
"tests": {
"FEASIBILITY": "[2.9999, 5.1111]"
},
"description": "No description"
}
},
"THERMAL": {
"TEMPERATURE": {
"conversion": "",
"tests": {
"FEASIBILITY": "[9.9999, 90.1111]"
},
"description": "No description"
"state_2.y": {
"description": "Vehicle 2's current state of y"
}
}
},
"redis_subscriptions": [
"vehicle_0_state",
"vehicle_1_state",
"vehicle_2_state"
"state_0",
"state_1",
"state_2"
],
"order": [
"Time",
"VOLTAGE",
"CURRENT",
"THRUST",
"ALTITUDE",
"ACCELERATION",
"TEMPERATURE",
"SCIENCE_COLLECTION",
"LABEL_ERROR_STATE"
"time",
"state_0.x",
"state_0.y",
"state_1.x",
"state_1.y",
"state_2.x",
"state_2.y"
]
}
5 changes: 4 additions & 1 deletion onair/data_handling/on_air_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
# See "NOSA GSC-19165-1 OnAIR.pdf"

from abc import ABC, abstractmethod
from .parser_util import *
from .parser_util import *

class ConfigKeyError(KeyError):
pass

class OnAirDataSource(ABC):
def __init__(self, data_file, meta_file, ss_breakdown = False):
Expand Down
91 changes: 74 additions & 17 deletions onair/data_handling/redis_adapter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform"
#
# Copyright © 2023 United States Government as represented by the Administrator of
# the National Aeronautics and Space Administration. No copyright is claimed in the
# United States under Title 17, U.S. Code. All Other Rights Reserved.
# Copyright © 2023 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# No copyright is claimed in the United States under Title 17, U.S. Code.
# All Other Rights Reserved.
#
# Licensed under the NASA Open Source Agreement version 1.3
# See "NOSA GSC-19165-1 OnAIR.pdf"
Expand All @@ -19,6 +20,7 @@
import json

from onair.data_handling.on_air_data_source import OnAirDataSource
from onair.data_handling.on_air_data_source import ConfigKeyError
from onair.data_handling.tlm_json_parser import parseJson
from onair.src.util.print_io import *
from onair.data_handling.parser_util import *
Expand All @@ -30,12 +32,14 @@ def __init__(self, data_file, meta_file, ss_breakdown = False):
self.address = 'localhost'
self.port = 6379
self.db = 0
self.server = None
self.server = None
self.new_data_lock = threading.Lock()
self.new_data = False
self.currentData = []
self.currentData.append({'headers':None, 'data':None})
self.currentData.append({'headers':None, 'data':None})
self.currentData.append({'headers':self.order,
'data':list('-' * len(self.order))})
self.currentData.append({'headers':self.order,
'data':list('-' * len(self.order))})
self.double_buffer_read_index = 0
self.connect()
self.subscribe(self.subscriptions)
Expand Down Expand Up @@ -63,8 +67,17 @@ def subscribe(self, subscriptions):
print_msg(f"No subscriptions given!")

def parse_meta_data_file(self, meta_data_file, ss_breakdown):
configs = extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown)
configs = extract_meta_data_handle_ss_breakdown(
meta_data_file, ss_breakdown)
meta = parseJson(meta_data_file)
keys = meta.keys()

if 'order' in keys:
self.order = meta['order']
else:
raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \
'missing required key \'order\'')

if 'redis_subscriptions' in meta.keys():
self.subscriptions = meta['redis_subscriptions']
else:
Expand All @@ -85,14 +98,15 @@ def get_next(self):
while not data_available:
with self.new_data_lock:
data_available = self.has_data()

if not data_available:
time.sleep(0.01)

read_index = 0
with self.new_data_lock:
self.new_data = False
self.double_buffer_read_index = (self.double_buffer_read_index + 1) % 2
self.double_buffer_read_index = (
self.double_buffer_read_index + 1) % 2
read_index = self.double_buffer_read_index

return self.currentData[read_index]['data']
Expand All @@ -102,18 +116,61 @@ def has_more(self):
return True

def message_listener(self):
"""Loop for listening for messages on channel"""
"""Loop for listening for messages on channels"""
for message in self.pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])

currentData = self.currentData[(self.double_buffer_read_index + 1) %2]
currentData['headers'] = list(data.keys())
currentData['data'] = list(data.values())

channel_name = f"{message['channel'].decode()}"
# Attempt to load message as json
try:
data = json.loads(message['data'])
except ValueError:
# Warn of non-json conforming channel data received
non_json_msg = f'Subscribed channel `{channel_name}\' ' \
'message received but is not in json ' \
f'format.\nMessage:\n{message["data"]}'
print_msg(non_json_msg, ['WARNING'])
continue
# Select the current data
currentData = self.currentData[
(self.double_buffer_read_index + 1) % 2]
# turn all data points to unknown
currentData['data'] = ['-' for _ in currentData['data']]
# Find expected keys for received channel
expected_message_keys = \
[k for k in currentData['headers'] if channel_name in k]
# Time is an expected key for all channels
expected_message_keys.append("time")
# Parse through the message keys for data points
for key in list(data.keys()):
if key.lower() == 'time':
header_string = key.lower()
else:
header_string = f"{channel_name}.{key}"
# Look for channel specific values
try:
index = currentData['headers'].index(header_string)
currentData['data'][index] = data[key]
expected_message_keys.remove(header_string)
# Unexpected key in data
except ValueError:
# warn user about key in data that is not in header
print_msg(f"Unused key `{key}' in message " \
f'from channel `{channel_name}.\'',
['WARNING'])
with self.new_data_lock:
self.new_data = True
# Warn user about expected keys missing from received data
for k in expected_message_keys:
print_msg(f'Message from channel `{channel_name}\' ' \
f'did not contain `{k}\' key\nMessage:\n' \
f'{data}', ['WARNING'])
else:
# Warn user about non message receipts
print_msg(f"Redis adapter: channel " \
f"'{message['channel'].decode()}' received " \
f"message type: {message['type']}.", ['WARNING'])
# When listener loop exits warn user
print_msg("Redis subscription listener exited.", ['WARNING'])

def has_data(self):
return self.new_data

38 changes: 38 additions & 0 deletions redis-experiment-publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import redis
import time
import random

# Initialize the Redis connection
redis_host = "localhost"
redis_port = 6379
# When your Redis server requires a password, fill it in here
redis_password = ""
# Connect to Redis
r = redis.Redis(host=redis_host,
port=redis_port,
password=redis_password,
decode_responses=True)
# List of channel names
channels = ['state_0', 'state_1', 'state_2']
# Publish messages on each channel in random order
def publish_messages():
loop_count = 0
inner_loop_count = 0
max_loops = 9
while loop_count < max_loops:
random.shuffle(channels)
for channel in channels:
r.publish(channel, f'{{"time":{inner_loop_count}, ' \
f'"x":{inner_loop_count+0.1}, ' \
f'"y":{inner_loop_count+0.2}}}')
print(f"Published data to {channel}, " \
f"[{inner_loop_count}, " \
f"{inner_loop_count+0.1}, " \
f"{inner_loop_count+0.2}]")
inner_loop_count += 1
time.sleep(2)
loop_count += 1
print(f"Completed {loop_count} loops")

if __name__ == "__main__":
publish_messages()
Loading

0 comments on commit c4b947b

Please sign in to comment.