diff --git a/aws_embedded_metrics/constants.py b/aws_embedded_metrics/constants.py
index 4fcdc5f..756e9cd 100644
--- a/aws_embedded_metrics/constants.py
+++ b/aws_embedded_metrics/constants.py
@@ -13,3 +13,4 @@
 
 DEFAULT_NAMESPACE = "aws-embedded-metrics"
 MAX_DIMENSIONS = 9
+MAX_METRICS_PER_EVENT = 100
diff --git a/aws_embedded_metrics/serializers/__init__.py b/aws_embedded_metrics/serializers/__init__.py
index 2fa74b4..20c021d 100644
--- a/aws_embedded_metrics/serializers/__init__.py
+++ b/aws_embedded_metrics/serializers/__init__.py
@@ -13,10 +13,11 @@
 
 import abc
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
+from typing import List
 
 
 class Serializer(abc.ABC):
     @staticmethod
     @abc.abstractmethod
-    def serialize(context: MetricsContext) -> str:
+    def serialize(context: MetricsContext) -> List[str]:
         """Flushes the metrics context to the sink."""
diff --git a/aws_embedded_metrics/serializers/log_serializer.py b/aws_embedded_metrics/serializers/log_serializer.py
index 55149d1..afb5f73 100644
--- a/aws_embedded_metrics/serializers/log_serializer.py
+++ b/aws_embedded_metrics/serializers/log_serializer.py
@@ -13,14 +13,14 @@
 
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers import Serializer
-from aws_embedded_metrics.constants import MAX_DIMENSIONS
+from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT
 import json
 from typing import Any, Dict, List
 
 
 class LogSerializer(Serializer):
     @staticmethod
-    def serialize(context: MetricsContext) -> str:
+    def serialize(context: MetricsContext) -> List[str]:
         dimension_keys = []
         dimensions_properties: Dict[str, str] = {}
 
@@ -29,28 +29,40 @@ def serialize(context: MetricsContext) -> str:
             dimension_keys.append(keys[0:MAX_DIMENSIONS])
             dimensions_properties = {**dimensions_properties, **dimension_set}
 
-        metric_pointers: List[Dict[str, str]] = []
+        def create_body() -> Dict[str, Any]:
+            return {
+                **dimensions_properties,
+                **context.properties,
+                "_aws": {
+                    **context.meta,
+                    "CloudWatchMetrics": [
+                        {
+                            "Dimensions": dimension_keys,
+                            "Metrics": [],
+                            "Namespace": context.namespace,
+                        },
+                    ],
+                },
+            }
 
-        metric_definitions = {
-            "Dimensions": dimension_keys,
-            "Metrics": metric_pointers,
-            "Namespace": context.namespace,
-        }
-        cloud_watch_metrics = [metric_definitions]
-
-        body: Dict[str, Any] = {
-            **dimensions_properties,
-            **context.properties,
-            "_aws": {**context.meta, "CloudWatchMetrics": cloud_watch_metrics},
-        }
+        current_body: Dict[str, Any] = create_body()
+        event_batches: List[str] = []
 
         for metric_name, metric in context.metrics.items():
             if len(metric.values) == 1:
-                body[metric_name] = metric.values[0]
+                current_body[metric_name] = metric.values[0]
             else:
-                body[metric_name] = metric.values
+                current_body[metric_name] = metric.values
+
+            current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+
+            should_serialize: bool = len(current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"]) == MAX_METRICS_PER_EVENT
+            if should_serialize:
+                event_batches.append(json.dumps(current_body))
+                current_body = create_body()
 
-            metric_pointers.append({"Name": metric_name, "Unit": metric.unit})
+        if not event_batches or current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"]:
+            event_batches.append(json.dumps(current_body))
 
-        return json.dumps(body)
+        return event_batches
 
diff --git a/aws_embedded_metrics/sinks/agent_sink.py b/aws_embedded_metrics/sinks/agent_sink.py
index 125b140..59b202b 100644
--- a/aws_embedded_metrics/sinks/agent_sink.py
+++ b/aws_embedded_metrics/sinks/agent_sink.py
@@ -66,14 +66,15 @@ def accept(self, context: MetricsContext) -> None:
         if self.log_steam_name is not None:
             context.meta["LogStreamName"] = self.log_steam_name
 
-        serialized_content = self.serializer.serialize(context) + '\n'
         log.info(
             "Parsed agent endpoint (%s) %s:%s",
             self.endpoint.scheme,
             self.endpoint.hostname,
             self.endpoint.port,
         )
-        self.client.send_message(serialized_content.encode('utf-8'))
+        for serialized_content in self.serializer.serialize(context):
+            message = serialized_content + "\n"
+            self.client.send_message(message.encode('utf-8'))
 
     @staticmethod
     def name() -> str:
diff --git a/aws_embedded_metrics/sinks/lambda_sink.py b/aws_embedded_metrics/sinks/lambda_sink.py
index bd7727d..54e390b 100644
--- a/aws_embedded_metrics/sinks/lambda_sink.py
+++ b/aws_embedded_metrics/sinks/lambda_sink.py
@@ -22,7 +22,8 @@ def __init__(self, serializer: Serializer = LogSerializer()):
         self.serializer = serializer
 
     def accept(self, context: MetricsContext) -> None:
-        print(self.serializer.serialize(context))
+        for serialized_content in self.serializer.serialize(context):
+            print(serialized_content)
 
     @staticmethod
     def name() -> str:
diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py
index 2a7eedd..2d7c05f 100644
--- a/tests/serializer/test_log_serializer.py
+++ b/tests/serializer/test_log_serializer.py
@@ -22,7 +22,7 @@ def test_serialize_dimensions():
     context.put_dimensions(dimensions)
 
     # act
-    result_json = serializer.serialize(context)
+    result_json = serializer.serialize(context)[0]
 
     # assert
     assert_json_equality(result_json, expected)
@@ -53,7 +53,7 @@ def test_cannot_serialize_more_than_9_dimensions():
     context.put_dimensions(dimensions)
 
     # act
-    result_json = serializer.serialize(context)
+    result_json = serializer.serialize(context)[0]
 
     # assert
     assert_json_equality(result_json, expected)
@@ -71,7 +71,7 @@ def test_serialize_properties():
     context.set_property(expected_key, expected_value)
 
     # act
-    result_json = serializer.serialize(context)
+    result_json = serializer.serialize(context)[0]
 
     # assert
     assert_json_equality(result_json, expected)
@@ -94,12 +94,89 @@ def test_serialize_metrics():
     context.put_metric(expected_key, expected_value)
 
     # act
-    result_json = serializer.serialize(context)
+    result_json = serializer.serialize(context)[0]
 
     # assert
     assert_json_equality(result_json, expected)
 
 
+def test_serialize_more_than_100_metrics():
+    # arrange
+    expected_value = fake.word()
+    expected_batches = 3
+    metrics = 295
+
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        context.put_metric(expected_key, expected_value)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == expected_batches
+
+    metric_index = 0
+    for batch_index in range(expected_batches):
+        expected_metric_count = metrics % 100 if (batch_index == expected_batches - 1) else 100
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        assert len(result_obj["_aws"]["CloudWatchMetrics"][0]["Metrics"]) == expected_metric_count
+
+        for index in range(expected_metric_count):
+            assert result_obj[f"Metric-{metric_index}"] == expected_value
+            metric_index += 1
+
+
+def test_serialize_with_multiple_metrics():
+    # arrange
+    metrics = 2
+    expected = {**get_empty_payload()}
+    context = get_context()
+
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        expected_value = fake.word()
+        context.put_metric(expected_key, expected_value)
+
+        expected_metric_definition = {"Name": expected_key, "Unit": "None"}
+        expected[expected_key] = expected_value
+        expected["_aws"]["CloudWatchMetrics"][0]["Metrics"].append(
+            expected_metric_definition
+        )
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == 1
+    assert results == [json.dumps(expected)]
+
+
+def test_serialize_metrics_with_multiple_datapoints():
+    # arrange
+    expected_key = fake.word()
+    expected_values = [fake.word(), fake.word()]
+    expected_metric_definition = {"Name": expected_key, "Unit": "None"}
+    expected = {**get_empty_payload()}
+    expected[expected_key] = expected_values
+    expected["_aws"]["CloudWatchMetrics"][0]["Metrics"].append(
+        expected_metric_definition
+    )
+
+    context = get_context()
+    for expected_value in expected_values:
+        context.put_metric(expected_key, expected_value)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == 1
+    assert results == [json.dumps(expected)]
+
+
 # Test utility method
diff --git a/tests/sinks/test_agent_sink.py b/tests/sinks/test_agent_sink.py
index 718337f..46c5ea8 100644
--- a/tests/sinks/test_agent_sink.py
+++ b/tests/sinks/test_agent_sink.py
@@ -1,5 +1,8 @@
+from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.sinks.agent_sink import AgentSink
 from aws_embedded_metrics.config import get_config
+from unittest.mock import patch, Mock
+
 
 
 Config = get_config()
@@ -70,3 +73,23 @@ def test_fallback_to_default_endpoint_on_parse_failure():
     assert sink.endpoint.hostname == expected_hostname
     assert sink.endpoint.port == expected_port
     assert sink.endpoint.scheme == expected_protocol
+
+
+@patch("aws_embedded_metrics.sinks.agent_sink.get_socket_client")
+def test_more_than_max_number_of_metrics(mock_get_socket_client):
+    # arrange
+    context = MetricsContext.empty()
+    expected_metrics = 401
+    expected_send_message_calls = 5
+    for index in range(expected_metrics):
+        context.put_metric(f"{index}", 1)
+
+    mock_tcp_client = Mock()
+    mock_get_socket_client.return_value = mock_tcp_client
+
+    # act
+    sink = AgentSink("")
+    sink.accept(context)
+
+    # assert
+    assert expected_send_message_calls == mock_tcp_client.send_message.call_count
diff --git a/tests/sinks/test_lambda_sink.py b/tests/sinks/test_lambda_sink.py
index 373cc69..2856c49 100644
--- a/tests/sinks/test_lambda_sink.py
+++ b/tests/sinks/test_lambda_sink.py
@@ -1,5 +1,10 @@
 from aws_embedded_metrics.sinks.lambda_sink import LambdaSink
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
+from faker import Faker
+from unittest.mock import patch
+
+
+fake = Faker()
 
 
 def test_accept_writes_to_stdout(capfd):
@@ -17,3 +22,21 @@
         out
         == '{"_aws": {"Timestamp": 1, "CloudWatchMetrics": [{"Dimensions": [], "Metrics": [], "Namespace": "aws-embedded-metrics"}]}}\n'
     )
+
+
+@patch("aws_embedded_metrics.serializers.log_serializer.LogSerializer")
+def test_accept_writes_multiple_messages_to_stdout(mock_serializer, capfd):
+    # arrange
+    expected_messages = [fake.word() for _ in range(10)]
+    mock_serializer.serialize.return_value = expected_messages
+    sink = LambdaSink(serializer=mock_serializer)
+    context = MetricsContext.empty()
+    context.meta["Timestamp"] = 1
+
+    # act
+    sink.accept(context)
+
+    # assert
+    out, err = capfd.readouterr()
+    assert len(out.split()) == len(expected_messages)
+    assert out.split() == expected_messages
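
Note on the behavior this patch introduces (illustration only, not part of the diff): LogSerializer.serialize now returns a List[str] of EMF events, each carrying at most MAX_METRICS_PER_EVENT (100) metric definitions, and both sinks flush every batch. A minimal sketch, assuming a build with this change applied; the 295-metric loop and the metric names mirror the test above and are otherwise illustrative:

    import json

    from aws_embedded_metrics.logger.metrics_context import MetricsContext
    from aws_embedded_metrics.serializers.log_serializer import LogSerializer

    # Build a context with more metrics than fit in a single EMF event.
    context = MetricsContext.empty()
    for i in range(295):
        context.put_metric(f"Metric-{i}", 1)

    # serialize() now yields one JSON string per batch of <= 100 metrics.
    events = LogSerializer.serialize(context)
    assert len(events) == 3
    assert [len(json.loads(e)["_aws"]["CloudWatchMetrics"][0]["Metrics"])
            for e in events] == [100, 100, 95]

The same batching explains the agent-sink test: 401 metrics split into ceil(401 / 100) = 5 batches, so the sink sends five newline-terminated messages and the test expects five send_message calls.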