
Added support for more than 100 metrics #31


Merged · 3 commits · Apr 18, 2020

1 change: 1 addition & 0 deletions aws_embedded_metrics/constants.py
@@ -13,3 +13,4 @@
 
 DEFAULT_NAMESPACE = "aws-embedded-metrics"
 MAX_DIMENSIONS = 9
+MAX_METRICS_PER_EVENT = 100
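
For context: CloudWatch's embedded metric format allows at most 100 metrics per EMF document, which is what this new constant encodes. A context holding n metrics therefore has to be flushed as ceil(n / 100) separate events (minimum one), and the rest of this PR threads that batching through the serializer and both sinks.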
3 changes: 2 additions & 1 deletion aws_embedded_metrics/serializers/__init__.py
@@ -13,10 +13,11 @@
 
 import abc
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
+from typing import List
 
 
 class Serializer(abc.ABC):
     @staticmethod
     @abc.abstractmethod
-    def serialize(context: MetricsContext) -> str:
+    def serialize(context: MetricsContext) -> List[str]:
         """Flushes the metrics context to the sink."""
50 changes: 31 additions & 19 deletions aws_embedded_metrics/serializers/log_serializer.py
@@ -13,14 +13,14 @@
 
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers import Serializer
-from aws_embedded_metrics.constants import MAX_DIMENSIONS
+from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT
 import json
 from typing import Any, Dict, List
 
 
 class LogSerializer(Serializer):
     @staticmethod
-    def serialize(context: MetricsContext) -> str:
+    def serialize(context: MetricsContext) -> List[str]:
         dimension_keys = []
         dimensions_properties: Dict[str, str] = {}
 
@@ -29,28 +29,40 @@ def serialize(context: MetricsContext) -> str:
             dimension_keys.append(keys[0:MAX_DIMENSIONS])
             dimensions_properties = {**dimensions_properties, **dimension_set}
 
-        metric_pointers: List[Dict[str, str]] = []
-
-        metric_definitions = {
-            "Dimensions": dimension_keys,
-            "Metrics": metric_pointers,
-            "Namespace": context.namespace,
-        }
-        cloud_watch_metrics = [metric_definitions]
-
-        body: Dict[str, Any] = {
-            **dimensions_properties,
-            **context.properties,
-            "_aws": {**context.meta, "CloudWatchMetrics": cloud_watch_metrics},
-        }
+        def create_body() -> Dict[str, Any]:
+            return {
+                **dimensions_properties,
+                **context.properties,
+                "_aws": {
+                    **context.meta,
+                    "CloudWatchMetrics": [
+                        {
+                            "Dimensions": dimension_keys,
+                            "Metrics": [],
+                            "Namespace": context.namespace,
+                        },
+                    ],
+                },
+            }
+
+        current_body: Dict[str, Any] = create_body()
+        event_batches: List[str] = []
 
         for metric_name, metric in context.metrics.items():
 
             if len(metric.values) == 1:
-                body[metric_name] = metric.values[0]
+                current_body[metric_name] = metric.values[0]
             else:
-                body[metric_name] = metric.values
+                current_body[metric_name] = metric.values
 
-            metric_pointers.append({"Name": metric_name, "Unit": metric.unit})
+            current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+
+            should_serialize: bool = len(current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"]) == MAX_METRICS_PER_EVENT
+            if should_serialize:
+                event_batches.append(json.dumps(current_body))
+                current_body = create_body()
 
-        return json.dumps(body)
+        if not event_batches or current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"]:
+            event_batches.append(json.dumps(current_body))
+
+        return event_batches
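
The batching logic reads straight off the diff: metrics accumulate in the current event body, and whenever the body reaches MAX_METRICS_PER_EVENT the serializer snapshots it into event_batches and starts a fresh body; a final flush catches the remainder (an empty context still emits a single metric-less event). Here is a usage sketch built only from the public API the PR's own tests exercise: 295 metrics split into batches of 100, 100, and 95.

import json

from aws_embedded_metrics.logger.metrics_context import MetricsContext
from aws_embedded_metrics.serializers.log_serializer import LogSerializer

context = MetricsContext.empty()
for index in range(295):
    context.put_metric(f"Metric-{index}", 1)

events = LogSerializer.serialize(context)
assert len(events) == 3

# Each event is a self-contained EMF document with at most 100 metrics.
sizes = [len(json.loads(e)["_aws"]["CloudWatchMetrics"][0]["Metrics"]) for e in events]
assert sizes == [100, 100, 95]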
5 changes: 3 additions & 2 deletions aws_embedded_metrics/sinks/agent_sink.py
@@ -66,14 +66,15 @@ def accept(self, context: MetricsContext) -> None:
         if self.log_steam_name is not None:
             context.meta["LogStreamName"] = self.log_steam_name
 
-        serialized_content = self.serializer.serialize(context) + '\n'
         log.info(
             "Parsed agent endpoint (%s) %s:%s",
             self.endpoint.scheme,
             self.endpoint.hostname,
             self.endpoint.port,
         )
-        self.client.send_message(serialized_content.encode('utf-8'))
+        for serialized_content in self.serializer.serialize(context):
+            message = serialized_content + "\n"
+            self.client.send_message(message.encode('utf-8'))
 
     @staticmethod
     def name() -> str:
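
Each serialized event is now framed as its own newline-terminated message to the CloudWatch agent, so the number of send_message calls equals the number of batches. A quick sanity check on that arithmetic (the helper below is hypothetical, for illustration only):

import math

MAX_METRICS_PER_EVENT = 100

def expected_messages(metric_count: int) -> int:
    # An empty context still flushes one metric-less event.
    return max(1, math.ceil(metric_count / MAX_METRICS_PER_EVENT))

# 401 metrics -> ceil(401 / 100) = 5 socket messages,
# matching the new agent-sink test below.
assert expected_messages(401) == 5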
3 changes: 2 additions & 1 deletion aws_embedded_metrics/sinks/lambda_sink.py
@@ -22,7 +22,8 @@ def __init__(self, serializer: Serializer = LogSerializer()):
         self.serializer = serializer
 
     def accept(self, context: MetricsContext) -> None:
-        print(self.serializer.serialize(context))
+        for serialized_content in self.serializer.serialize(context):
+            print(serialized_content)
 
     @staticmethod
     def name() -> str:
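
On Lambda, each printed line is forwarded to CloudWatch Logs as its own record, so every EMF document stays under the 100-metric cap independently. A usage sketch, assuming only the classes shown in this PR:

from aws_embedded_metrics.logger.metrics_context import MetricsContext
from aws_embedded_metrics.sinks.lambda_sink import LambdaSink

context = MetricsContext.empty()
for index in range(150):
    context.put_metric(f"Metric-{index}", 1)

# With 150 metrics, accept() now prints two JSON documents, one per line.
LambdaSink().accept(context)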
85 changes: 81 additions & 4 deletions tests/serializer/test_log_serializer.py
@@ -22,7 +22,7 @@ def test_serialize_dimensions():
     context.put_dimensions(dimensions)
 
     # act
-    result_json = serializer.serialize(context)
+    result_json = serializer.serialize(context)[0]
 
     # assert
     assert_json_equality(result_json, expected)
@@ -53,7 +53,7 @@ def test_cannot_serialize_more_than_9_dimensions():
     context.put_dimensions(dimensions)
 
     # act
-    result_json = serializer.serialize(context)
+    result_json = serializer.serialize(context)[0]
 
     # assert
     assert_json_equality(result_json, expected)
@@ -71,7 +71,7 @@ def test_serialize_properties():
     context.set_property(expected_key, expected_value)
 
     # act
-    result_json = serializer.serialize(context)
+    result_json = serializer.serialize(context)[0]
 
     # assert
     assert_json_equality(result_json, expected)
@@ -94,12 +94,89 @@ def test_serialize_metrics():
     context.put_metric(expected_key, expected_value)
 
     # act
-    result_json = serializer.serialize(context)
+    result_json = serializer.serialize(context)[0]
 
     # assert
     assert_json_equality(result_json, expected)
 
 
+def test_serialize_more_than_100_metrics():
+    # arrange
+    expected_value = fake.word()
+    expected_batches = 3
+    metrics = 295
+
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        context.put_metric(expected_key, expected_value)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == expected_batches
+
+    metric_index = 0
+    for batch_index in range(expected_batches):
+        expected_metric_count = metrics % 100 if (batch_index == expected_batches - 1) else 100
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        assert len(result_obj["_aws"]["CloudWatchMetrics"][0]["Metrics"]) == expected_metric_count
+
+        for index in range(expected_metric_count):
+            assert result_obj[f"Metric-{metric_index}"] == expected_value
+            metric_index += 1
+
+
+def test_serialize_with_multiple_metrics():
+    # arrange
+    metrics = 2
+    expected = {**get_empty_payload()}
+    context = get_context()
+
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        expected_value = fake.word()
+        context.put_metric(expected_key, expected_value)
+
+        expected_metric_definition = {"Name": expected_key, "Unit": "None"}
+        expected[expected_key] = expected_value
+        expected["_aws"]["CloudWatchMetrics"][0]["Metrics"].append(
+            expected_metric_definition
+        )
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == 1
+    assert results == [json.dumps(expected)]
+
+
+def test_serialize_metrics_with_multiple_datapoints():
+    # arrange
+    expected_key = fake.word()
+    expected_values = [fake.word(), fake.word()]
+    expected_metric_definition = {"Name": expected_key, "Unit": "None"}
+    expected = {**get_empty_payload()}
+    expected[expected_key] = expected_values
+    expected["_aws"]["CloudWatchMetrics"][0]["Metrics"].append(
+        expected_metric_definition
+    )
+
+    context = get_context()
+    for expected_value in expected_values:
+        context.put_metric(expected_key, expected_value)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == 1
+    assert results == [json.dumps(expected)]
 
 
 # Test utility method
 
 
23 changes: 23 additions & 0 deletions tests/sinks/test_agent_sink.py
@@ -1,5 +1,8 @@
+from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.sinks.agent_sink import AgentSink
 from aws_embedded_metrics.config import get_config
+from unittest.mock import patch, Mock
+
 
 Config = get_config()
 
@@ -70,3 +73,23 @@ def test_fallback_to_default_endpoint_on_parse_failure():
     assert sink.endpoint.hostname == expected_hostname
     assert sink.endpoint.port == expected_port
     assert sink.endpoint.scheme == expected_protocol
+
+
+@patch("aws_embedded_metrics.sinks.agent_sink.get_socket_client")
+def test_more_than_max_number_of_metrics(mock_get_socket_client):
+    # arrange
+    context = MetricsContext.empty()
+    expected_metrics = 401
+    expected_send_message_calls = 5
+    for index in range(expected_metrics):
+        context.put_metric(f"{index}", 1)
+
+    mock_tcp_client = Mock()
+    mock_get_socket_client.return_value = mock_tcp_client
+
+    # act
+    sink = AgentSink("")
+    sink.accept(context)
+
+    # assert
+    assert expected_send_message_calls == mock_tcp_client.send_message.call_count
23 changes: 23 additions & 0 deletions tests/sinks/test_lambda_sink.py
@@ -1,5 +1,10 @@
 from aws_embedded_metrics.sinks.lambda_sink import LambdaSink
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
+from faker import Faker
+from unittest.mock import patch
+
+
+fake = Faker()
 
 
 def test_accept_writes_to_stdout(capfd):
@@ -17,3 +22,21 @@ def test_accept_writes_to_stdout(capfd):
         out
         == '{"_aws": {"Timestamp": 1, "CloudWatchMetrics": [{"Dimensions": [], "Metrics": [], "Namespace": "aws-embedded-metrics"}]}}\n'
     )
+
+
+@patch("aws_embedded_metrics.serializers.log_serializer.LogSerializer")
+def test_accept_writes_multiple_messages_to_stdout(mock_serializer, capfd):
+    # arrange
+    expected_messages = [fake.word() for _ in range(10)]
+    mock_serializer.serialize.return_value = expected_messages
+    sink = LambdaSink(serializer=mock_serializer)
+    context = MetricsContext.empty()
+    context.meta["Timestamp"] = 1
+
+    # act
+    sink.accept(context)
+
+    # assert
+    out, err = capfd.readouterr()
+    assert len(out.split()) == len(expected_messages)
+    assert out.split() == expected_messages