From c4b947bbfac000ebf33b78b4d536829a4dfe96e9 Mon Sep 17 00:00:00 2001
From: Alan Gibson
Date: Wed, 13 Mar 2024 12:06:12 -0400
Subject: [PATCH] Updates to enable multi-channel redis

Updated CONFIG for use by redis-experiment-publisher
  shows a simple way to see data move through redis to OnAIR
Added custom error to DataSource for key config errors
Fixed columnization of redis_adapter
Redis adapter now checks multiple channels
  detects when time is missing from received message
  helps detect unused keys
  helps detect missing expected keys
  warns if listener exits
Updated and added necessary unit tests for coverage and code verification
---
 .../redis_example_CONFIG.json                 |  98 ++-----
 onair/data_handling/on_air_data_source.py     |   5 +-
 onair/data_handling/redis_adapter.py          |  91 ++++--
 redis-experiment-publisher.py                 |  38 +++
 .../onair/data_handling/test_redis_adapter.py | 275 ++++++++++++++++--
 5 files changed, 384 insertions(+), 123 deletions(-)
 create mode 100644 redis-experiment-publisher.py

diff --git a/onair/data/telemetry_configs/redis_example_CONFIG.json b/onair/data/telemetry_configs/redis_example_CONFIG.json
index da43e8f0..d99b6bda 100644
--- a/onair/data/telemetry_configs/redis_example_CONFIG.json
+++ b/onair/data/telemetry_configs/redis_example_CONFIG.json
@@ -1,89 +1,41 @@
 {
     "subsystems": {
-        "NONE": {
-            "Time": {
-                "conversion": "",
-                "tests": {
-                    "NOOP": "[]"
-                },
-                "description": "No description"
+        "STATES": {
+            "time": {
+                "description": "Time of latest receipt of values"
             },
-            "THRUST": {
-                "conversion": "",
-                "tests": {
-                    "FEASIBILITY": "[-999999999999999, 999999999999999]"
-                },
-                "description": "No description"
+            "state_0.x": {
+                "description": "Vehicle 0's current state of x"
             },
-            "ALTITUDE": {
-                "conversion": "",
-                "tests": {
-                    "FEASIBILITY": "[-999999999999999, 999999999999999]"
-                },
-                "description": "No description"
+            "state_0.y": {
+                "description": "Vehicle 0's current state of y"
             },
-            "ACCELERATION": {
-                "conversion": "",
-                "tests": {
-                    "FEASIBILITY": "[-20.1111, 20.1111]"
-                },
-                "description": "No description"
+            "state_1.x": {
+                "description": "Vehicle 1's current state of x"
             },
-            "SCIENCE_COLLECTION": {
-                "conversion": "",
-                "tests": {
-                    "FEASIBILITY": "[-0.9999, 1.1111]"
-                },
-                "description": "No description"
+            "state_1.y": {
+                "description": "Vehicle 1's current state of y"
             },
-            "LABEL_ERROR_STATE": {
-                "conversion": "",
-                "tests": {
-                    "NOOP": "[]"
-                },
-                "description": "No description"
-            }
-        },
-        "POWER": {
-            "VOLTAGE": {
-                "conversion": "",
-                "tests": {
-                    "FEASIBILITY": "[12.9999, 18.1111]"
-                },
-                "description": "No description"
+            "state_2.x": {
+                "description": "Vehicle 2's current state of x"
             },
-            "CURRENT": {
-                "conversion": "",
-                "tests": {
-                    "FEASIBILITY": "[2.9999, 5.1111]"
-                },
-                "description": "No description"
-            }
-        },
-        "THERMAL": {
-            "TEMPERATURE": {
-                "conversion": "",
-                "tests": {
-                    "FEASIBILITY": "[9.9999, 90.1111]"
-                },
-                "description": "No description"
+            "state_2.y": {
+                "description": "Vehicle 2's current state of y"
             }
         }
     },
     "redis_subscriptions": [
-        "vehicle_0_state",
-        "vehicle_1_state",
-        "vehicle_2_state"
+        "state_0",
+        "state_1",
+        "state_2"
     ],
     "order": [
-        "Time",
-        "VOLTAGE",
-        "CURRENT",
-        "THRUST",
-        "ALTITUDE",
-        "ACCELERATION",
-        "TEMPERATURE",
-        "SCIENCE_COLLECTION",
-        "LABEL_ERROR_STATE"
+        "time",
+        "state_0.x",
+        "state_0.y",
+        "state_1.x",
+        "state_1.y",
+        "state_2.x",
+        "state_2.y"
     ]
 }
\ No newline at end of file
diff --git a/onair/data_handling/on_air_data_source.py b/onair/data_handling/on_air_data_source.py
index 80bdc4b5..edd60066 100644
--- a/onair/data_handling/on_air_data_source.py
+++ b/onair/data_handling/on_air_data_source.py
@@ -8,7 +8,10 @@
 # See "NOSA GSC-19165-1 OnAIR.pdf"
 
 from abc import ABC, abstractmethod
-from .parser_util import *
+from .parser_util import *
+
+class ConfigKeyError(KeyError):
+    pass
 
 class OnAirDataSource(ABC):
     def __init__(self, data_file, meta_file, ss_breakdown = False):
diff --git a/onair/data_handling/redis_adapter.py b/onair/data_handling/redis_adapter.py
index d469f71e..ba346004 100644
--- a/onair/data_handling/redis_adapter.py
+++ b/onair/data_handling/redis_adapter.py
@@ -1,8 +1,9 @@
 # GSC-19165-1, "The On-Board Artificial Intelligence Research (OnAIR) Platform"
 #
-# Copyright © 2023 United States Government as represented by the Administrator of
-# the National Aeronautics and Space Administration. No copyright is claimed in the
-# United States under Title 17, U.S. Code. All Other Rights Reserved.
+# Copyright © 2023 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# No copyright is claimed in the United States under Title 17, U.S. Code.
+# All Other Rights Reserved.
 #
 # Licensed under the NASA Open Source Agreement version 1.3
 # See "NOSA GSC-19165-1 OnAIR.pdf"
@@ -19,6 +20,7 @@
 import json
 
 from onair.data_handling.on_air_data_source import OnAirDataSource
+from onair.data_handling.on_air_data_source import ConfigKeyError
 from onair.data_handling.tlm_json_parser import parseJson
 from onair.src.util.print_io import *
 from onair.data_handling.parser_util import *
@@ -30,12 +32,14 @@ def __init__(self, data_file, meta_file, ss_breakdown = False):
         self.address = 'localhost'
         self.port = 6379
         self.db = 0
-        self.server = None
+        self.server = None
         self.new_data_lock = threading.Lock()
         self.new_data = False
         self.currentData = []
-        self.currentData.append({'headers':None, 'data':None})
-        self.currentData.append({'headers':None, 'data':None})
+        self.currentData.append({'headers':self.order,
+                                 'data':list('-' * len(self.order))})
+        self.currentData.append({'headers':self.order,
+                                 'data':list('-' * len(self.order))})
         self.double_buffer_read_index = 0
         self.connect()
         self.subscribe(self.subscriptions)
@@ -63,8 +67,17 @@ def subscribe(self, subscriptions):
             print_msg(f"No subscriptions given!")
 
     def parse_meta_data_file(self, meta_data_file, ss_breakdown):
-        configs = extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown)
+        configs = extract_meta_data_handle_ss_breakdown(
+            meta_data_file, ss_breakdown)
         meta = parseJson(meta_data_file)
+        keys = meta.keys()
+
+        if 'order' in keys:
+            self.order = meta['order']
+        else:
+            raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \
+                'missing required key \'order\'')
+
         if 'redis_subscriptions' in meta.keys():
             self.subscriptions = meta['redis_subscriptions']
         else:
@@ -85,14 +98,15 @@ def get_next(self):
         while not data_available:
             with self.new_data_lock:
                 data_available = self.has_data()
-
+
             if not data_available:
                 time.sleep(0.01)
 
         read_index = 0
         with self.new_data_lock:
             self.new_data = False
-            self.double_buffer_read_index = (self.double_buffer_read_index + 1) % 2
+            self.double_buffer_read_index = (
+                self.double_buffer_read_index + 1) % 2
             read_index = self.double_buffer_read_index
 
         return self.currentData[read_index]['data']
@@ -102,18 +116,61 @@ def has_more(self):
         return True
 
     def message_listener(self):
-        """Loop for listening for messages on channel"""
+        """Loop for listening for messages on channels"""
         for message in self.pubsub.listen():
             if message['type'] == 'message':
-                data = json.loads(message['data'])
-
-                currentData = self.currentData[(self.double_buffer_read_index + 1) %2]
-                currentData['headers'] = list(data.keys())
-                currentData['data'] = list(data.values())
-
+                channel_name = f"{message['channel'].decode()}"
+                # Attempt to load message as json
+                try:
+                    data = json.loads(message['data'])
+                except ValueError:
+                    # Warn of non-json conforming channel data received
+                    non_json_msg = f'Subscribed channel `{channel_name}\' ' \
+                                   'message received but is not in json ' \
+                                   f'format.\nMessage:\n{message["data"]}'
+                    print_msg(non_json_msg, ['WARNING'])
+                    continue
+                # Select the current data
+                currentData = self.currentData[
+                    (self.double_buffer_read_index + 1) % 2]
+                # turn all data points to unknown
+                currentData['data'] = ['-' for _ in currentData['data']]
+                # Find expected keys for received channel
+                expected_message_keys = \
+                    [k for k in currentData['headers'] if channel_name in k]
+                # Time is an expected key for all channels
+                expected_message_keys.append("time")
+                # Parse through the message keys for data points
+                for key in list(data.keys()):
+                    if key.lower() == 'time':
+                        header_string = key.lower()
+                    else:
+                        header_string = f"{channel_name}.{key}"
+                    # Look for channel specific values
+                    try:
+                        index = currentData['headers'].index(header_string)
+                        currentData['data'][index] = data[key]
+                        expected_message_keys.remove(header_string)
+                    # Unexpected key in data
+                    except ValueError:
+                        # warn user about key in data that is not in header
+                        print_msg(f"Unused key `{key}' in message " \
+                                  f'from channel `{channel_name}.\'',
+                                  ['WARNING'])
                 with self.new_data_lock:
                     self.new_data = True
+                # Warn user about expected keys missing from received data
+                for k in expected_message_keys:
+                    print_msg(f'Message from channel `{channel_name}\' ' \
+                              f'did not contain `{k}\' key\nMessage:\n' \
+                              f'{data}', ['WARNING'])
+            else:
+                # Warn user about non message receipts
+                print_msg(f"Redis adapter: channel " \
+                          f"'{message['channel'].decode()}' received " \
+                          f"message type: {message['type']}.", ['WARNING'])
+        # When listener loop exits warn user
+        print_msg("Redis subscription listener exited.", ['WARNING'])
 
     def has_data(self):
         return self.new_data
-
\ No newline at end of file
diff --git a/redis-experiment-publisher.py b/redis-experiment-publisher.py
new file mode 100644
index 00000000..b5b6b53e
--- /dev/null
+++ b/redis-experiment-publisher.py
@@ -0,0 +1,38 @@
+import redis
+import time
+import random
+
+# Initialize the Redis connection
+redis_host = "localhost"
+redis_port = 6379
+# When your Redis server requires a password, fill it in here
+redis_password = ""
+# Connect to Redis
+r = redis.Redis(host=redis_host,
+                port=redis_port,
+                password=redis_password,
+                decode_responses=True)
+# List of channel names
+channels = ['state_0', 'state_1', 'state_2']
+# Publish messages on each channel in random order
+def publish_messages():
+    loop_count = 0
+    inner_loop_count = 0
+    max_loops = 9
+    while loop_count < max_loops:
+        random.shuffle(channels)
+        for channel in channels:
+            r.publish(channel, f'{{"time":{inner_loop_count}, ' \
+                               f'"x":{inner_loop_count+0.1}, ' \
+                               f'"y":{inner_loop_count+0.2}}}')
+            print(f"Published data to {channel}, " \
+                  f"[{inner_loop_count}, " \
+                  f"{inner_loop_count+0.1}, " \
+                  f"{inner_loop_count+0.2}]")
+            inner_loop_count += 1
+            time.sleep(2)
+        loop_count += 1
+        print(f"Completed {loop_count} loops")
+
+if __name__ == "__main__":
+    publish_messages()
diff --git a/test/onair/data_handling/test_redis_adapter.py
b/test/onair/data_handling/test_redis_adapter.py index 38bc6928..777dfbb3 100644 --- a/test/onair/data_handling/test_redis_adapter.py +++ b/test/onair/data_handling/test_redis_adapter.py @@ -12,6 +12,7 @@ import onair.data_handling.redis_adapter as redis_adapter from onair.data_handling.redis_adapter import DataSource from onair.data_handling.on_air_data_source import OnAirDataSource +from onair.data_handling.on_air_data_source import ConfigKeyError import redis import threading @@ -33,6 +34,10 @@ def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_sub cut = DataSource.__new__(DataSource) cut.subscriptions = expected_subscriptions + fake_order = MagicMock() + fake_order.__len__.return_value = \ + pytest.gen.randint(1, 10) # from 1 to 10 arbitrary + cut.order = fake_order mocker.patch.object(OnAirDataSource, '__init__', new=MagicMock()) mocker.patch('threading.Lock', return_value=fake_new_data_lock) @@ -51,7 +56,10 @@ def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_sub assert cut.server == expected_server assert cut.new_data_lock == fake_new_data_lock assert cut.new_data == False - assert cut.currentData == [{'headers':None, 'data':None}, {'headers':None, 'data':None}] + assert cut.currentData == [{'headers':fake_order, + 'data':list('-' * len(fake_order))}, + {'headers':fake_order, + 'data':list('-' * len(fake_order))}] assert cut.double_buffer_read_index == 0 assert cut.connect.call_count == 1 assert cut.connect.call_args_list[0].args == () @@ -206,7 +214,6 @@ def test_redis_adapter_DataSource_subscribe_states_no_subscriptions_given_when_s assert redis_adapter.print_msg.call_args_list[0].args == ("No subscriptions given!",) # get_next tests - def test_redis_adapter_DataSource_get_next_returns_expected_data_when_new_data_is_true_and_double_buffer_read_index_is_0(): # Arrange # Renew DataSource to ensure test independence @@ -324,61 +331,237 @@ def test_redis_adapter_DataSource_has_more_always_returns_True(): assert cut.has_more() == True # message_listener tests -def test_redis_adapter_DataSource_message_listener_does_not_load_json_when_receive_type_is_not_message(mocker): +def test_redis_adapter_DataSource_message_listener_warns_of_exit_and_does_not_run_for_loop_when_listen_returns_StopIteration(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + + cut.pubsub = MagicMock(name="cut.pubsub") + fake_listener = MagicMock(name='fake_listener') + fake_listener.__next__.side_effect = StopIteration + mocker.patch.object(cut.pubsub, 'listen', side_effect=[fake_listener]) + mocker.patch(redis_adapter.__name__ + '.json.loads') + mocker.patch(redis_adapter.__name__ + '.print_msg') + + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 0 + assert redis_adapter.print_msg.call_count == 1 + assert redis_adapter.print_msg.call_args_list[0].args == ("Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_prints_warning_when_receiving_non_message_type(mocker): # Arrange cut = DataSource.__new__(DataSource) + + cut.pubsub = MagicMock() ignored_message_types = ['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe', 'pmessage'] fake_message = {} fake_message['type'] = pytest.gen.choice(ignored_message_types) - - cut.pubsub = MagicMock() + fake_message['channel'] = str(MagicMock(name='fake_message')).encode('utf-8') mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) mocker.patch(redis_adapter.__name__ + '.json.loads') + 
mocker.patch(redis_adapter.__name__ + '.print_msg') # Act cut.message_listener() # Assert assert redis_adapter.json.loads.call_count == 0 + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f"Redis adapter: channel '{fake_message['channel'].decode()}' received " \ + f"message type: {fake_message['type']}.", ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_prints_warning_when_data_not_json_format_and_does_not_update_frame(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + + cut.pubsub = MagicMock() + fake_message = {} + fake_message['type'] = 'message' + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + fake_message['data'] = str(MagicMock(name='fake_message_data')) + mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch(redis_adapter.__name__ + '.json.loads', side_effect=ValueError) + mocker.patch(redis_adapter.__name__ + '.print_msg') -def test_redis_adapter_DataSource_message_listener_loads_message_info_when_receive_type_is_message(mocker): + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 1 + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f'Subscribed channel `{fake_message["channel"].decode()}\' message ' \ + 'received but is not in json format.\nMessage:\n' \ + f'{fake_message["data"]}', ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_warns_user_when_processed_data_did_not_contain_time(mocker): # Arrange cut = DataSource.__new__(DataSource) + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) + cut.currentData = {0: {'headers': [], 'data': []}, + 1: {'headers': [], 'data': []}} + cut.pubsub = MagicMock() cut.new_data_lock = MagicMock() - cut.new_data = None - cut.double_buffer_read_index = pytest.gen.randint(0,1) - cut.currentData = [{}, {}] + cut.new_data = False + + fake_message = {} + fake_message['type'] = 'message' + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + fake_message['data'] = '{}' # empty_message + mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={}) + mocker.patch(redis_adapter.__name__ + '.print_msg') + + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 1 + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f'Message from channel `{fake_message["channel"].decode()}\' ' \ + f'did not contain `time\' key\nMessage:\n{fake_message["data"]}', \ + ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_warns_of_received_key_that_does_not_exist_in_header(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) + cut.currentData = {0: {'headers': ['time'], + 'data': 
['-']}, + 1: {'headers': ['time'], + 'data': ['-']}} + cut.pubsub = MagicMock() + cut.new_data_lock = MagicMock() + cut.new_data = False + + fake_message = {} + fake_message['type'] = 'message' + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + fake_message['data'] = '{"time":0, "unknown_key":0}' + mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={"time":0, "unknown_key":0}) + mocker.patch(redis_adapter.__name__ + '.print_msg') + + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 1 + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f"Unused key `unknown_key' in message " \ + f'from channel `{fake_message["channel"].decode()}.\'', ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_warns_of_expected_keys_that_do_not_appear_in_message(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) + cut.pubsub = MagicMock() + cut.new_data_lock = MagicMock() + cut.new_data = False + + fake_message = {} + fake_message['type'] = 'message' + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + cut.currentData = {0: {'headers': ['time', + f'{fake_message["channel"].decode()}' \ + '.missing_key'], + 'data': ['-', '-']}, + 1: {'headers': ['time', + f'{fake_message["channel"].decode()}' \ + '.missing_key'], + 'data': ['-', '-']}} + fake_message['data'] = '{}' + mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={}) + mocker.patch(redis_adapter.__name__ + '.print_msg') + + # Act + cut.message_listener() + + # Assert + assert redis_adapter.json.loads.call_count == 1 + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) + assert redis_adapter.print_msg.call_count == 3 + assert redis_adapter.print_msg.call_args_list[0].args == ( + f'Message from channel `{fake_message["channel"].decode()}\' ' \ + f'did not contain `{fake_message["channel"].decode()}.missing_key\'' \ + f' key\nMessage:\n{fake_message["data"]}', \ + ['WARNING']) + assert redis_adapter.print_msg.call_args_list[1].args == ( + f'Message from channel `{fake_message["channel"].decode()}\' ' \ + f'did not contain `time\' key\nMessage:\n{fake_message["data"]}', \ + ['WARNING']) + assert redis_adapter.print_msg.call_args_list[2].args == ( + "Redis subscription listener exited.", ['WARNING']) + +def test_redis_adapter_DataSource_message_listener_updates_new_data_with_received_data_by_channel_and_key_matched_to_frame_header(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) cut.pubsub = MagicMock() + cut.new_data_lock = MagicMock() + cut.new_data = False fake_message = {} - fake_message_data = {} fake_message['type'] = 'message' - fake_message['data'] = fake_message_data - fake_data = {} - - expected_index = (cut.double_buffer_read_index + 1) % 2 - expected_data_headers = [] - expected_data_values = [] - - num_fake_data = pytest.gen.randint(1,10) - for i in range(num_fake_data): - fake_data_header = str(i) - fake_data_value 
= MagicMock() - fake_data[fake_data_header] = fake_data_value - expected_data_headers.append(fake_data_header) - expected_data_values.append(fake_data_value) + fake_message['channel'] = str( + MagicMock(name='fake_message_channel')).encode('utf-8') + cut.currentData = {0: {'headers': ['time', + f'{fake_message["channel"].decode()}' \ + '.correct_key', 'fakeotherchannel.x'], + 'data': ['-', '-', '0']}, + 1: {'headers': ['time', + f'{fake_message["channel"].decode()}' \ + '.correct_key', 'fakeotherchannel.x'], + 'data': ['-', '-', '0']}} + fake_message['data'] = '{}' mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) - mocker.patch(redis_adapter.__name__ + '.json.loads', return_value=fake_data) + fake_data = { + 'time': pytest.gen.randint(1, 100), # from 1 to 100 arbitrary + 'correct_key': pytest.gen.randint(1, 100), # from 1 to 100 arbitrary + } + mocker.patch(redis_adapter.__name__ + '.json.loads', + return_value=fake_data) + mocker.patch(redis_adapter.__name__ + '.print_msg') # Act cut.message_listener() # Assert assert redis_adapter.json.loads.call_count == 1 - assert redis_adapter.json.loads.call_args_list[0].args == (fake_message_data,) - assert cut.currentData[expected_index]['headers'] == expected_data_headers - assert cut.currentData[expected_index]['data'] == expected_data_values + assert redis_adapter.json.loads.call_args_list[0].args == ( + fake_message['data'], ) assert cut.new_data == True + print(cut.currentData[cut.double_buffer_read_index]) + assert cut.currentData[(cut.double_buffer_read_index + 1) % 2]['data'] == \ + [fake_data['time'], fake_data['correct_key'], '-'] + assert redis_adapter.print_msg.call_count == 1 + assert redis_adapter.print_msg.call_args_list[0].args == ( + "Redis subscription listener exited.", ['WARNING']) # has_data tests def test_redis_adapter_DataSource_has_data_returns_instance_new_data(): @@ -391,6 +574,34 @@ def test_redis_adapter_DataSource_has_data_returns_instance_new_data(): assert result == expected_result # redis_adapter parse_meta_data tests +def test_redis_adapter_DataSource_parse_meta_data_file_raises_ConfigKeyError_when_order_is_not_in_config_file(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + arg_configFile = MagicMock() + arg_ss_breakdown = MagicMock() + + expected_extracted_configs = MagicMock() + expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary + fake_meta = {'fake_other_stuff': MagicMock(), + 'redis_subscriptions':expected_subscriptions} + + mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs) + mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) + + exception_message = (f'Config file: \'{arg_configFile}\' ' \ + 'missing required key \'order\'') + + # Act + with pytest.raises(ConfigKeyError) as e_info: + cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, ) + + # Assert + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1 + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown) + assert redis_adapter.parseJson.call_count == 1 + assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) + assert e_info.match(exception_message) + def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown_and_sets_subscriptions_when_redis_subscriptions_occupied(mocker): # Arrange cut = DataSource.__new__(DataSource) @@ -399,8 
+610,9 @@ def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_m expected_extracted_configs = MagicMock() expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary - fake_meta = {'fake_other_stuff': MagicMock(), 'redis_subscriptions':expected_subscriptions} - expected_result_configs = {'redis_subscriptions':expected_subscriptions} + fake_meta = {'fake_other_stuff': MagicMock(), + 'order': MagicMock(), + 'redis_subscriptions':expected_subscriptions} mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs) mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) @@ -423,8 +635,7 @@ def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_m arg_ss_breakdown = MagicMock() fake_configs = {'fake_other_stuff': MagicMock()} - expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary - fake_meta = {} + fake_meta = {'order': MagicMock()} mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=fake_configs) mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta)
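For reviewers who want to sanity-check the channel-to-header convention this patch introduces without running Redis, the following standalone sketch mirrors the mapping that message_listener performs. Assumptions: the headers come from the patched redis_example_CONFIG.json "order" list and follow the `<channel>.<key>` naming shown above; the helper name map_message_to_headers is illustrative only and is not part of the patch.

import json

# Frame headers as listed under "order" in redis_example_CONFIG.json
HEADERS = ["time",
           "state_0.x", "state_0.y",
           "state_1.x", "state_1.y",
           "state_2.x", "state_2.y"]

def map_message_to_headers(channel, raw_message, headers=HEADERS):
    """Illustrative only: mimic the adapter's `<channel>.<key>` matching.

    Returns (frame, unused_keys) where `frame` is aligned with `headers`,
    unmatched positions stay '-', and message keys with no matching header
    are collected in `unused_keys` (the adapter warns about these).
    """
    frame = ['-'] * len(headers)
    unused_keys = []
    data = json.loads(raw_message)
    for key, value in data.items():
        # 'time' is shared by all channels; other keys are channel-scoped
        header = 'time' if key.lower() == 'time' else f"{channel}.{key}"
        if header in headers:
            frame[headers.index(header)] = value
        else:
            unused_keys.append(key)
    return frame, unused_keys

if __name__ == "__main__":
    # A message like the one redis-experiment-publisher.py sends on state_1
    frame, unused = map_message_to_headers("state_1",
                                           '{"time": 3, "x": 3.1, "y": 3.2}')
    print(frame)   # [3, '-', '-', 3.1, 3.2, '-', '-']
    print(unused)  # []

With a local Redis at localhost:6379, running redis-experiment-publisher.py alongside an OnAIR session configured with redis_example_CONFIG.json should exercise the same mapping end to end.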