|
5 | 5 | import os
|
6 | 6 | import pathlib
|
7 | 7 | from enum import Enum
|
8 |
| -from typing import Dict, List, Union |
| 8 | +from typing import Any, Dict, List, Union |
9 | 9 |
|
10 | 10 | import fastjsonschema
|
11 | 11 |
|
@@ -78,14 +78,20 @@ class MetricManager:
|
78 | 78 | """
|
79 | 79 |
|
def __init__(
    self,
    metric_set: Dict[str, str] = None,
    dimension_set: Dict = None,
    namespace: str = None,
    metadata_set: Dict[str, Any] = None,
    service: str = None,
):
    """Initialize the metric manager state

    Parameters
    ----------
    metric_set : Dict[str, str], optional
        Existing metrics to start from; a new empty dict is used when omitted
    dimension_set : Dict, optional
        Existing dimensions to start from; a new empty dict is used when omitted
    namespace : str, optional
        Metrics namespace; falls back to the POWERTOOLS_METRICS_NAMESPACE
        environment variable when not given
    metadata_set : Dict[str, Any], optional
        Existing metadata to start from; a new empty dict is used when omitted
    service : str, optional
        Service name; falls back to the POWERTOOLS_SERVICE_NAME
        environment variable when not given
    """
    self.metric_set = metric_set if metric_set is not None else {}
    self.dimension_set = dimension_set if dimension_set is not None else {}
    self.namespace = namespace or os.getenv("POWERTOOLS_METRICS_NAMESPACE")
    self.service = service or os.environ.get("POWERTOOLS_SERVICE_NAME")
    self._metric_units = [unit.value for unit in MetricUnit]
    self._metric_unit_options = list(MetricUnit.__members__)
    # Read the *parameter*, not the attribute: the attribute does not exist
    # yet at this point, so referencing self.metadata_set here would raise
    # AttributeError whenever a caller supplies metadata_set.
    self.metadata_set = metadata_set if metadata_set is not None else {}
89 | 95 |
|
90 | 96 | def add_metric(self, name: str, unit: MetricUnit, value: Union[float, int]):
|
91 | 97 | """Adds given metric
|
@@ -131,7 +137,7 @@ def add_metric(self, name: str, unit: MetricUnit, value: Union[float, int]):
|
131 | 137 | # since we could have more than 100 metrics
|
132 | 138 | self.metric_set.clear()
|
133 | 139 |
|
134 |
| - def serialize_metric_set(self, metrics: Dict = None, dimensions: Dict = None) -> Dict: |
| 140 | + def serialize_metric_set(self, metrics: Dict = None, dimensions: Dict = None, metadata: Dict = None) -> Dict: |
135 | 141 | """Serializes metric and dimensions set
|
136 | 142 |
|
137 | 143 | Parameters
|
@@ -165,39 +171,48 @@ def serialize_metric_set(self, metrics: Dict = None, dimensions: Dict = None) ->
|
165 | 171 | if dimensions is None: # pragma: no cover
|
166 | 172 | dimensions = self.dimension_set
|
167 | 173 |
|
| 174 | + if metadata is None: # pragma: no cover |
| 175 | + metadata = self.metadata_set |
| 176 | + |
168 | 177 | if self.service and not self.dimension_set.get("service"):
|
169 | 178 | self.dimension_set["service"] = self.service
|
170 | 179 |
|
171 | 180 | logger.debug("Serializing...", {"metrics": metrics, "dimensions": dimensions})
|
172 | 181 |
|
173 |
| - dimension_keys: List[str] = list(dimensions.keys()) |
174 |
| - metric_names_unit: List[Dict[str, str]] = [] |
175 |
| - metric_set: Dict[str, str] = {} |
| 182 | + metric_names_and_units: List[Dict[str, str]] = [] # [ { "Name": "metric_name", "Unit": "Count" } ] |
| 183 | + metric_names_and_values: Dict[str, str] = {} # { "metric_name": 1.0 } |
176 | 184 |
|
177 | 185 | for metric_name in metrics:
|
178 | 186 | metric: str = metrics[metric_name]
|
179 | 187 | metric_value: int = metric.get("Value", 0)
|
180 | 188 | metric_unit: str = metric.get("Unit", "")
|
181 | 189 |
|
182 |
| - metric_names_unit.append({"Name": metric_name, "Unit": metric_unit}) |
183 |
| - metric_set.update({metric_name: metric_value}) |
184 |
| - |
185 |
| - metrics_definition = { |
186 |
| - "CloudWatchMetrics": [ |
187 |
| - {"Namespace": self.namespace, "Dimensions": [dimension_keys], "Metrics": metric_names_unit} |
188 |
| - ] |
| 190 | + metric_names_and_units.append({"Name": metric_name, "Unit": metric_unit}) |
| 191 | + metric_names_and_values.update({metric_name: metric_value}) |
| 192 | + |
| 193 | + embedded_metrics_object = { |
| 194 | + "_aws": { |
| 195 | + "Timestamp": int(datetime.datetime.now().timestamp() * 1000), # epoch |
| 196 | + "CloudWatchMetrics": [ |
| 197 | + { |
| 198 | + "Namespace": self.namespace, # "test_namespace" |
| 199 | + "Dimensions": [list(dimensions.keys())], # [ "service" ] |
| 200 | + "Metrics": metric_names_and_units, |
| 201 | + } |
| 202 | + ], |
| 203 | + }, |
| 204 | + **dimensions, # "service": "test_service" |
| 205 | + **metadata, # "username": "test" |
| 206 | + **metric_names_and_values, # "single_metric": 1.0 |
189 | 207 | }
|
190 |
| - metrics_timestamp = {"Timestamp": int(datetime.datetime.now().timestamp() * 1000)} |
191 |
| - metric_set["_aws"] = {**metrics_timestamp, **metrics_definition} |
192 |
| - metric_set.update(**dimensions) |
193 | 208 |
|
194 | 209 | try:
|
195 |
| - logger.debug("Validating serialized metrics against CloudWatch EMF schema", metric_set) |
196 |
| - fastjsonschema.validate(definition=CLOUDWATCH_EMF_SCHEMA, data=metric_set) |
| 210 | + logger.debug("Validating serialized metrics against CloudWatch EMF schema", embedded_metrics_object) |
| 211 | + fastjsonschema.validate(definition=CLOUDWATCH_EMF_SCHEMA, data=embedded_metrics_object) |
197 | 212 | except fastjsonschema.JsonSchemaException as e:
|
198 | 213 | message = f"Invalid format. Error: {e.message}, Invalid item: {e.name}" # noqa: B306, E501
|
199 | 214 | raise SchemaValidationError(message)
|
200 |
| - return metric_set |
| 215 | + return embedded_metrics_object |
201 | 216 |
|
202 | 217 | def add_dimension(self, name: str, value: str):
|
203 | 218 | """Adds given dimension to all metrics
|
@@ -225,6 +240,38 @@ def add_dimension(self, name: str, value: str):
|
225 | 240 | else:
|
226 | 241 | self.dimension_set[name] = str(value)
|
227 | 242 |
|
def add_metadata(self, key: str, value: Any):
    """Adds high cardinal metadata for metrics object

    Metadata will not be available during metrics visualization.
    Instead, it will be searchable through logs.

    If you're looking to add metadata to filter metrics, then
    use the add_dimension method.

    Example
    -------
    **Add metrics metadata**

        metric.add_metadata(key="booking_id", value="booking_id")

    Parameters
    ----------
    key : str
        Metadata key
    value : any
        Metadata value
    """
    logger.debug(f"Adding metadata: {key}:{value}")

    # Keys are cast to str according to the EMF spec. Most keys are expected
    # to be strings already, so the isinstance check avoids a needless cast.
    metadata_key = key if isinstance(key, str) else str(key)
    self.metadata_set[metadata_key] = value
| 274 | + |
228 | 275 | def __extract_metric_unit_value(self, unit: Union[str, MetricUnit]) -> str:
|
229 | 276 | """Return metric value from metric unit whether that's str or MetricUnit enum
|
230 | 277 |
|
|
0 commit comments