[collector][emitter] Split metric payloads bigger than 2MB #3454

Merged · merged 3 commits · Aug 21, 2017

Changes from 1 commit
115 changes: 89 additions & 26 deletions emitter.py
@@ -29,6 +29,11 @@
control_char_re = re.compile('[%s]' % re.escape(control_chars))


# Only enforced for the metrics API on our end, for now
MAX_COMPRESSED_SIZE = 2 << 20 # 2MB, the backend should accept up to 3MB but let's be conservative here
MAX_SPLIT_DEPTH = 3 # maximum depth of recursive calls to payload splitting function
Contributor: How did you arrive at this number? It's cool if it's arbitrary; clearly from the test it can split a giant object.

Member Author: Arbitrary number, yes.
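For a rough sense of what the depth cap bounds, here is an illustrative sketch (not part of the diff; the fanout is a made-up assumption, since the real chunk count depends on payload size and compression ratio):

# Illustrative sketch only: upper bound on the number of chunks after
# each level of recursive splitting, assuming a hypothetical fanout of
# 4 chunks per split.
MAX_SPLIT_DEPTH = 3   # same constant as in the diff above
fanout = 4            # made-up assumption
for depth in range(MAX_SPLIT_DEPTH + 1):
    print("depth %d -> at most %d chunks" % (depth, fanout ** depth))
# depth 0 -> at most 1 chunks
# depth 1 -> at most 4 chunks
# depth 2 -> at most 16 chunks
# depth 3 -> at most 64 chunks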



def remove_control_chars(s, log):
    if isinstance(s, str):
        sanitized = control_char_re.sub('', s)
@@ -72,49 +77,106 @@ def sanitize_payload(item, log, sanitize_func):

    return item

def post_payload(url, message, serialize_func, agentConfig, log):
    log.debug('http_emitter: attempting postback to ' + url)

    try:
        payloads = serialize_func(message, MAX_COMPRESSED_SIZE, 0, log)
    except UnicodeDecodeError:
        log.exception('http_emitter: Unable to convert message to json')
        # early return as we can't actually process the message
        return
    except RuntimeError:
        log.exception('http_emitter: runtime error dumping message to json')
        # early return as we can't actually process the message
        return
    except Exception:
        log.exception('http_emitter: unknown exception processing message')
        return

    for payload in payloads:
        try:
            headers = get_post_headers(agentConfig, payload)
            r = requests.post(url, data=payload, timeout=5, headers=headers)

            r.raise_for_status()

            if r.status_code >= 200 and r.status_code < 205:
                log.debug("Payload accepted")

        except Exception:
            log.exception("Unable to post payload.")


def serialize_payload(message, log):
    payload = ""
    try:
        payload = json.dumps(message)
    except UnicodeDecodeError:
        newmessage = sanitize_payload(message, log, remove_control_chars)
        try:
            payload = json.dumps(newmessage)
        except UnicodeDecodeError:
            log.info('Removing undecodable characters from payload')
            newmessage = sanitize_payload(newmessage, log, remove_undecodable_chars)
            payload = json.dumps(newmessage)

    return payload
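As an aside, a minimal Python 2 illustration of when the UnicodeDecodeError fallback fires (the dict content is hypothetical): json.dumps chokes on byte strings that are not valid UTF-8.

import json

# Hypothetical payload: 'caf\xe9' is latin-1-encoded bytes, not valid
# UTF-8, so json.dumps (Python 2) raises UnicodeDecodeError and the
# emitter falls back to sanitize_payload before retrying.
bad_message = {"proc_name": "caf\xe9"}
try:
    json.dumps(bad_message)
except UnicodeDecodeError:
    print("sanitizing and retrying")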


def serialize_and_compress_legacy_payload(legacy_payload, max_compressed_size, depth, log):
    """
    Serialize and compress the legacy payload
    """
    serialized_payload = serialize_payload(legacy_payload, log)
    zipped = zlib.compress(serialized_payload)
    log.debug("payload_size=%d, compressed_size=%d, compression_ratio=%.3f"
              % (len(serialized_payload), len(zipped), float(len(serialized_payload))/float(len(zipped))))

    compressed_payloads = [zipped]

    if len(zipped) > max_compressed_size:
        # let's just log a warning for now, splitting the legacy payload is tricky
        log.warning("collector payload is above the limit of %dKB compressed", max_compressed_size/(1<<10))

    return compressed_payloads
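For context on why the legacy payload only gets a warning: the metrics payload is one homogeneous list that can be sliced anywhere, while the legacy collector payload mixes heterogeneous top-level sections. A sketch of the asymmetry, with hypothetical key names (not the actual payload schema):

# Hypothetical payload shapes, for illustration only.
metrics_payload = {"series": [{"metric": "m1"}, {"metric": "m2"}]}  # flat list: easy to slice
legacy_payload = {
    "collection_timestamp": 1503000000,  # scalar metadata
    "processes": [],                     # sections the backend expects
    "events": {},                        # together, so there is no obvious
}                                        # safe split point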


def serialize_and_compress_metrics_payload(metrics_payload, max_compressed_size, depth, log):
Contributor: This is a really cool, elegant solution.

However, it took me a bit to understand everything that was going on in it. I think it could stand to use a few comments; it would be very easy to make a mistake when editing this function in the future without some added clarity. I pointed out some of the places where I think it could be clearer in my other comments.

Member Author: Very valid comment, I'm going to document this more.
"""
Serialize and compress the metrics payload
If the compressed payload is too big, we attempt to split it into smaller payloads
"""
compressed_payloads = []

serialized_payload = serialize_payload(metrics_payload, log)
zipped = zlib.compress(serialized_payload)
compression_ratio = float(len(serialized_payload))/float(len(zipped))
log.debug("payload_size=%d, compressed_size=%d, compression_ratio=%.3f"
% (len(serialized_payload), len(zipped), compression_ratio))

if len(zipped) < max_compressed_size:
compressed_payloads.append(zipped)
else:
series = metrics_payload["series"]

if depth > MAX_SPLIT_DEPTH:
log.error("Maximum depth of payload splitting reached, dropping the %d metrics in this chunk", len(series))
return compressed_payloads

nb_chunks = len(zipped)/max_compressed_size + 1 + int(compression_ratio/2) # try to account for the compression
Contributor: What does nb_chunks mean? I am honestly uncertain, even after thinking about it for a few minutes. We might want to rename it.

Member Author: nb_chunks is the number of "chunks" (i.e. smaller payloads) we'll split the current metrics_payload into. I can definitely document this more; let me know if you have a better idea for the name of the variable.

Contributor: nb is number! Ha, that makes sense; I kept thinking n and b were different words. No, I think this is fine; n_chunks is the only other one I can think of.

Contributor: I understood what it did, the naming just threw me off.
log.debug("payload is too big (%d bytes), splitting it in %d chunks", len(zipped), nb_chunks)

series_per_chunk = len(series)/nb_chunks + 1

for i in range(nb_chunks):
compressed_payloads.extend(
Contributor: 🍰
serialize_and_compress_metrics_payload({"series": series[i*series_per_chunk:(i+1)*series_per_chunk]}, max_compressed_size, depth+1, log)
Contributor: Can you add some comments here? There's a lot going on in this line.

Member Author (@olivielpeau, Jul 27, 2017): Good comment, this definitely needs more explanation; see the annotated sketch after this function.

            )

    return compressed_payloads
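As a reading aid for the thread above, an annotated paraphrase of the split-and-recurse step; the comments are interpretation inferred from the surrounding diff, not committed code, and the snippet reuses the function's local variables:

# Paraphrase of the else branch above, with explanatory comments.
for i in range(nb_chunks):
    # Slice the flat series list into nb_chunks roughly equal parts;
    # the last slice may be shorter (or empty), which is harmless.
    chunk = {"series": series[i * series_per_chunk:(i + 1) * series_per_chunk]}
    # Recurse with depth + 1: the chunk is serialized and compressed
    # again, and re-split if it still exceeds max_compressed_size,
    # until MAX_SPLIT_DEPTH stops the recursion.
    compressed_payloads.extend(
        serialize_and_compress_metrics_payload(chunk, max_compressed_size, depth + 1, log))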


def split_payload(legacy_payload):
@@ -149,6 +211,7 @@ def split_payload(legacy_payload):

    return legacy_payload, metrics_payload


def http_emitter(message, log, agentConfig, endpoint):
    api_key = message.get('apiKey')

@@ -164,10 +227,10 @@ def http_emitter(message, log, agentConfig, endpoint):
    legacy_payload, metrics_payload = split_payload(message)

    # Post legacy payload
    post_payload(legacy_url, legacy_payload, serialize_and_compress_legacy_payload, agentConfig, log)

    # Post metrics payload
    post_payload(metrics_endpoint, metrics_payload, serialize_and_compress_metrics_payload, agentConfig, log)


def get_post_headers(agentConfig, payload):
44 changes: 39 additions & 5 deletions tests/core/test_emitter.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# 3p
import mock
import unittest
import simplejson as json

@@ -9,7 +9,9 @@
    remove_control_chars,
    remove_undecodable_chars,
    sanitize_payload,
    serialize_and_compress_metrics_payload,
    serialize_payload,
    split_payload,
)

import os
@@ -51,7 +53,7 @@ def test_remove_control_chars(self):
            (u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪', u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪')
        ]

        log = mock.Mock()
        for bad, good in messages:
            self.assertTrue(remove_control_chars(bad, log) == good, (bad,good))

@@ -70,7 +72,7 @@ def test_remove_control_chars_from_payload(self):
{"processes":[1234,[[u'db🖫', 0, 2.2,12,34,u'☢compiz☢',1]]]}
]

log = Mock()
log = mock.Mock()

def is_converted_same(msg):
new_msg = sanitize_payload(msg, log, remove_control_chars)
@@ -92,6 +94,38 @@ def test_remove_undecodable_characters(self):
        ]

        for bad, good, log_called in messages:
            log = mock.Mock()
            self.assertEqual(good, remove_undecodable_chars(bad, log))
            self.assertEqual(log_called, log.warning.called)

    # Make compression a no-op for the tests
    @mock.patch('zlib.compress', side_effect=lambda x: x)
    def test_metrics_payload_chunks(self, compress_mock):
        log = mock.Mock()
        nb_series = 10000
        max_compressed_size = 1 << 10

        metrics_payload = {"series": [
            {
                "metric": "%d" % i,  # use an integer so that it's easy to find the metric afterwards
                "points": [(i, i)],
                "source_type_name": "System",
            } for i in xrange(nb_series)
        ]}

        compressed_payloads = serialize_and_compress_metrics_payload(metrics_payload, max_compressed_size, 0, log)

        # check that all the payloads are smaller than the max size
        for compressed_payload in compressed_payloads:
            self.assertLess(len(compressed_payload), max_compressed_size)

        # check that all the series are there (correct number + correct metric names)
        series_after = []
        for compressed_payload in compressed_payloads:
            series_after.extend(json.loads(compressed_payload)["series"])

        self.assertEqual(nb_series, len(series_after))

        metrics_sorted = sorted([int(metric["metric"]) for metric in series_after])
        for i, metric_name in enumerate(metrics_sorted):
            self.assertEqual(i, metric_name)