From 8c7ffe4baca3fe3ee0318d41acd10aa8a0d28077 Mon Sep 17 00:00:00 2001 From: Bill Graham Date: Wed, 9 Nov 2016 13:39:32 -0800 Subject: [PATCH 1/3] Change local integration tests to use test status objects instead of True/False --- heron/tools/cli/src/python/cli_helper.py | 2 +- .../src/python/local_test_runner/main.py | 17 ++- .../src/python/local_test_runner/status.py | 14 +++ .../python/local_test_runner/test_scale_up.py | 10 +- .../python/local_test_runner/test_template.py | 105 +++++++++--------- .../src/python/test_runner/main.py | 2 +- 6 files changed, 85 insertions(+), 65 deletions(-) create mode 100644 integration-test/src/python/local_test_runner/status.py diff --git a/heron/tools/cli/src/python/cli_helper.py b/heron/tools/cli/src/python/cli_helper.py index c74765ddea0..204f34a260c 100644 --- a/heron/tools/cli/src/python/cli_helper.py +++ b/heron/tools/cli/src/python/cli_helper.py @@ -89,5 +89,5 @@ def run(command, parser, cl_args, unknown_args, action): Log.error('Failed to %s \'%s\'' % (action, topology_name)) return False - Log.info('Successfully %s \'%s\'' % (action, topology_name)) + Log.info('Successfully executed %s \'%s\'' % (action, topology_name)) return True diff --git a/integration-test/src/python/local_test_runner/main.py b/integration-test/src/python/local_test_runner/main.py index 1f9e4686ae3..40b0fc3942d 100644 --- a/integration-test/src/python/local_test_runner/main.py +++ b/integration-test/src/python/local_test_runner/main.py @@ -25,6 +25,8 @@ import sys from collections import namedtuple +import status + # import test_kill_bolt import test_kill_metricsmgr import test_kill_stmgr @@ -60,10 +62,19 @@ def run_tests(test_classes, args): logging.info("==== Starting test %s of %s: %s ====", len(successes) + len(failures) + 1, len(test_classes), testname) template = test_class(testname, args) - if template.run_test(): # testcase passed - successes += [testname] - else: + try: + result = template.run_test() + if isinstance(result, status.TestSuccess): # testcase passed + successes += [testname] + elif isinstance(result, status.TestFailure): + failures += [testname] + else: + logging.error( + "Unrecognized test response returned for test %s: %s", testname, str(result)) + failures += [testname] + except status.TestFailure: failures += [testname] + except Exception as e: logging.error("Exception thrown while running tests: %s", str(e)) finally: diff --git a/integration-test/src/python/local_test_runner/status.py b/integration-test/src/python/local_test_runner/status.py new file mode 100644 index 00000000000..cc7f4ef3577 --- /dev/null +++ b/integration-test/src/python/local_test_runner/status.py @@ -0,0 +1,14 @@ +import logging + +class TestFailure(Exception): + def __init__(self, message, error=None): + Exception.__init__(self, message, error) + if error: + logging.error("%s :: %s", message, str(error)) + else: + logging.error(message) + +class TestSuccess(object): + def __init__(self, message=None): + if message: + logging.info(message) diff --git a/integration-test/src/python/local_test_runner/test_scale_up.py b/integration-test/src/python/local_test_runner/test_scale_up.py index 075e9bf06ee..9ff985b7c28 100644 --- a/integration-test/src/python/local_test_runner/test_scale_up.py +++ b/integration-test/src/python/local_test_runner/test_scale_up.py @@ -16,6 +16,7 @@ import logging import subprocess +import status import test_template class TestScaleUp(test_template.TestTemplate): @@ -39,11 +40,8 @@ def pre_check_results(self, physical_plan_json): instances = 
physical_plan_json['instances'] instance_count = len(instances) if instance_count != self.expected_instance_count: - logging.error("Found %s instances but expected %s: %s", - instance_count, self.expected_instance_count, instances) - return False - - return True + raise status.TestFailure("Found %s instances but expected %s: %s" % + (instance_count, self.expected_instance_count, instances)) def scale_up(heron_cli_path, test_cluster, topology_name): splitcmd = [ @@ -52,5 +50,5 @@ def scale_up(heron_cli_path, test_cluster, topology_name): ] logging.info("Increasing number of component instances: %s", splitcmd) if subprocess.call(splitcmd) != 0: - raise RuntimeError("Unable to update topology") + raise status.TestFailure("Unable to update topology %s" % topology_name) logging.info("Increased number of component instances") diff --git a/integration-test/src/python/local_test_runner/test_template.py b/integration-test/src/python/local_test_runner/test_template.py index 0e2e9da0ae0..d54cfc47991 100644 --- a/integration-test/src/python/local_test_runner/test_template.py +++ b/integration-test/src/python/local_test_runner/test_template.py @@ -21,6 +21,8 @@ import subprocess from collections import namedtuple +import status + # Test input. Please set each variable as it's own line, ended with \n, otherwise the value of lines # passed into the topology will be incorrect, and the test will fail. TEST_INPUT = ["1\n", "2\n", "3\n", "4\n", "5\n", "6\n", "7\n", "8\n", @@ -52,17 +54,16 @@ def __init__(self, testname, params): # pylint: disable=too-many-return-statements, too-many-branches, # pylint: disable=too-many-statements def run_test(self): - """ Runs the test template """ - + """ Runs the test template. Must either return TestSuccess or raise TestFailure""" + topology_submitted = False try: # prepare test data, start the topology and block until it's running self._prepare_test_data() self.submit_topology() + topology_submitted = True _block_until_stmgr_running(self.get_expected_container_count()) - if not self._block_until_topology_running(self.get_expected_min_instance_count()): - self.cleanup_test() - return False + self._block_until_topology_running(self.get_expected_min_instance_count()) # Execute the specific test logic and block until topology is running again self.execute_test_case() @@ -70,38 +71,37 @@ def run_test(self): _block_until_stmgr_running(self.get_expected_container_count()) physical_plan_json =\ self._block_until_topology_running(self.get_expected_min_instance_count()) - if not physical_plan_json: - self.cleanup_test() - return False # trigger the test data to flow and invoke the pre_check_results hook self._inject_test_data() - if not self.pre_check_results(physical_plan_json): - self.cleanup_test() - return False - except Exception as e: - logging.error("Test failed, attempting to clean up: %s", e) + self.pre_check_results(physical_plan_json) + + # finally verify the expected results + result = self._check_results() self.cleanup_test() - return False + return result - # finally verify the expected results - return self._check_results() + except status.TestFailure as e: + if topology_submitted: + logging.error("Test failed, attempting to clean up") + self.cleanup_test() + raise e + except Exception as e: + if topology_submitted: + logging.error("Test failed, attempting to clean up") + self.cleanup_test() + return status.TestFailure("Exception thrown during test", e) def submit_topology(self): - #submit topology - try: - _submit_topology( - self.params['cliPath'], - 
self.params['cluster'], - self.params['testJarPath'], - self.params['topologyClassPath'], - self.params['topologyName'], - self.params['readFile'], - self.params['outputFile'] - ) - except Exception as e: - logging.error("Failed to submit %s topology: %s", self.params['topologyName'], str(e)) - return False + _submit_topology( + self.params['cliPath'], + self.params['cluster'], + self.params['testJarPath'], + self.params['topologyClassPath'], + self.params['topologyName'], + self.params['readFile'], + self.params['outputFile'] + ) # pylint: disable=no-self-use def get_expected_container_count(self): @@ -123,7 +123,6 @@ def cleanup_test(self): _kill_topology(self.params['cliPath'], self.params['cluster'], self.params['topologyName']) except Exception as e: logging.error("Failed to kill %s topology: %s", self.params['topologyName'], str(e)) - return False finally: self._delete_test_data_files() @@ -165,11 +164,11 @@ def _check_results(self): with open(self.params['outputFile'], 'r') as g: actual_result = g.read() except Exception as e: - logging.error( - "Failed to read expected or actual results from file for test %s: %s", self.testname, e) + message =\ + "Failed to read expected or actual results from file for test %s: %s" % self.testname if retries_left == 0: - self.cleanup_test() - return False + raise status.TestFailure(message, e) + logging.error(message, e) # if we get expected result, no need to retry expected_sorted = sorted(expected_result.split('\n')) actual_sorted = sorted(actual_result.split('\n')) @@ -185,19 +184,19 @@ def _check_results(self): self.testname, RETRY_COUNT - retries_left, RETRY_COUNT, RETRY_INTERVAL) time.sleep(RETRY_INTERVAL) - self.cleanup_test() - # Compare the actual and expected result if actual_sorted == expected_sorted: - logging.info("Actual result matched expected result for test %s", self.testname) + result = status.TestSuccess( + "Actual result matched expected result for test %s" % self.testname) logging.info("Actual result ---------- \n%s", actual_sorted) logging.info("Expected result ---------- \n%s", expected_sorted) - return True + return result else: - logging.error("Actual result did not match expected result for test %s", self.testname) + result = status.TestFailure( + "Actual result did not match expected result for test %s" % self.testname) logging.info("Actual result ---------- \n%s", actual_sorted) logging.info("Expected result ---------- \n%s", expected_sorted) - return False + raise result # pylint: disable=no-self-use def get_pid(self, process_name, heron_working_directory): @@ -254,8 +253,8 @@ def _get_tracker_pplan(self): physical_plan_json = json.loads(response.read()) if 'result' not in physical_plan_json: - logging.error("Could not find result json in physical plan request to tracker: %s", url) - return None + raise status.TestFailure( + "Could not find result json in physical plan request to tracker: %s" % url) return physical_plan_json['result'] @@ -272,20 +271,16 @@ def _block_until_topology_running(self, min_instances): self.testname, RETRY_COUNT - retries_left) return packing_plan elif retries_left == 0: - logging.error( + raise status.TestFailure( "Got pplan from tracker for test %s but the number of instances found (%d) was " +\ - "less than min expected (%s).", self.testname, instances_found, min_instances) - self.cleanup_test() - return None + "less than min expected (%s)." 
% (self.testname, instances_found, min_instances)) if retries_left > 0: _sleep("before trying again to fetch pplan for test %s (attempt %s/%s)" % (self.testname, RETRY_COUNT - retries_left, RETRY_COUNT), RETRY_INTERVAL) else: - logging.error("Failed to get pplan from tracker for test %s after %s attempts.", - self.testname, RETRY_COUNT) - self.cleanup_test() - return None + raise status.TestFailure("Failed to get pplan from tracker for test %s after %s attempts." + % (self.testname, RETRY_COUNT)) def _block_until_stmgr_running(expected_stmgrs): # block until ./heron-stmgr exists @@ -304,16 +299,18 @@ def _submit_topology(heron_cli_path, test_cluster, test_jar_path, topology_class logging.info("Submitting topology: %s", splitcmd) p = subprocess.Popen(splitcmd) p.wait() + if p.returncode != 0: + raise status.TestFailure("Failed to submit topology %s" % topology_name) + logging.info("Submitted topology %s", topology_name) def _kill_topology(heron_cli_path, test_cluster, topology_name): """ Kill a topology using heron-cli """ - splitcmd = [heron_cli_path, 'kill', '--verbose', test_cluster, topology_name] - logging.info("Killing topology: %s", splitcmd) + splitcmd = [heron_cli_path, 'kill', test_cluster, topology_name] + logging.info("Killing topology: %s", ' '.join(splitcmd)) # this call can be blocking, no need for subprocess if subprocess.call(splitcmd) != 0: raise RuntimeError("Unable to kill the topology: %s" % topology_name) - logging.info("Successfully killed topology %s", topology_name) def _get_processes(): """ diff --git a/integration-test/src/python/test_runner/main.py b/integration-test/src/python/test_runner/main.py index 86082d4de7f..fe8bebecfed 100644 --- a/integration-test/src/python/test_runner/main.py +++ b/integration-test/src/python/test_runner/main.py @@ -264,7 +264,7 @@ def kill_topology(heron_cli_path, cli_config_path, cluster, role, env, topology_ cmd = "%s kill --config-path=%s %s %s" %\ (heron_cli_path, cli_config_path, cluster_token(cluster, role, env), topology_name) - logging.info("Kill topology: %s", cmd) + logging.info("Killing topology: %s", cmd) if os.system(cmd) == 0: logging.info("Successfully killed topology %s", topology_name) return From ee5d78fc2a30addd7dddca5f031f8d0fff594aa2 Mon Sep 17 00:00:00 2001 From: Bill Graham Date: Wed, 9 Nov 2016 16:39:52 -0800 Subject: [PATCH 2/3] fix checkstyles --- .../src/python/local_test_runner/main.py | 2 +- .../src/python/local_test_runner/status.py | 17 +++++++++++++++++ .../python/local_test_runner/test_template.py | 8 ++++---- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/integration-test/src/python/local_test_runner/main.py b/integration-test/src/python/local_test_runner/main.py index 40b0fc3942d..6a4fc4f019f 100644 --- a/integration-test/src/python/local_test_runner/main.py +++ b/integration-test/src/python/local_test_runner/main.py @@ -70,7 +70,7 @@ def run_tests(test_classes, args): failures += [testname] else: logging.error( - "Unrecognized test response returned for test %s: %s", testname, str(result)) + "Unrecognized test response returned for test %s: %s", testname, str(result)) failures += [testname] except status.TestFailure: failures += [testname] diff --git a/integration-test/src/python/local_test_runner/status.py b/integration-test/src/python/local_test_runner/status.py index cc7f4ef3577..7a53808c974 100644 --- a/integration-test/src/python/local_test_runner/status.py +++ b/integration-test/src/python/local_test_runner/status.py @@ -1,5 +1,22 @@ +# Copyright 2016 Twitter. 
All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#!/usr/bin/env python2.7 + import logging +""" Classes to represent the success or failure of an integration test """ + class TestFailure(Exception): def __init__(self, message, error=None): Exception.__init__(self, message, error) diff --git a/integration-test/src/python/local_test_runner/test_template.py b/integration-test/src/python/local_test_runner/test_template.py index d54cfc47991..1ef422e12f2 100644 --- a/integration-test/src/python/local_test_runner/test_template.py +++ b/integration-test/src/python/local_test_runner/test_template.py @@ -187,13 +187,13 @@ def _check_results(self): # Compare the actual and expected result if actual_sorted == expected_sorted: result = status.TestSuccess( - "Actual result matched expected result for test %s" % self.testname) + "Actual result matched expected result for test %s" % self.testname) logging.info("Actual result ---------- \n%s", actual_sorted) logging.info("Expected result ---------- \n%s", expected_sorted) return result else: result = status.TestFailure( - "Actual result did not match expected result for test %s" % self.testname) + "Actual result did not match expected result for test %s" % self.testname) logging.info("Actual result ---------- \n%s", actual_sorted) logging.info("Expected result ---------- \n%s", expected_sorted) raise result @@ -254,7 +254,7 @@ def _get_tracker_pplan(self): if 'result' not in physical_plan_json: raise status.TestFailure( - "Could not find result json in physical plan request to tracker: %s" % url) + "Could not find result json in physical plan request to tracker: %s" % url) return physical_plan_json['result'] @@ -280,7 +280,7 @@ def _block_until_topology_running(self, min_instances): (self.testname, RETRY_COUNT - retries_left, RETRY_COUNT), RETRY_INTERVAL) else: raise status.TestFailure("Failed to get pplan from tracker for test %s after %s attempts." - % (self.testname, RETRY_COUNT)) + % (self.testname, RETRY_COUNT)) def _block_until_stmgr_running(expected_stmgrs): # block until ./heron-stmgr exists From 183d82325fac41877285ea4b6e31bbd834c4510d Mon Sep 17 00:00:00 2001 From: Bill Graham Date: Thu, 10 Nov 2016 08:44:16 -0800 Subject: [PATCH 3/3] Fixing checkstyles and using finally block --- .../src/python/local_test_runner/status.py | 3 +-- .../src/python/local_test_runner/test_template.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/integration-test/src/python/local_test_runner/status.py b/integration-test/src/python/local_test_runner/status.py index 7a53808c974..648138a519f 100644 --- a/integration-test/src/python/local_test_runner/status.py +++ b/integration-test/src/python/local_test_runner/status.py @@ -13,10 +13,9 @@ # limitations under the License. 
#!/usr/bin/env python2.7 +"""Classes to represent the success or failure of an integration test""" import logging -""" Classes to represent the success or failure of an integration test """ - class TestFailure(Exception): def __init__(self, message, error=None): Exception.__init__(self, message, error) diff --git a/integration-test/src/python/local_test_runner/test_template.py b/integration-test/src/python/local_test_runner/test_template.py index 1ef422e12f2..bf7d5ebb5d4 100644 --- a/integration-test/src/python/local_test_runner/test_template.py +++ b/integration-test/src/python/local_test_runner/test_template.py @@ -82,15 +82,13 @@ def run_test(self): return result except status.TestFailure as e: - if topology_submitted: - logging.error("Test failed, attempting to clean up") - self.cleanup_test() raise e except Exception as e: + raise status.TestFailure("Exception thrown during test", e) + finally: if topology_submitted: logging.error("Test failed, attempting to clean up") self.cleanup_test() - return status.TestFailure("Exception thrown during test", e) def submit_topology(self): _submit_topology( @@ -186,17 +184,17 @@ def _check_results(self): # Compare the actual and expected result if actual_sorted == expected_sorted: - result = status.TestSuccess( + success = status.TestSuccess( "Actual result matched expected result for test %s" % self.testname) logging.info("Actual result ---------- \n%s", actual_sorted) logging.info("Expected result ---------- \n%s", expected_sorted) - return result + return success else: - result = status.TestFailure( + failure = status.TestFailure( "Actual result did not match expected result for test %s" % self.testname) logging.info("Actual result ---------- \n%s", actual_sorted) logging.info("Expected result ---------- \n%s", expected_sorted) - raise result + raise failure # pylint: disable=no-self-use def get_pid(self, process_name, heron_working_directory): @@ -272,6 +270,7 @@ def _block_until_topology_running(self, min_instances): return packing_plan elif retries_left == 0: raise status.TestFailure( + #pylint: disable=too-many-format-args "Got pplan from tracker for test %s but the number of instances found (%d) was " +\ "less than min expected (%s)." % (self.testname, instances_found, min_instances))
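
For reviewers, a minimal sketch of the contract this series introduces: run_test() either
returns a status.TestSuccess or raises a status.TestFailure, and the runner buckets each
test accordingly (any other return value is logged and counted as a failure). Everything
below other than the status module itself is hypothetical; ExampleTest, its counts, and the
run() helper exist only to illustrate the flow, not to mirror any real test in this patch.

    import logging

    import status  # the module added by this patch series

    class ExampleTest(object):
      """Hypothetical test showing the new success/failure contract."""
      def __init__(self, testname):
        self.testname = testname

      def run_test(self):
        instance_count = 3   # stand-in for a value read from the physical plan
        expected_count = 3
        if instance_count != expected_count:
          # failures are raised, as test_scale_up.py and test_template.py now do
          raise status.TestFailure("Found %s instances but expected %s"
                                   % (instance_count, expected_count))
        # success is returned, as _check_results() now does
        return status.TestSuccess("Test %s passed" % self.testname)

    def run(test):
      """Mirrors the pass/fail bucketing logic in local_test_runner/main.py."""
      try:
        result = test.run_test()
        if isinstance(result, status.TestSuccess):
          return "pass"
        logging.error("Unrecognized test response: %s", str(result))
        return "fail"
      except status.TestFailure:
        return "fail"

    if __name__ == '__main__':
      logging.basicConfig(level=logging.INFO)
      print(run(ExampleTest("example")))  # prints "pass"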