diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..1a164f5 --- /dev/null +++ b/__init__.py @@ -0,0 +1,4 @@ +import sys +import os + +sys.path.append(os.path.abspath(os.path.dirname(__file__))) \ No newline at end of file diff --git a/onair/data/telemetry_configs/redis_example_CONFIG.json b/onair/data/telemetry_configs/redis_example_CONFIG.json index 1813570..7c253e9 100644 --- a/onair/data/telemetry_configs/redis_example_CONFIG.json +++ b/onair/data/telemetry_configs/redis_example_CONFIG.json @@ -24,11 +24,25 @@ } } }, - "redis_subscriptions": [ - "state_0", - "state_1", - "state_2" - ], + + "redis" : [ + { + "address": "localhost", + "port": 6379, + "subscriptions": [ + "state_0" + ] + }, + { + "address": "localhost", + "port": 6380, + "subscriptions": [ + "state_1", + "state_2" + ] + } + ], + "order": [ "time", "state_0.x", diff --git a/onair/data_handling/redis_adapter.py b/onair/data_handling/redis_adapter.py index ba34600..4a9d4f6 100644 --- a/onair/data_handling/redis_adapter.py +++ b/onair/data_handling/redis_adapter.py @@ -29,12 +29,9 @@ class DataSource(OnAirDataSource): def __init__(self, data_file, meta_file, ss_breakdown = False): super().__init__(data_file, meta_file, ss_breakdown) - self.address = 'localhost' - self.port = 6379 - self.db = 0 - self.server = None self.new_data_lock = threading.Lock() self.new_data = False + self.servers = [] self.currentData = [] self.currentData.append({'headers':self.order, 'data':list('-' * len(self.order))}) @@ -42,47 +39,86 @@ def __init__(self, data_file, meta_file, ss_breakdown = False): 'data':list('-' * len(self.order))}) self.double_buffer_read_index = 0 self.connect() - self.subscribe(self.subscriptions) def connect(self): """Establish connection to REDIS server.""" print_msg('Redis adapter connecting to server...') - self.server = redis.Redis(self.address, self.port, self.db) + for idx, server_config in enumerate(self.server_configs): + server_config_keys = server_config.keys() + if 'address' in server_config_keys: + address = server_config['address'] + else: + address = 'localhost' - if self.server.ping(): - print_msg('... connected!') + if 'port' in server_config_keys: + port = server_config['port'] + else: + port = 6379 + + if 'db' in server_config_keys: + db = server_config['db'] + else: + db = 0 - def subscribe(self, subscriptions): - """Subscribe to REDIS message channel(s) and launch listener thread.""" - if len(subscriptions) != 0 and self.server.ping(): - self.pubsub = self.server.pubsub() + if 'password' in server_config_keys: + password = server_config['password'] + else: + password = '' + + #if there are subscriptions in this Redis server configuration's subscription key + if len(server_config['subscriptions']) != 0: + #Create the servers and append them to self.servers list + self.servers.append(redis.Redis(address, port, db, password)) - for s in subscriptions: - self.pubsub.subscribe(s) - print_msg(f"Subscribing to channel: {s}") + try: + #Ping server to make sure we can connect + self.servers[-1].ping() + print_msg(f'... connected to server # {idx}!') - listen_thread = threading.Thread(target=self.message_listener) - listen_thread.start() - else: - print_msg(f"No subscriptions given!") + #Set up Redis pubsub function for the current server + pubsub = self.servers[-1].pubsub() + + for s in server_config['subscriptions']: + pubsub.subscribe(s) + print_msg(f"Subscribing to channel: {s} on server # {idx}") + listen_thread = threading.Thread(target=self.message_listener, args=(pubsub,)) + listen_thread.start() + + #This except will be hit if self.servers[-1].ping() threw an exception (could not properly ping server) + except: + print_msg(f'Did not connect to server # {idx}. Not setting up subscriptions.', 'RED') + + else: + print_msg("No subscriptions given! Redis server not created") def parse_meta_data_file(self, meta_data_file, ss_breakdown): + self.server_configs = [] configs = extract_meta_data_handle_ss_breakdown( meta_data_file, ss_breakdown) meta = parseJson(meta_data_file) keys = meta.keys() + # Setup redis server configuration + #Checking if 'redis' exists + if 'redis' in keys: + count_server_config = 0 + #Checking if dictionaries within 'redis' key each have a 'subscription' key. Error will be thrown if not. + for server_config in meta['redis']: + redis_config_keys = server_config.keys() + if ('subscriptions' in redis_config_keys) == False: + raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \ + f'missing required key \'subscriptions\' from {count_server_config} in key \'redis\'') + count_server_config +=1 + + #Saving all of Redis dictionaries from JSON file to self.server_configs + self.server_configs = meta['redis'] + if 'order' in keys: self.order = meta['order'] else: raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \ - 'missing required key \'order\'') - - if 'redis_subscriptions' in meta.keys(): - self.subscriptions = meta['redis_subscriptions'] - else: - self.subscriptions = [] - + 'missing required key \'order\'') + return configs def process_data_file(self, data_file): @@ -115,9 +151,9 @@ def has_more(self): """Live connection should always return True""" return True - def message_listener(self): + def message_listener(self, pubsub): """Loop for listening for messages on channels""" - for message in self.pubsub.listen(): + for message in pubsub.listen(): if message['type'] == 'message': channel_name = f"{message['channel'].decode()}" # Attempt to load message as json diff --git a/redis-experiment-publisher-multi-server.py b/redis-experiment-publisher-multi-server.py new file mode 100644 index 0000000..056a015 --- /dev/null +++ b/redis-experiment-publisher-multi-server.py @@ -0,0 +1,70 @@ +import redis +import time +import random + +# Initialize the Redis connection for server #1 +redis_host = "localhost" +redis_port = 6379 +# When your Redis server requires a password, fill it in here +redis_password = "" +# Connect to Redis +r1 = redis.Redis(host=redis_host, + port=redis_port, + password=redis_password, + decode_responses=True) + +# Initialize the Redis connection for server #2 +redis_host = "localhost" +redis_port = 6380 # Make sure you start your redis server with a different port +# When your Redis server requires a password, fill it in here +redis_password = "" +# Connect to Redis +r2 = redis.Redis(host=redis_host, + port=redis_port, + password=redis_password, + decode_responses=True) + +# List of channel names +server1_channels = ['state_0'] +server2_channels = ['state_1', 'state_2'] +# Publish messages on each channel in random order +def publish_messages(): + loop_count = 0 + inner_loop_count = 0 + max_loops = 9 + while loop_count < max_loops: + random.shuffle(server1_channels) + for channel in server1_channels: + r1.publish(channel, f'{{"time":{inner_loop_count}, ' \ + f'"x":{inner_loop_count+0.1}, ' \ + f'"y":{inner_loop_count+0.2}}}') + + print(f"Published data to {channel}, " \ + f"[{inner_loop_count}, " \ + f"{inner_loop_count+0.1}, " \ + f"{inner_loop_count+0.2}]") + + inner_loop_count += 1 + time.sleep(2) + + random.shuffle(server2_channels) + for channel in server2_channels: + r2.publish(channel, f'{{"time":{inner_loop_count}, ' \ + f'"x":{inner_loop_count+0.1}, ' \ + f'"y":{inner_loop_count+0.2}}}') + + print(f"Published data to {channel}, " \ + f"[{inner_loop_count}, " \ + f"{inner_loop_count+0.1}, " \ + f"{inner_loop_count+0.2}]") + + inner_loop_count += 1 + time.sleep(2) + + loop_count += 1 + print(f"Completed {loop_count} loops") + + + +if __name__ == "__main__": + publish_messages() diff --git a/test/onair/data_handling/test_redis_adapter.py b/test/onair/data_handling/test_redis_adapter.py index 777dfbb..bd58360 100644 --- a/test/onair/data_handling/test_redis_adapter.py +++ b/test/onair/data_handling/test_redis_adapter.py @@ -18,12 +18,9 @@ import threading # __init__ tests -def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_subscribes_to_subscriptions(mocker): +def test_redis_adapter_DataSource__init__sets_redis_values_then_connects(mocker): # Arrange - expected_address = 'localhost' - expected_port = 6379 - expected_db = 0 - expected_server = None + expected_server = [] expected_subscriptions = MagicMock() arg_data_file = MagicMock() @@ -33,7 +30,6 @@ def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_sub fake_new_data_lock = MagicMock() cut = DataSource.__new__(DataSource) - cut.subscriptions = expected_subscriptions fake_order = MagicMock() fake_order.__len__.return_value = \ pytest.gen.randint(1, 10) # from 1 to 10 arbitrary @@ -42,7 +38,6 @@ def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_sub mocker.patch.object(OnAirDataSource, '__init__', new=MagicMock()) mocker.patch('threading.Lock', return_value=fake_new_data_lock) mocker.patch.object(cut, 'connect') - mocker.patch.object(cut, 'subscribe') # Act cut.__init__(arg_data_file, arg_meta_file, arg_ss_breakdown) @@ -50,11 +45,9 @@ def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_sub # Assert assert OnAirDataSource.__init__.call_count == 1 assert OnAirDataSource.__init__.call_args_list[0].args == (arg_data_file, arg_meta_file, arg_ss_breakdown) - assert cut.address == expected_address - assert cut.port == expected_port - assert cut.db == expected_db - assert cut.server == expected_server + assert cut.servers == expected_server assert cut.new_data_lock == fake_new_data_lock + assert threading.Lock.call_count == 1 assert cut.new_data == False assert cut.currentData == [{'headers':fake_order, 'data':list('-' * len(fake_order))}, @@ -63,177 +56,192 @@ def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_sub assert cut.double_buffer_read_index == 0 assert cut.connect.call_count == 1 assert cut.connect.call_args_list[0].args == () - assert cut.subscribe.call_count == 1 - assert cut.subscribe.call_args_list[0].args == (expected_subscriptions, ) # connect tests def test_redis_adapter_DataSource_connect_establishes_server_with_initialized_attributes(mocker): # Arrange - expected_address = MagicMock() - expected_port = MagicMock() - expected_db = MagicMock() + fake_server_configs = [{"address": MagicMock(), "port": 1234,"db": 1, "password": 'test', "subscriptions": ["state_0", "state_1"]}, {"address": '000.000.000.222', "port": 5678, "db": 2, "password": 'test2', "subscriptions" : ["state_2", "state_3"]}] + fake_server = MagicMock() - cut = DataSource.__new__(DataSource) - cut.address = expected_address - cut.port = expected_port - cut.db = expected_db + fake_message_listener = MagicMock() + fake_listen_thread = MagicMock() + cut = DataSource.__new__(DataSource) + cut.server_configs = fake_server_configs + cut.servers = [] + cut.message_listener = fake_message_listener + + mocker.patch(redis_adapter.__name__ + '.print_msg') mocker.patch('redis.Redis', return_value=fake_server) + mocker.patch.object(fake_server, 'ping') + mocker.patch('threading.Thread', return_value=fake_listen_thread) + mocker.patch.object(fake_listen_thread, 'start') # Act cut.connect() # Assert - assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_count == 7 assert redis_adapter.print_msg.call_args_list[0].args == ('Redis adapter connecting to server...',) - assert redis.Redis.call_count == 1 - assert redis.Redis.call_args_list[0].args == (expected_address, expected_port, expected_db) - assert fake_server.ping.call_count == 1 - assert redis_adapter.print_msg.call_args_list[1].args == ('... connected!',) - assert cut.server == fake_server + assert redis_adapter.print_msg.call_args_list[1].args == ('... connected to server # 0!',) + assert redis_adapter.print_msg.call_args_list[2].args == ('Subscribing to channel: state_0 on server # 0',) + assert redis_adapter.print_msg.call_args_list[3].args == ('Subscribing to channel: state_1 on server # 0',) + assert redis_adapter.print_msg.call_args_list[4].args == ('... connected to server # 1!',) + assert redis_adapter.print_msg.call_args_list[5].args == ('Subscribing to channel: state_2 on server # 1',) + assert redis_adapter.print_msg.call_args_list[6].args == ('Subscribing to channel: state_3 on server # 1',) + + assert redis.Redis.call_count == 2 + assert redis.Redis.call_args_list[0].args == (fake_server_configs[0]["address"], fake_server_configs[0]["port"], fake_server_configs[0]["db"], fake_server_configs[0]["password"] ) + assert redis.Redis.call_args_list[1].args == (fake_server_configs[1]["address"], fake_server_configs[1]["port"], fake_server_configs[1]["db"], fake_server_configs[1]["password"] ) + + assert fake_server.ping.call_count == 2 + assert cut.servers == [fake_server, fake_server] -def test_redis_adapter_DataSource_fails_to_connect_to_server(mocker): +# connect tests +def test_redis_adapter_DataSource_connect_establishes_server_with_default_attributes(mocker): # Arrange - expected_address = MagicMock() - expected_port = MagicMock() - expected_db = MagicMock() + expected_address = 'localhost' + expected_port = 6379 + expected_db = 0 + expected_password = '' + + fake_server_configs = [{"subscriptions": ["state_0", "state_1"]}, {"subscriptions" : ["state_2", "state_3"]}] + fake_server = MagicMock() - cut = DataSource.__new__(DataSource) - cut.address = expected_address - cut.port = expected_port - cut.db = expected_db + fake_message_listener = MagicMock() + fake_listen_thread = MagicMock() + cut = DataSource.__new__(DataSource) + cut.server_configs = fake_server_configs + cut.servers = [] + cut.message_listener = fake_message_listener + + mocker.patch(redis_adapter.__name__ + '.print_msg') mocker.patch('redis.Redis', return_value=fake_server) - mocker.patch.object(fake_server, 'ping', return_value=False) + mocker.patch.object(fake_server, 'ping') + mocker.patch('threading.Thread', return_value=fake_listen_thread) + mocker.patch.object(fake_listen_thread, 'start') # Act cut.connect() # Assert - assert redis_adapter.print_msg.call_count == 1 + assert redis.Redis.call_count == 2 + assert redis.Redis.call_args_list[0].args == (expected_address, expected_port, expected_db, expected_password ) + assert redis.Redis.call_args_list[1].args == (expected_address, expected_port, expected_db, expected_password ) + + +def test_redis_adapter_DataSource_fails_to_connect_to_server_with_ping_and_states_no_subscriptions_(mocker): + fake_server_configs = [{"subscriptions": ["state_0", "state_1"]}, {"subscriptions": ["state_2", "state_3"]}] + fake_server = MagicMock() + + cut = DataSource.__new__(DataSource) + cut.server_configs = fake_server_configs + cut.servers = [] + + mocker.patch(redis_adapter.__name__ + '.print_msg') + mocker.patch('redis.Redis', return_value=fake_server) + mocker.patch.object(fake_server, 'ping', side_effect=ConnectionError) + + # Act + cut.connect() + + # Assert + assert redis_adapter.print_msg.call_count == 3 assert redis_adapter.print_msg.call_args_list[0].args == ("Redis adapter connecting to server...",) - assert redis.Redis.call_count == 1 - assert redis.Redis.call_args_list[0].args == (expected_address, expected_port, expected_db) - assert fake_server.ping.call_count == 1 - assert cut.server == fake_server + assert redis.Redis.call_count == 2 + assert fake_server.ping.call_count == 2 + assert cut.servers == [fake_server, fake_server] + + assert redis_adapter.print_msg.call_args_list[0].args == ('Redis adapter connecting to server...',) + assert redis_adapter.print_msg.call_args_list[1].args == ('Did not connect to server # 0. Not setting up subscriptions.', 'RED') + assert redis_adapter.print_msg.call_args_list[2].args == ('Did not connect to server # 1. Not setting up subscriptions.', 'RED') + # subscribe_message tests def test_redis_adapter_DataSource_subscribe_subscribes_to_each_given_subscription_and_starts_listening_when_server_available(mocker): # Arrange - arg_subscriptions = [MagicMock()] * pytest.gen.randint(1, 10) # 1 to 10 arbitrary - + fake_server = MagicMock() fake_pubsub = MagicMock() - fake_thread = MagicMock() + fake_message_listener = MagicMock() + fake_listen_thread = MagicMock() + + fake_server_configs = [{"subscriptions": ["state_0", "state_1"]}, {"subscriptions": ["state_2", "state_3"]}] cut = DataSource.__new__(DataSource) - cut.server = fake_server + cut.server_configs = fake_server_configs + cut.message_listener = fake_message_listener + cut.servers = [{"subscriptions": ["state_0", "state_1"]}, {"subscriptions": ["state_2", "state_3"]}] + mocker.patch('redis.Redis', return_value=fake_server) mocker.patch.object(fake_server, 'ping', return_value=True) mocker.patch.object(fake_server, 'pubsub', return_value=fake_pubsub) mocker.patch.object(fake_pubsub, 'subscribe') mocker.patch(redis_adapter.__name__ + '.print_msg') - mocker.patch('threading.Thread', return_value=fake_thread) - mocker.patch.object(fake_thread, 'start') + mocker.patch('threading.Thread', return_value=fake_listen_thread) + mocker.patch.object(fake_listen_thread, 'start') # Act - cut.subscribe(arg_subscriptions) + cut.connect() # Assert - assert fake_server.ping.call_count == 1 - assert fake_server.pubsub.call_count == 1 - assert fake_pubsub.subscribe.call_count == len(arg_subscriptions) - for i in range(len(arg_subscriptions)): - assert fake_pubsub.subscribe.call_args_list[i].args == (arg_subscriptions[i],) - assert redis_adapter.print_msg.call_args_list[i].args == (f"Subscribing to channel: {arg_subscriptions[i]}",) - assert threading.Thread.call_count == 1 - assert threading.Thread.call_args_list[0].kwargs == ({'target': cut.message_listener}) - assert fake_thread.start.call_count == 1 - assert cut.pubsub == fake_pubsub + assert fake_server.ping.call_count == 2 + assert fake_server.pubsub.call_count == 2 + + #This is already checked in the first test. Should it be removed? Should the first test not check the subscription messages? + assert redis_adapter.print_msg.call_count == 7 + assert redis_adapter.print_msg.call_args_list[0].args == ('Redis adapter connecting to server...',) + assert redis_adapter.print_msg.call_args_list[1].args == ('... connected to server # 0!',) + assert redis_adapter.print_msg.call_args_list[2].args == ('Subscribing to channel: state_0 on server # 0',) + assert redis_adapter.print_msg.call_args_list[3].args == ('Subscribing to channel: state_1 on server # 0',) + assert redis_adapter.print_msg.call_args_list[4].args == ('... connected to server # 1!',) + assert redis_adapter.print_msg.call_args_list[5].args == ('Subscribing to channel: state_2 on server # 1',) + assert redis_adapter.print_msg.call_args_list[6].args == ('Subscribing to channel: state_3 on server # 1',) + + assert fake_pubsub.subscribe.call_count == 4 + + assert threading.Thread.call_count == 2 + assert threading.Thread.call_args_list[0].kwargs == ({'target': cut.message_listener, 'args': (fake_pubsub,)}) + assert fake_listen_thread.start.call_count == 2 def test_redis_adapter_DataSource_subscribe_states_no_subscriptions_given_when_empty(mocker): # Arrange - arg_subscriptions = [] - fake_server = MagicMock() + fake_servers = MagicMock() initial_pubsub = MagicMock() - fake_subscription = MagicMock() - fake_thread = MagicMock() - cut = DataSource.__new__(DataSource) - cut.server = fake_server - cut.pubsub = initial_pubsub + fake_message_listener = MagicMock() + fake_listen_thread = MagicMock() - mocker.patch.object(fake_server, 'ping', return_value=False) - mocker.patch(redis_adapter.__name__ + '.print_msg') - mocker.patch.object(fake_server, 'pubsub') - mocker.patch('threading.Thread') - mocker.patch.object(fake_thread, 'start') - - # Act - cut.subscribe(arg_subscriptions) + fake_server_configs = [{'subscriptions': {}}] - # Assert - assert fake_server.ping.call_count == 0 - assert fake_server.pubsub.call_count == 0 - assert threading.Thread.call_count == 0 - assert fake_thread.start.call_count == 0 - assert cut.pubsub == initial_pubsub - assert redis_adapter.print_msg.call_args_list[0].args == ("No subscriptions given!",) - -# Note the self.server.ping during runtime will error, not actually return False, but that means code will never run -# this unit test is for completeness of coverage -def test_redis_adapter_DataSource_subscribe_states_no_subscriptions_given_when_server_does_not_respond_to_ping(mocker): - # Arrange - arg_channel = [MagicMock()] - fake_server = MagicMock() - initial_pubsub = MagicMock() - fake_subscription = MagicMock() - fake_thread = MagicMock() cut = DataSource.__new__(DataSource) - cut.server = fake_server - cut.pubsub = initial_pubsub + cut.server_configs = fake_server_configs + cut.message_listener = fake_message_listener + cut.servers = [] + - mocker.patch.object(fake_server, 'ping', return_value=False) + mocker.patch('redis.Redis', return_value=fake_servers) + mocker.patch.object(fake_servers, 'ping', return_value=True) + mocker.patch.object(fake_servers, 'pubsub', return_value=initial_pubsub) + mocker.patch.object(initial_pubsub, 'subscribe') mocker.patch(redis_adapter.__name__ + '.print_msg') - mocker.patch.object(fake_server, 'pubsub') - mocker.patch('threading.Thread') - mocker.patch.object(fake_thread, 'start') + mocker.patch('threading.Thread', return_value=fake_listen_thread) + mocker.patch.object(fake_listen_thread, 'start') # Act - cut.subscribe(arg_channel) + cut.connect() # Assert - assert fake_server.ping.call_count == 1 - assert fake_server.pubsub.call_count == 0 + assert fake_servers.ping.call_count == 0 + assert fake_servers.pubsub.call_count == 0 + assert fake_servers.pubsub.subscribe.call_count == 0 assert threading.Thread.call_count == 0 - assert fake_thread.start.call_count == 0 - assert cut.pubsub == initial_pubsub - assert redis_adapter.print_msg.call_args_list[0].args == ("No subscriptions given!",) - -# get_next tests -def test_redis_adapter_DataSource_get_next_returns_expected_data_when_new_data_is_true_and_double_buffer_read_index_is_0(): - # Arrange - # Renew DataSource to ensure test independence - cut = DataSource.__new__(DataSource) - cut.new_data = True - cut.new_data_lock = MagicMock() - cut.double_buffer_read_index = 0 - pre_call_index = cut.double_buffer_read_index - expected_result = MagicMock() - cut.currentData = [] - cut.currentData.append({'data': MagicMock()}) - cut.currentData.append({'data': expected_result}) - - # Act - result = cut.get_next() - - # Assert - assert cut.new_data == False - assert cut.double_buffer_read_index == 1 - assert result == expected_result + assert fake_listen_thread.start.call_count == 0 + assert redis_adapter.print_msg.call_args_list[1].args == ("No subscriptions given! Redis server not created",) def test_redis_adapter_DataSource_get_next_returns_expected_data_when_new_data_is_true_and_double_buffer_read_index_is_1(): # Arrange @@ -335,15 +343,16 @@ def test_redis_adapter_DataSource_message_listener_warns_of_exit_and_does_not_ru # Arrange cut = DataSource.__new__(DataSource) - cut.pubsub = MagicMock(name="cut.pubsub") + fake_server = MagicMock() + fake_server.fake_pubsub = MagicMock() fake_listener = MagicMock(name='fake_listener') fake_listener.__next__.side_effect = StopIteration - mocker.patch.object(cut.pubsub, 'listen', side_effect=[fake_listener]) + mocker.patch.object(fake_server.fake_pubsub, 'listen', side_effect=[fake_listener]) mocker.patch(redis_adapter.__name__ + '.json.loads') mocker.patch(redis_adapter.__name__ + '.print_msg') # Act - cut.message_listener() + cut.message_listener(fake_server.fake_pubsub) # Assert assert redis_adapter.json.loads.call_count == 0 @@ -354,17 +363,19 @@ def test_redis_adapter_DataSource_message_listener_prints_warning_when_receiving # Arrange cut = DataSource.__new__(DataSource) - cut.pubsub = MagicMock() + #cut.pubsub = MagicMock() + fake_server = MagicMock() + fake_server.fake_pubsub = MagicMock() ignored_message_types = ['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe', 'pmessage'] fake_message = {} fake_message['type'] = pytest.gen.choice(ignored_message_types) fake_message['channel'] = str(MagicMock(name='fake_message')).encode('utf-8') - mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch.object(fake_server.fake_pubsub, 'listen', return_value=[fake_message]) mocker.patch(redis_adapter.__name__ + '.json.loads') mocker.patch(redis_adapter.__name__ + '.print_msg') # Act - cut.message_listener() + cut.message_listener(fake_server.fake_pubsub) # Assert assert redis_adapter.json.loads.call_count == 0 @@ -378,19 +389,19 @@ def test_redis_adapter_DataSource_message_listener_prints_warning_when_receiving def test_redis_adapter_DataSource_message_listener_prints_warning_when_data_not_json_format_and_does_not_update_frame(mocker): # Arrange cut = DataSource.__new__(DataSource) - - cut.pubsub = MagicMock() + fake_server = MagicMock() + fake_server.fake_pubsub = MagicMock() fake_message = {} fake_message['type'] = 'message' fake_message['channel'] = str( MagicMock(name='fake_message_channel')).encode('utf-8') fake_message['data'] = str(MagicMock(name='fake_message_data')) - mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch.object(fake_server.fake_pubsub, 'listen', return_value=[fake_message]) mocker.patch(redis_adapter.__name__ + '.json.loads', side_effect=ValueError) mocker.patch(redis_adapter.__name__ + '.print_msg') # Act - cut.message_listener() + cut.message_listener(fake_server.fake_pubsub) # Assert assert redis_adapter.json.loads.call_count == 1 @@ -410,7 +421,8 @@ def test_redis_adapter_DataSource_message_listener_warns_user_when_processed_dat cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) cut.currentData = {0: {'headers': [], 'data': []}, 1: {'headers': [], 'data': []}} - cut.pubsub = MagicMock() + fake_server = MagicMock() + fake_server.fake_pubsub = MagicMock() cut.new_data_lock = MagicMock() cut.new_data = False @@ -419,12 +431,12 @@ def test_redis_adapter_DataSource_message_listener_warns_user_when_processed_dat fake_message['channel'] = str( MagicMock(name='fake_message_channel')).encode('utf-8') fake_message['data'] = '{}' # empty_message - mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch.object(fake_server.fake_pubsub, 'listen', return_value=[fake_message]) mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={}) mocker.patch(redis_adapter.__name__ + '.print_msg') # Act - cut.message_listener() + cut.message_listener(fake_server.fake_pubsub) # Assert assert redis_adapter.json.loads.call_count == 1 @@ -446,7 +458,8 @@ def test_redis_adapter_DataSource_message_listener_warns_of_received_key_that_do 'data': ['-']}, 1: {'headers': ['time'], 'data': ['-']}} - cut.pubsub = MagicMock() + fake_server = MagicMock() + fake_server.fake_pubsub = MagicMock() cut.new_data_lock = MagicMock() cut.new_data = False @@ -455,12 +468,12 @@ def test_redis_adapter_DataSource_message_listener_warns_of_received_key_that_do fake_message['channel'] = str( MagicMock(name='fake_message_channel')).encode('utf-8') fake_message['data'] = '{"time":0, "unknown_key":0}' - mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch.object(fake_server.fake_pubsub, 'listen', return_value=[fake_message]) mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={"time":0, "unknown_key":0}) mocker.patch(redis_adapter.__name__ + '.print_msg') # Act - cut.message_listener() + cut.message_listener(fake_server.fake_pubsub) # Assert assert redis_adapter.json.loads.call_count == 1 @@ -476,8 +489,9 @@ def test_redis_adapter_DataSource_message_listener_warns_of_received_key_that_do def test_redis_adapter_DataSource_message_listener_warns_of_expected_keys_that_do_not_appear_in_message(mocker): # Arrange cut = DataSource.__new__(DataSource) - cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) - cut.pubsub = MagicMock() + cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) + fake_server = MagicMock() + fake_server.fake_pubsub = MagicMock() cut.new_data_lock = MagicMock() cut.new_data = False @@ -494,12 +508,12 @@ def test_redis_adapter_DataSource_message_listener_warns_of_expected_keys_that_d '.missing_key'], 'data': ['-', '-']}} fake_message['data'] = '{}' - mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch.object(fake_server.fake_pubsub, 'listen', return_value=[fake_message]) mocker.patch(redis_adapter.__name__ + '.json.loads', return_value={}) mocker.patch(redis_adapter.__name__ + '.print_msg') # Act - cut.message_listener() + cut.message_listener(fake_server.fake_pubsub) # Assert assert redis_adapter.json.loads.call_count == 1 @@ -522,7 +536,8 @@ def test_redis_adapter_DataSource_message_listener_updates_new_data_with_receive # Arrange cut = DataSource.__new__(DataSource) cut.double_buffer_read_index = pytest.gen.choice([0 , 1]) - cut.pubsub = MagicMock() + fake_server = MagicMock() + fake_server.fake_pubsub = MagicMock() cut.new_data_lock = MagicMock() cut.new_data = False @@ -539,7 +554,7 @@ def test_redis_adapter_DataSource_message_listener_updates_new_data_with_receive '.correct_key', 'fakeotherchannel.x'], 'data': ['-', '-', '0']}} fake_message['data'] = '{}' - mocker.patch.object(cut.pubsub, 'listen', return_value=[fake_message]) + mocker.patch.object(fake_server.fake_pubsub, 'listen', return_value=[fake_message]) fake_data = { 'time': pytest.gen.randint(1, 100), # from 1 to 100 arbitrary 'correct_key': pytest.gen.randint(1, 100), # from 1 to 100 arbitrary @@ -549,7 +564,7 @@ def test_redis_adapter_DataSource_message_listener_updates_new_data_with_receive mocker.patch(redis_adapter.__name__ + '.print_msg') # Act - cut.message_listener() + cut.message_listener(fake_server.fake_pubsub) # Assert assert redis_adapter.json.loads.call_count == 1 @@ -602,17 +617,15 @@ def test_redis_adapter_DataSource_parse_meta_data_file_raises_ConfigKeyError_whe assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) assert e_info.match(exception_message) -def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown_and_sets_subscriptions_when_redis_subscriptions_occupied(mocker): +def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown(mocker): # Arrange cut = DataSource.__new__(DataSource) arg_configFile = MagicMock() arg_ss_breakdown = MagicMock() expected_extracted_configs = MagicMock() - expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary fake_meta = {'fake_other_stuff': MagicMock(), - 'order': MagicMock(), - 'redis_subscriptions':expected_subscriptions} + 'order': MagicMock()} mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs) mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) @@ -625,32 +638,8 @@ def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_m assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown) assert redis_adapter.parseJson.call_count == 1 assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) - assert cut.subscriptions == expected_subscriptions assert result == expected_extracted_configs -def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown_and_sets_subscriptions_to_empty_when_none_given(mocker): - # Arrange - cut = DataSource.__new__(DataSource) - arg_configFile = MagicMock() - arg_ss_breakdown = MagicMock() - - fake_configs = {'fake_other_stuff': MagicMock()} - fake_meta = {'order': MagicMock()} - - mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=fake_configs) - mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) - - # Act - result = cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, ) - - # Assert - assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1 - assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown) - assert redis_adapter.parseJson.call_count == 1 - assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) - assert cut.subscriptions == [] - assert result == fake_configs - # redis_adapter get_vehicle_metadata tests def test_redis_adapter_DataSource_get_vehicle_metadata_returns_list_of_headers_and_list_of_test_assignments(): # Arrange @@ -684,3 +673,62 @@ def test_redis_adapter_DataSource_process_data_file_does_nothing(): # Assert assert result == expected_result + + +def test_redis_adapter_DataSource_parse_meta_data_file_redis_in_keys_subscriptions_exist_and_adds_redis_to_server_configs(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + arg_configFile = MagicMock() + arg_ss_breakdown = MagicMock() + + fake_server_configs = [{"subscriptions": ["state_0", "state_1"]}, {"subscriptions": ["state_2", "state_3"]}] + cut.server_configs = MagicMock() + + expected_extracted_configs = MagicMock() + fake_meta = {'fake_other_stuff': MagicMock(), + 'redis': fake_server_configs, + 'order': MagicMock()} + + mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs) + mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) + + # Act + result = cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, ) + + # Assert + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1 + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown) + assert redis_adapter.parseJson.call_count == 1 + assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) + assert result == expected_extracted_configs + assert cut.server_configs == fake_meta['redis'] + +def test_redis_adapter_DataSource_parse_meta_data_file_redis_in_keys_subscriptions_do_not_exist(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + arg_configFile = MagicMock() + arg_ss_breakdown = MagicMock() + + fake_server_configs = [{"address": 1}] + cut.server_configs = MagicMock() + + expected_extracted_configs = MagicMock() + fake_meta = {'fake_other_stuff': MagicMock(), + 'redis': fake_server_configs, + 'order': MagicMock()} + + mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs) + mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) + + # Act + with pytest.raises(ConfigKeyError) as e_info: + cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, ) + + + # Assert + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1 + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown) + assert redis_adapter.parseJson.call_count == 1 + assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) + assert cut.server_configs == [] + assert e_info.match(f'Config file: \'{arg_configFile}\' missing required key \'subscriptions\' from 0 in key \'redis\'') \ No newline at end of file