diff --git a/interoperability_report.py b/interoperability_report.py index 7893340..0f6eabd 100644 --- a/interoperability_report.py +++ b/interoperability_report.py @@ -33,6 +33,7 @@ def run_subscriber_shape_main( produced_code_index: int, subscriber_index: int, samples_sent: "list[multiprocessing.Queue]", + last_sample_saved: "list[multiprocessing.Queue]", verbosity: bool, timeout: int, file: tempfile.TemporaryFile, @@ -56,6 +57,9 @@ def run_subscriber_shape_main( samples_sent <>: list of multiprocessing Queues with the samples the Publishers send. Element 1 of the list is for Publisher 1, etc. + last_sample_saved <>: list of multiprocessing Queues with the last + sample saved on samples_sent for each Publisher. Element 1 of + the list is for Publisher 1, etc. verbosity <>: print debug information. timeout <>: time pexpect waits until it matches a pattern. file <>: temporal file to save shape_main application output. @@ -157,7 +161,7 @@ def run_subscriber_shape_main( # to the Subscriber. By default it does not check # anything and returns ReturnCode.OK. produced_code[produced_code_index] = check_function( - child_sub, samples_sent, timeout) + child_sub, samples_sent, last_sample_saved, timeout) subscriber_finished.set() # set subscriber as finished log_message(f'Subscriber {subscriber_index}: Waiting for Publishers to ' @@ -176,6 +180,7 @@ def run_publisher_shape_main( produced_code_index: int, publisher_index: int, samples_sent: multiprocessing.Queue, + last_sample_saved: multiprocessing.Queue, verbosity: bool, timeout: int, file: tempfile.TemporaryFile, @@ -197,6 +202,8 @@ def run_publisher_shape_main( publisher it is 1, for the second 2, etc. samples_sent <>: this variable contains the samples the Publisher sends. + last_sample_saved <>: this variable contains the last sample + saved on samples_sent. verbosity <>: print debug information. timeout <>: time pexpect waits until it matches a pattern. file <>: temporal file to save shape_main application output. @@ -296,12 +303,14 @@ def run_publisher_shape_main( produced_code[produced_code_index] = ReturnCode.OK log_message(f'Publisher {publisher_index}: Sending ' 'samples', verbosity) + last_sample = '' for x in range(0, MAX_SAMPLES_SAVED, 1): # At this point, at least one sample has been printed # Therefore, that sample is added to samples_sent. pub_string = re.search('[0-9]+ [0-9]+ \[[0-9]+\]', child_pub.before + child_pub.after) - samples_sent.put(pub_string.group(0)) + last_sample = pub_string.group(0) + samples_sent.put(last_sample) index = child_pub.expect([ '\[[0-9]+\]', # index = 0 'on_offered_deadline_missed()', # index = 1 @@ -314,6 +323,7 @@ def run_publisher_shape_main( elif index == 2: produced_code[produced_code_index] = ReturnCode.DATA_NOT_SENT break + last_sample_saved.put(last_sample) else: produced_code[produced_code_index] = ReturnCode.OK @@ -395,6 +405,7 @@ def run_test( return_codes = manager.list(range(num_entities)) samples_sent = [] # used for storing the samples the Publishers send. # It is a list with one Queue for each Publisher. + last_sample_saved = [] # used for storing the last value sent by each Publisher. # list of multiprocessing Events used as semaphores to control the end of # the processes, one for each entity. @@ -419,6 +430,7 @@ def run_test( if ('-P ' in element or element.endswith('-P')): publishers_finished.append(multiprocessing.Event()) samples_sent.append(multiprocessing.Queue()) + last_sample_saved.append(multiprocessing.Queue()) elif ('-S ' in element or element.endswith('-S')): subscribers_finished.append(multiprocessing.Event()) else: @@ -438,6 +450,7 @@ def run_test( 'produced_code_index':i, 'publisher_index':publisher_number+1, 'samples_sent':samples_sent[publisher_number], + 'last_sample_saved':last_sample_saved[publisher_number], 'verbosity':verbosity, 'timeout':timeout, 'file':temporary_file[i], @@ -461,6 +474,7 @@ def run_test( 'produced_code_index':i, 'subscriber_index':subscriber_number+1, 'samples_sent':samples_sent, + 'last_sample_saved':last_sample_saved, 'verbosity':verbosity, 'timeout':timeout, 'file':temporary_file[i], diff --git a/rtps_test_utilities.py b/rtps_test_utilities.py index 26aff99..39d7073 100644 --- a/rtps_test_utilities.py +++ b/rtps_test_utilities.py @@ -50,5 +50,5 @@ def remove_ansi_colors(text): cleaned_str = ansi_escape.sub('', text) return cleaned_str -def no_check(child_sub, samples_sent, timeout): +def no_check(child_sub, samples_sent, last_sample_saved, timeout): return ReturnCode.OK diff --git a/test_suite.py b/test_suite.py index b04a13c..9dca06c 100644 --- a/test_suite.py +++ b/test_suite.py @@ -48,7 +48,7 @@ # is received in order, or that OWNERSHIP works properly, etc... MAX_SAMPLES_READ = 500 -def test_ownership_receivers(child_sub, samples_sent, timeout): +def test_ownership_receivers(child_sub, samples_sent, last_sample_saved, timeout): """ This function is used by test cases that have two publishers and one subscriber. @@ -63,6 +63,9 @@ def test_ownership_receivers(child_sub, samples_sent, timeout): samples_sent: list of multiprocessing Queues with the samples the publishers send. Element 1 of the list is for publisher 1, etc. + last_sample_saved: list of multiprocessing Queues with the last + sample saved on samples_sent for each Publisher. Element 1 of + the list is for Publisher 1, etc. timeout: time pexpect waits until it matches a pattern. This functions assumes that the subscriber has already received samples @@ -76,6 +79,9 @@ def test_ownership_receivers(child_sub, samples_sent, timeout): list_data_received_first = [] max_samples_received = MAX_SAMPLES_READ samples_read = 0 + list_samples_processed = [] + last_first_sample = ''; + last_second_sample = ''; while(samples_read < max_samples_received): # take the topic, color, position and size of the ShapeType. @@ -110,16 +116,39 @@ def test_ownership_receivers(child_sub, samples_sent, timeout): except queue.Empty: pass + # Take the last sample published by each publisher from their queues + # ('last_sample_saved[i]') and save them local variables. + try: + last_first_sample = last_sample_saved[0].get(block=False) + except queue.Empty: + pass + + try: + last_second_sample = last_sample_saved[1].get(block=False) + except queue.Empty: + pass + # Determine to which publisher the current sample belong to if sub_string.group(0) in list_data_received_second: current_sample_from_publisher = 2 elif sub_string.group(0) in list_data_received_first: current_sample_from_publisher = 1 else: - # If the sample is not in any queue, wait a bit and continue + # If the sample is not in any queue, break the loop if the + # the last sample for any publisher has already been processed. + if last_first_sample in list_samples_processed: + break + if last_second_sample in list_samples_processed: + break + print(f'Last samples: {last_first_sample}, {last_second_sample}') + # Otherwise, wait a bit and continue time.sleep(0.1) continue + # Keep all samples processed in a single list, so we can check whether + # the last sample published by any publisher has already been processed + list_samples_processed.append(sub_string.group(0)) + # If the app hit this point, it is because the previous subscriber # sample has been already read. Then, we can process the next sample # read by the subscriber. @@ -172,7 +201,7 @@ def test_ownership_receivers(child_sub, samples_sent, timeout): print(f'Samples read: {samples_read}') return ReturnCode.RECEIVING_FROM_ONE -def test_color_receivers(child_sub, samples_sent, timeout): +def test_color_receivers(child_sub, samples_sent, last_sample_saved, timeout): """ This function is used by test cases that have two publishers and one @@ -182,6 +211,7 @@ def test_color_receivers(child_sub, samples_sent, timeout): child_sub: child program generated with pexpect samples_sent: not used + last_sample_saved: not used timeout: time pexpect waits until it matches a pattern. """ sub_string = re.search('\w\s+(\w+)\s+[0-9]+ [0-9]+ \[[0-9]+\]', @@ -217,13 +247,14 @@ def test_color_receivers(child_sub, samples_sent, timeout): print(f'Samples read: {samples_read}') return ReturnCode.RECEIVING_FROM_ONE -def test_reliability_order(child_sub, samples_sent, timeout): +def test_reliability_order(child_sub, samples_sent, last_sample_saved, timeout): """ This function tests reliability, it checks whether the subscriber receives the samples in order. child_sub: child program generated with pexpect samples_sent: not used + last_sample_saved: not used timeout: not used """ @@ -267,7 +298,7 @@ def test_reliability_order(child_sub, samples_sent, timeout): return produced_code -def test_reliability_no_losses(child_sub, samples_sent, timeout): +def test_reliability_no_losses(child_sub, samples_sent, last_sample_saved, timeout): """ This function tests RELIABLE reliability, it checks whether the subscriber receives the samples in order and with no losses. @@ -276,6 +307,7 @@ def test_reliability_no_losses(child_sub, samples_sent, timeout): samples_sent: list of multiprocessing Queues with the samples the publishers send. Element 1 of the list is for publisher 1, etc. + last_sample_saved: not used timeout: time pexpect waits until it matches a pattern. """ @@ -352,7 +384,7 @@ def test_reliability_no_losses(child_sub, samples_sent, timeout): return produced_code -def test_durability_volatile(child_sub, samples_sent, timeout): +def test_durability_volatile(child_sub, samples_sent, last_sample_saved, timeout): """ This function tests the volatile durability, it checks that the sample the subscriber receives is not the first one. The publisher application sends @@ -365,6 +397,7 @@ def test_durability_volatile(child_sub, samples_sent, timeout): child_sub: child program generated with pexpect samples_sent: not used + last_sample_saved: not used timeout: not used """ @@ -387,7 +420,7 @@ def test_durability_volatile(child_sub, samples_sent, timeout): return produced_code -def test_durability_transient_local(child_sub, samples_sent, timeout): +def test_durability_transient_local(child_sub, samples_sent, last_sample_saved, timeout): """ This function tests the TRANSIENT_LOCAL durability, it checks that the sample the subscriber receives is the first one. The publisher application @@ -396,6 +429,7 @@ def test_durability_transient_local(child_sub, samples_sent, timeout): child_sub: child program generated with pexpect samples_sent: not used + last_sample_saved: not used timeout: not used """ @@ -416,7 +450,7 @@ def test_durability_transient_local(child_sub, samples_sent, timeout): return produced_code -def test_deadline_missed(child_sub, samples_sent, timeout): +def test_deadline_missed(child_sub, samples_sent, last_sample_saved, timeout): """ This function tests whether the subscriber application misses the requested deadline or not. This is needed in case the subscriber application receives @@ -424,6 +458,7 @@ def test_deadline_missed(child_sub, samples_sent, timeout): child_sub: child program generated with pexpect samples_sent: not used + last_sample_saved: not used timeout: time pexpect waits until it matches a pattern """