This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Change local integration tests to use test status objects instead of True/False #1546

Merged · 3 commits · Nov 10, 2016
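This PR replaces the True/False return values in the local integration test runner with status objects: a test case now returns status.TestSuccess on success and raises status.TestFailure on failure. A minimal sketch of the new contract, assuming the status module added below is importable; the SampleTest class and its check_output helper are made up for illustration:

import logging

import status  # the module added in this PR (TestSuccess / TestFailure)

class SampleTest(object):
    """Hypothetical test case following the new contract."""

    def check_output(self):
        # stand-in for real verification logic
        return True

    def run_test(self):
        if self.check_output():
            # success is reported by returning a TestSuccess instance
            return status.TestSuccess("output matched for SampleTest")
        # failure is reported by raising TestFailure (an Exception subclass)
        raise status.TestFailure("output did not match for SampleTest")

# the runner inspects the result type instead of a boolean
result = SampleTest().run_test()
if isinstance(result, status.TestSuccess):
    logging.info("SampleTest passed")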
2 changes: 1 addition & 1 deletion heron/tools/cli/src/python/cli_helper.py
@@ -89,5 +89,5 @@ def run(command, parser, cl_args, unknown_args, action):
Log.error('Failed to %s \'%s\'' % (action, topology_name))
return False

Log.info('Successfully %s \'%s\'' % (action, topology_name))
Log.info('Successfully executed %s \'%s\'' % (action, topology_name))
return True
17 changes: 14 additions & 3 deletions integration-test/src/python/local_test_runner/main.py
@@ -25,6 +25,8 @@
import sys
from collections import namedtuple

import status

# import test_kill_bolt
import test_kill_metricsmgr
import test_kill_stmgr
@@ -60,10 +62,19 @@ def run_tests(test_classes, args):
logging.info("==== Starting test %s of %s: %s ====",
len(successes) + len(failures) + 1, len(test_classes), testname)
template = test_class(testname, args)
if template.run_test(): # testcase passed
successes += [testname]
else:
try:
result = template.run_test()
if isinstance(result, status.TestSuccess): # testcase passed
successes += [testname]
elif isinstance(result, status.TestFailure):
failures += [testname]
else:
logging.error(
"Unrecognized test response returned for test %s: %s", testname, str(result))
failures += [testname]
except status.TestFailure:
failures += [testname]

except Exception as e:
logging.error("Exception thrown while running tests: %s", str(e))
finally:
31 changes: 31 additions & 0 deletions integration-test/src/python/local_test_runner/status.py
@@ -0,0 +1,31 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python2.7

import logging

""" Classes to represent the success or failure of an integration test """

class TestFailure(Exception):
def __init__(self, message, error=None):
Exception.__init__(self, message, error)
if error:
logging.error("%s :: %s", message, str(error))
else:
logging.error(message)

class TestSuccess(object):
def __init__(self, message=None):
if message:
logging.info(message)
10 changes: 4 additions & 6 deletions integration-test/src/python/local_test_runner/test_scale_up.py
@@ -16,6 +16,7 @@
import logging
import subprocess

import status
import test_template

class TestScaleUp(test_template.TestTemplate):
@@ -39,11 +40,8 @@ def pre_check_results(self, physical_plan_json):
instances = physical_plan_json['instances']
instance_count = len(instances)
if instance_count != self.expected_instance_count:
logging.error("Found %s instances but expected %s: %s",
instance_count, self.expected_instance_count, instances)
return False

return True
raise status.TestFailure("Found %s instances but expected %s: %s" %
(instance_count, self.expected_instance_count, instances))

def scale_up(heron_cli_path, test_cluster, topology_name):
splitcmd = [
@@ -52,5 +50,5 @@ def scale_up(heron_cli_path, test_cluster, topology_name):
]
logging.info("Increasing number of component instances: %s", splitcmd)
if subprocess.call(splitcmd) != 0:
raise RuntimeError("Unable to update topology")
raise status.TestFailure("Unable to update topology %s" % topology_name)
logging.info("Increased number of component instances")
105 changes: 51 additions & 54 deletions integration-test/src/python/local_test_runner/test_template.py
@@ -21,6 +21,8 @@
import subprocess
from collections import namedtuple

import status

# Test input. Please set each variable as it's own line, ended with \n, otherwise the value of lines
# passed into the topology will be incorrect, and the test will fail.
TEST_INPUT = ["1\n", "2\n", "3\n", "4\n", "5\n", "6\n", "7\n", "8\n",
@@ -52,56 +54,54 @@ def __init__(self, testname, params):
# pylint: disable=too-many-return-statements, too-many-branches,
# pylint: disable=too-many-statements
def run_test(self):
""" Runs the test template """

""" Runs the test template. Must either return TestSuccess or raise TestFailure"""
topology_submitted = False
try:
# prepare test data, start the topology and block until it's running
self._prepare_test_data()
self.submit_topology()
topology_submitted = True
_block_until_stmgr_running(self.get_expected_container_count())

if not self._block_until_topology_running(self.get_expected_min_instance_count()):
self.cleanup_test()
return False
self._block_until_topology_running(self.get_expected_min_instance_count())

# Execute the specific test logic and block until topology is running again
self.execute_test_case()

_block_until_stmgr_running(self.get_expected_container_count())
physical_plan_json =\
self._block_until_topology_running(self.get_expected_min_instance_count())
if not physical_plan_json:
self.cleanup_test()
return False

# trigger the test data to flow and invoke the pre_check_results hook
self._inject_test_data()
if not self.pre_check_results(physical_plan_json):
self.cleanup_test()
return False
except Exception as e:
logging.error("Test failed, attempting to clean up: %s", e)
self.pre_check_results(physical_plan_json)

# finally verify the expected results
result = self._check_results()
self.cleanup_test()
return False
return result

# finally verify the expected results
return self._check_results()
except status.TestFailure as e:
if topology_submitted:
logging.error("Test failed, attempting to clean up")
self.cleanup_test()
raise e
except Exception as e:
Contributor:

duplicate except ... as e?

Contributor Author:

That's ok since e is scoped within each except clause so only 1 will be defined without collision.
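As a stand-alone illustration of the point (not part of the PR): only one except clause runs for any given exception, so reusing the name e across clauses cannot collide:

def classify(exc):
    try:
        raise exc
    except ValueError as e:
        return "ValueError: %s" % e
    except Exception as e:
        return "other exception: %s" % e

print(classify(ValueError("bad value")))   # ValueError: bad value
print(classify(KeyError("missing")))       # other exception: 'missing'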

if topology_submitted:
logging.error("Test failed, attempting to clean up")
self.cleanup_test()
return status.TestFailure("Exception thrown during test", e)
Contributor:

how about using try .. except .. finally clause? https://docs.python.org/2.7/tutorial/errors.html#defining-clean-up-actions

Contributor Author:

Yes, good call.
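A sketch of what the suggested restructuring could look like, with cleanup moved into a finally clause so it runs on both the success and failure paths (a simplified, hypothetical version of run_test, not the code merged here):

def run_test(self):
    topology_submitted = False
    try:
        self._prepare_test_data()
        self.submit_topology()
        topology_submitted = True
        self._inject_test_data()
        return self._check_results()           # returns TestSuccess on a match
    except status.TestFailure:
        raise                                   # TestFailure already logged itself
    except Exception as e:
        raise status.TestFailure("Exception thrown during test", e)
    finally:
        if topology_submitted:
            self.cleanup_test()                 # runs whether the test passed or failed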


def submit_topology(self):
#submit topology
try:
_submit_topology(
self.params['cliPath'],
self.params['cluster'],
self.params['testJarPath'],
self.params['topologyClassPath'],
self.params['topologyName'],
self.params['readFile'],
self.params['outputFile']
)
except Exception as e:
logging.error("Failed to submit %s topology: %s", self.params['topologyName'], str(e))
return False
_submit_topology(
self.params['cliPath'],
self.params['cluster'],
self.params['testJarPath'],
self.params['topologyClassPath'],
self.params['topologyName'],
self.params['readFile'],
self.params['outputFile']
)

# pylint: disable=no-self-use
def get_expected_container_count(self):
@@ -123,7 +123,6 @@ def cleanup_test(self):
_kill_topology(self.params['cliPath'], self.params['cluster'], self.params['topologyName'])
except Exception as e:
logging.error("Failed to kill %s topology: %s", self.params['topologyName'], str(e))
return False
finally:
self._delete_test_data_files()

@@ -165,11 +164,11 @@ def _check_results(self):
with open(self.params['outputFile'], 'r') as g:
actual_result = g.read()
except Exception as e:
logging.error(
"Failed to read expected or actual results from file for test %s: %s", self.testname, e)
message =\
"Failed to read expected or actual results from file for test %s: %s" % self.testname
if retries_left == 0:
self.cleanup_test()
return False
raise status.TestFailure(message, e)
logging.error(message, e)
# if we get expected result, no need to retry
expected_sorted = sorted(expected_result.split('\n'))
actual_sorted = sorted(actual_result.split('\n'))
@@ -185,19 +184,19 @@
self.testname, RETRY_COUNT - retries_left, RETRY_COUNT, RETRY_INTERVAL)
time.sleep(RETRY_INTERVAL)

self.cleanup_test()

# Compare the actual and expected result
if actual_sorted == expected_sorted:
logging.info("Actual result matched expected result for test %s", self.testname)
result = status.TestSuccess(
"Actual result matched expected result for test %s" % self.testname)
logging.info("Actual result ---------- \n%s", actual_sorted)
logging.info("Expected result ---------- \n%s", expected_sorted)
return True
return result
else:
logging.error("Actual result did not match expected result for test %s", self.testname)
result = status.TestFailure(
"Actual result did not match expected result for test %s" % self.testname)
logging.info("Actual result ---------- \n%s", actual_sorted)
logging.info("Expected result ---------- \n%s", expected_sorted)
return False
raise result

# pylint: disable=no-self-use
def get_pid(self, process_name, heron_working_directory):
@@ -254,8 +253,8 @@ def _get_tracker_pplan(self):
physical_plan_json = json.loads(response.read())

if 'result' not in physical_plan_json:
logging.error("Could not find result json in physical plan request to tracker: %s", url)
return None
raise status.TestFailure(
"Could not find result json in physical plan request to tracker: %s" % url)

return physical_plan_json['result']

@@ -272,20 +271,16 @@ def _block_until_topology_running(self, min_instances):
self.testname, RETRY_COUNT - retries_left)
return packing_plan
elif retries_left == 0:
logging.error(
raise status.TestFailure(
"Got pplan from tracker for test %s but the number of instances found (%d) was " +\
"less than min expected (%s).", self.testname, instances_found, min_instances)
self.cleanup_test()
return None
"less than min expected (%s)." % (self.testname, instances_found, min_instances))

if retries_left > 0:
_sleep("before trying again to fetch pplan for test %s (attempt %s/%s)" %
(self.testname, RETRY_COUNT - retries_left, RETRY_COUNT), RETRY_INTERVAL)
else:
logging.error("Failed to get pplan from tracker for test %s after %s attempts.",
self.testname, RETRY_COUNT)
self.cleanup_test()
return None
raise status.TestFailure("Failed to get pplan from tracker for test %s after %s attempts."
% (self.testname, RETRY_COUNT))

def _block_until_stmgr_running(expected_stmgrs):
# block until ./heron-stmgr exists
@@ -304,16 +299,18 @@ def _submit_topology(heron_cli_path, test_cluster, test_jar_path, topology_class
logging.info("Submitting topology: %s", splitcmd)
p = subprocess.Popen(splitcmd)
p.wait()
if p.returncode != 0:
raise status.TestFailure("Failed to submit topology %s" % topology_name)

logging.info("Submitted topology %s", topology_name)

def _kill_topology(heron_cli_path, test_cluster, topology_name):
""" Kill a topology using heron-cli """
splitcmd = [heron_cli_path, 'kill', '--verbose', test_cluster, topology_name]
logging.info("Killing topology: %s", splitcmd)
splitcmd = [heron_cli_path, 'kill', test_cluster, topology_name]
logging.info("Killing topology: %s", ' '.join(splitcmd))
# this call can be blocking, no need for subprocess
if subprocess.call(splitcmd) != 0:
raise RuntimeError("Unable to kill the topology: %s" % topology_name)
logging.info("Successfully killed topology %s", topology_name)

def _get_processes():
"""
2 changes: 1 addition & 1 deletion integration-test/src/python/test_runner/main.py
@@ -264,7 +264,7 @@ def kill_topology(heron_cli_path, cli_config_path, cluster, role, env, topology_
cmd = "%s kill --config-path=%s %s %s" %\
(heron_cli_path, cli_config_path, cluster_token(cluster, role, env), topology_name)

logging.info("Kill topology: %s", cmd)
logging.info("Killing topology: %s", cmd)
if os.system(cmd) == 0:
logging.info("Successfully killed topology %s", topology_name)
return
Expand Down