Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding tagging based on request context #16

Merged
merged 6 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# locust-influxdb-boilerplate

LocustIO base project with a custom influxDB listener.
LocustIO base project with a custom influxDB listener. This package requires Locust v1.5.0 or greater.

## Instructions

Expand Down
2 changes: 1 addition & 1 deletion example/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
locust_influxdb_listener == 0.0.5
influxdb==5.3.1
locust==1.4.1
locust==1.5.0
34 changes: 16 additions & 18 deletions locust_influxdb_listener/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def __init__(
events = env.events

# requests
events.request_success.add_listener(self.request_success)
events.request_failure.add_listener(self.request_failure)
events.request.add_listener(self.request)
# events
events.test_stop.add_listener(self.test_stop)
events.user_error.add_listener(self.user_error)
Expand All @@ -98,11 +97,11 @@ def __init__(
# complete
atexit.register(self.quitting)

def request_success(self, request_type, name, response_time, response_length, **_kwargs) -> None:
self.__listen_for_requests_events(self.node_id, 'locust_requests', request_type, name, response_time, response_length, True, None)

def request_failure(self, request_type, name, response_time, response_length, exception, **_kwargs) -> None:
self.__listen_for_requests_events(self.node_id, 'locust_requests', request_type, name, response_time, response_length, False, exception)
def request(self, request_type, name, response_time, response_length, response,
            context, exception, start_time=None, url=None, **kwargs) -> None:
    """Handle Locust's unified ``request`` event (Locust >= 1.5).

    Forwards the event payload to the internal listener that converts it
    into an InfluxDB data point.

    :param request_type: Request type label (e.g. HTTP method).
    :param name: The name/path of the request as reported by Locust.
    :param response_time: Response time in milliseconds.
    :param response_length: Size of the response body in bytes.
    :param response: The response object, if any (used to derive success).
    :param context: Per-request context dict supplied by the user/task.
    :param exception: Exception raised by the request, or ``None`` on success.
    :param start_time: Epoch timestamp when the request started, if provided.
    :param url: Full URL of the request, if provided.
    :param kwargs: Accepts any extra keyword arguments so the handler stays
        compatible with future Locust versions that add event parameters.
    """
    self.__listen_for_requests_events(
        self.node_id, 'locust_requests', request_type, name, response_time,
        response_length, response, context, exception, start_time, url)

def spawning_complete(self, user_count) -> None:
    """Record a ``spawning_complete`` event for this node in InfluxDB.

    :param user_count: Number of users spawned when spawning finished.
    """
    event_name = 'spawning_complete'
    self.__register_event(self.node_id, user_count, event_name)
Expand All @@ -125,7 +124,6 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs)
"""
Persist locust event such as hatching started or stopped to influxdb.
Append user_count in case that it exists

:param node_id: The id of the node reporting the event.
:param event: The event name or description.
"""
Expand All @@ -143,23 +141,28 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs)
self.cache.append(point)

def __listen_for_requests_events(self, node_id, measurement, request_type, name,
response_time, response_length, success, exception) -> None:
response_time, response_length, response,
context, exception, start_time, url) -> None:
"""
Persist request information to influxdb.

:param node_id: The id of the node reporting the event.
:param measurement: The measurement where to save this point.
:param success: Flag the info to as successful request or not
"""

time = datetime.utcnow()
was_successful = True
if response:
jmfiola marked this conversation as resolved.
Show resolved Hide resolved
# override with response code
was_successful = 199 < response.status_code < 400
tags = {
'node_id': node_id,
'request_type': request_type,
'name': name,
'success': success,
'success': was_successful,
'exception': repr(exception),
}
if context and type(context) == dict:
tags.update(context)

if isinstance(exception, HTTPError):
tags['code'] = exception.response.status_code
Expand All @@ -175,7 +178,6 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name,
def __listen_for_locust_errors(self, node_id, user_instance, exception: Exception = None, tb=None) -> None:
"""
Persist locust errors to InfluxDB.

:param node_id: The id of the node reporting the error.
:return: None
"""
Expand All @@ -197,7 +199,6 @@ def __listen_for_locust_errors(self, node_id, user_instance, exception: Exceptio
def __flush_cached_points_worker(self) -> None:
"""
Background job that flushes the cached points according to the interval defined.

:param influxdb_client:
:param interval:
:return: None
Expand All @@ -210,7 +211,6 @@ def __flush_cached_points_worker(self) -> None:
def __make_data_point(self, measurement: str, tags: dict, fields: dict, time: datetime) -> dict:
"""
Create a list with a single point to be saved to influxdb.

:param measurement: The measurement where to save this point.
:param tags: Dictionary of tags to be saved in the measurement.
:param fields: Dictionary of field to be saved to measurement.
Expand All @@ -228,7 +228,6 @@ def last_flush_on_quitting(self):
def __flush_points(self, influxdb_client: InfluxDBClient) -> None:
"""
Write the cached data points to influxdb

:param influxdb_client: An instance of InfluxDBClient
:return: None
"""
Expand All @@ -239,5 +238,4 @@ def __flush_points(self, influxdb_client: InfluxDBClient) -> None:
if not success:
log.error('Failed to write points to influxdb.')
# If failed for any reason put back into the beginning of cache
self.cache.insert(0, to_be_flushed)

self.cache.insert(0, to_be_flushed)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"Operating System :: OS Independent",
],
install_requires=[
'locust>=1.1.1',
'locust>=1.5.0',
'influxdb>=5.2.2',
],
python_requires='>=3.6',
Expand Down