Skip to content

Commit

Permalink
Small fixes, improving logging (#35)
Browse files Browse the repository at this point in the history
* Refactoring, fix typos, improve logs

- renamed run_time to time_elapses, as run_time is also used in the Lambda payload
and also in locust and it was sighlty confusing
- added get_ prefix to getter methods
- replaced log_ prefix with increase_ to clarify the intent
- added some log messages and output to help understand what's happening
- fixed some typos in comments
- added object variables for better control of error thresholds
- updated readme example outputs

* Bump invokust version
  • Loading branch information
csillab authored Jun 9, 2020
1 parent 4580bf5 commit 4ceab70
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 136 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ invokust.egg-info
dist
build
.terraform
.idea/*
155 changes: 78 additions & 77 deletions README.md

Large diffs are not rendered by default.

27 changes: 19 additions & 8 deletions invokr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ def print_stats_exit(load_test_state):
agg_results['ramp_time'] = load_test_state.ramp_time
agg_results['time_limit'] = load_test_state.time_limit
logging.info('Aggregated results: {0}'.format(json.dumps(agg_results)))

logging.info("\n============================================================"
f"\nRamp up time: {agg_results['ramp_time']}s"
f"\nStarted ramp down after {agg_results['time_limit']}s (time_limit)"
f"\nThread count: {agg_results['threads']}"
f"\nLambda invocation count: {agg_results['lambda_invocations']}"
f"\nLambda invocation error ratio: {agg_results['invocation_error_ratio']}"
f"\nCumulative lambda execution time: {agg_results['total_lambda_execution_time']}ms"
f"\nTotal requests sent: {agg_results['num_requests']}"
f"\nTotal requests failed: {agg_results['num_requests_fail']}"
f"\nTotal request failure ratio: {agg_results['request_fail_ratio']}\n"

)
logging.info('===========================================================================================================================')
logging.info(print_stat('TYPE', 'NAME', '#REQUESTS', 'MEDIAN', 'AVERAGE', 'MIN', 'MAX', '#REQS/SEC'))
logging.info('===========================================================================================================================')
Expand All @@ -53,26 +66,24 @@ def print_stats_exit(load_test_state):

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)-6s %(threadName)-11s %(message)s')

'''
We set the lambda_runtime param to be either the length of the test or to 3 minutes of the
test will alst longer. This is because otherwise the lambdas will keep runnning until they
error for exceeding invocation time.
'''
lambda_runtime = f'{args.time_limit}s' if args.time_limit < 180 else "3m"
# AWS Lambda has a maximum execution time ("timeout"). We limit the execution time to 3 minutes if the overall
# load test time is longer, to make sure the lambda will not exceed the timeout.

lambda_runtime = f'{min(args.time_limit, 180)}s'
lambda_payload = {
'locustfile': args.locust_file,
'host': args.locust_host,
'num_clients': args.locust_clients,
'hatch_rate': 10,
'run_time': lambda_runtime
'run_time': lambda_runtime,
}

load_test_state = LambdaLoadTest(
args.function_name,
args.threads,
args.ramp_time,
args.time_limit,
lambda_payload
lambda_payload,
)

try:
Expand Down
101 changes: 57 additions & 44 deletions invokust/aws_lambda/lambda_load_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ def __init__(self, lambda_function_name, threads, ramp_time, time_limit, lambda_
self.logger = logging.getLogger()
self.threads = threads
self.ramp_time = ramp_time
self.time_limit = time_limit
self.time_limit = time_limit # don't start new threads after {time_limit} seconds
self.lambda_function_name = lambda_function_name
self.lambda_payload = lambda_payload
self.lambda_invocation_errors = 0
self.lambda_invocation_count = 0
self.lambda_invocation_error_threshold = 20
self.lambda_total_execution_time = 0
self.requests_fail = 0
self.request_fail_ratio_threshold = 0.5
self.requests_total = 0
self.locust_results = []
self.thread_data = {}
Expand All @@ -55,27 +57,27 @@ def get_thread_count(self):
'''
return len([t for t in threading.enumerate() if t.getName() is not 'MainThread'])

def get_run_time(self):
def get_time_elapsed(self):
'''
Returns total run time of the load test
Returns elapsed time in seconds since starting the load test
'''
return round(time.time() - self.start_time)

def log_lambda_invocation_error(self):
def increase_lambda_invocation_error(self):
'''
Increases Lambda invocation error count
'''
with self.lock:
self.lambda_invocation_errors += 1

def log_lambda_invocation_count(self):
def increase_lambda_invocation_count(self):
'''
Increases Lambda invocation count
'''
with self.lock:
self.lambda_invocation_count += 1

def invocation_error_ratio(self):
def get_invocation_error_ratio(self):
'''
Returns ratio of Lambda invocations to invocation errors
'''
Expand All @@ -84,21 +86,21 @@ def invocation_error_ratio(self):
except ZeroDivisionError:
return 0

def log_requests_total(self, requests):
def increase_requests_total(self, requests):
'''
Increases total request count
'''
with self.lock:
self.requests_total += requests

def log_requests_fail(self, requests):
def increase_requests_fail(self, requests):
'''
Increases total request fail count
'''
with self.lock:
self.requests_fail += requests

def request_fail_ratio(self):
def get_request_fail_ratio(self):
'''
Returns ratio of failed to total requests
'''
Expand All @@ -107,9 +109,9 @@ def request_fail_ratio(self):
except ZeroDivisionError:
return 0

def log_locust_results(self, results):
def append_locust_results(self, results):
'''
Logs results from a locust exection. All results needs to be aggregated in order to show meaningful statistics of the whole load test
Logs results from a locust execution. All results needs to be aggregated in order to show meaningful statistics of the whole load test
'''
with self.lock:
self.locust_results.append(results)
Expand All @@ -122,8 +124,8 @@ def get_summary_stats(self):
'lambda_invocation_count': self.lambda_invocation_count,
'total_lambda_execution_time': self.lambda_total_execution_time,
'requests_total': self.requests_total,
'request_fail_ratio': self.request_fail_ratio(),
'invocation_error_ratio': self.invocation_error_ratio()
'request_fail_ratio': self.get_request_fail_ratio(),
'invocation_error_ratio': self.get_invocation_error_ratio()
}

def get_stats(self):
Expand All @@ -133,10 +135,10 @@ def get_stats(self):
return {
'thread_count': self.get_thread_count(),
'rpm': self.calculate_rpm(),
'run_time': self.get_run_time(),
'time_elapsed': self.get_time_elapsed(),
'requests_total': self.requests_total,
'request_fail_ratio': self.request_fail_ratio(),
'invocation_error_ratio': self.invocation_error_ratio(),
'request_fail_ratio': self.get_request_fail_ratio(),
'invocation_error_ratio': self.get_invocation_error_ratio(),
}

def get_locust_results(self):
Expand All @@ -145,24 +147,36 @@ def get_locust_results(self):
'''
return self.locust_results

def log_lambda_execution_time(self, time):
def increase_lambda_execution_time(self, time):
'''
Add Lambda exection time to the total
Add Lambda execution time to the total
'''
with self.lock:
self.lambda_total_execution_time += time

def calculate_rpm(self):
'''
Returns current total RPM across all threads
Returns current total request per minute across all threads
'''
return round(sum(self.thread_data[thread_id]['rpm'] for thread_id in self.thread_data if 'rpm' in self.thread_data[thread_id]))

def check_error_threshold(self):
'''
Checks if the current Lambda and request fail ratios are within thresholds
'''
if self.lambda_invocation_errors > 20 or self.request_fail_ratio() > 0.5:


if self.lambda_invocation_errors > self.lambda_invocation_error_threshold:
self.logger.error(
f"Error limit reached. invocation error count/threshold: "
f"{self.lambda_invocation_errors}/{self.lambda_invocation_error_threshold}"
)
return True
elif self.get_request_fail_ratio() > self.request_fail_ratio_threshold:
self.logger.error(
f"Error limit reached. requests failed ratio/threshold: "
f"{self.get_request_fail_ratio()}/{self.request_fail_ratio_threshold}"
)
return True
else:
return False
Expand All @@ -174,7 +188,7 @@ def thread_required(self):
result = False
if self.get_thread_count() < self.threads:
next_thread_interval = (self.ramp_time / self.threads) * self.get_thread_count()
if self.get_run_time() > next_thread_interval:
if self.get_time_elapsed() > next_thread_interval:
result = True
return result

Expand Down Expand Up @@ -216,6 +230,7 @@ def thread(self):
function_start_time = time.time()

try:
self.logger.info("Invoking lambda...")
response = client.invoke(FunctionName=self.lambda_function_name, Payload=json.dumps(self.lambda_payload))
except Exception as e:
self.logger.critical('Lambda invocation failed: {0}'.format(repr(e)))
Expand All @@ -224,11 +239,11 @@ def thread(self):

function_end_time = time.time()

self.log_lambda_invocation_count()
self.increase_lambda_invocation_count()

if 'FunctionError' in response:
logger.error('error {0}: {1}'.format(response['FunctionError'], response['Payload'].read()))
self.log_lambda_invocation_error()
self.increase_lambda_invocation_error()
time.sleep(2)
continue

Expand All @@ -237,7 +252,7 @@ def thread(self):

if not payload_json_str:
logger.error('No results in payload')
self.log_lambda_invocation_error()
self.increase_lambda_invocation_error()
time.sleep(2)
continue

Expand All @@ -246,21 +261,20 @@ def thread(self):
total_rpm = results['num_requests'] / (function_duration / 60)
lambda_execution_time = 300000 - results['remaining_time']

self.log_locust_results(results)
self.log_requests_fail(results['num_requests_fail'])
self.log_requests_total(results['num_requests'])
self.append_locust_results(results)
self.increase_requests_fail(results['num_requests_fail'])
self.increase_requests_total(results['num_requests'])
self.update_thread_data(thread_id, 'rpm', total_rpm)
self.update_thread_data(thread_id, 'lambda_execution_time', lambda_execution_time)
self.log_lambda_execution_time(lambda_execution_time)
self.increase_lambda_execution_time(lambda_execution_time)

logger.info('Invocation complete. Requests (errors): {0} ({1}), execution time: {2}, sleeping: {3}'.format(
logger.info('Lambda invocation complete. Requests (errors): {0} ({1}), execution time: {2}ms, sleeping: {3}s'.format(
results['num_requests'],
results['num_requests_fail'],
lambda_execution_time,
sleep_time
)
)

time.sleep(sleep_time)

self.logger.info('thread finished')
Expand All @@ -269,30 +283,27 @@ def run(self):
'''
Starts the load test, periodically prints statistics and starts new threads
'''
self.logger.info('\nStarting load test\nFunction: {0}\nRamp time: {1}\nThreads: {2}\nLambda payload: {3}\n'.format(
self.lambda_function_name,
self.ramp_time,
self.threads,
self.lambda_payload
)
self.logger.info(
"\nStarting load test..."
f"\nFunction name: {self.lambda_function_name}"
f"\nRamp time: {self.ramp_time}s"
f"\nThreads: {self.threads}"
f"\nLambda payload: {self.lambda_payload}"
f"\nStart ramping down after: {self.time_limit}s"
)

self.start_new_thread()

while True:
self.logger.info(
'threads: {thread_count}, rpm: {rpm}, run_time: {run_time}, requests_total: {requests_total}, request_fail_ratio: {request_fail_ratio}, invocation_error_ratio: {invocation_error_ratio}'.format(**self.get_stats())
'threads: {thread_count}, rpm: {rpm}, time elapsed: {time_elapsed}s, total requests from finished threads: {requests_total}, '
'request fail ratio: {request_fail_ratio}, invocation error ratio: {invocation_error_ratio}'.format(**self.get_stats())
)

if self.thread_required():
self.start_new_thread()

if self.check_error_threshold():
self.logger.error('Error limit reached, invocation error ratio: {0}, request fail ratio: {1}'.format(
self.invocation_error_ratio(),
self.request_fail_ratio()
)
)
self.stop_threads()
self.logger.info('Waiting for threads to exit...')
while self.get_thread_count() > 0:
Expand All @@ -301,9 +312,11 @@ def run(self):
break


if self.time_limit and self.get_run_time() > self.time_limit:
self.logger.info('Time limit reached')
if self.time_limit and self.get_time_elapsed() > self.time_limit:
self.logger.info('Time limit reached. Starting ramp down...')
self.stop_threads()

self.logger.info("Waiting for all Lambdas to return. This may take up to {0}.".format(self.lambda_payload["run_time"]))
while self.get_thread_count() > 0:
time.sleep(1)
else:
Expand Down
10 changes: 5 additions & 5 deletions locustfile_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
from locust import HttpLocust, TaskSet, task, between


class GetHomePageTask(TaskSet):
class HomePageTaskSet(TaskSet):
@task()
def get_home_page(self):
'''
Gets /
'''
self.client.get("/")

@task()
def get_another_page(self):
def login(self):
'''
Post /
Posts to /post
'''
response=self.client.post("/post", {"username":"password"})


class WebsiteUser(HttpLocust):
task_set = GetHomePageTask
task_set = HomePageTaskSet
wait_time = between(0, 0)
Empty file modified package_lambda_code.sh
100644 → 100755
Empty file.
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

setup(
name = 'invokust',
version = '0.71',
version = '0.72',
author = 'Max Williams',
author_email = 'futuresharks@gmail.com',
description = 'A small wrapper for locust to allow running load tests from within Python or on AWS Lambda',
long_description=long_description,
long_description_content_type="text/markdown",
url = 'https://github.com/FutureSharks/invokust',
download_url = 'https://github.com/FutureSharks/invokust/archive/0.71.tar.gz',
download_url = 'https://github.com/FutureSharks/invokust/archive/0.72.tar.gz',
license = 'MIT',
scripts = ['invokr.py'],
packages = [
Expand Down

0 comments on commit 4ceab70

Please sign in to comment.