Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus Remote Write Exporter (3/7) #11

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from typing import Dict, Sequence

from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
Label,
Sample,
Expand All @@ -24,6 +27,13 @@
MetricsExporter,
MetricsExportResult,
)
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)


class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
Expand Down Expand Up @@ -131,31 +141,88 @@ def shutdown(self) -> None:
def convert_to_timeseries(
self, export_records: Sequence[ExportRecord]
) -> Sequence[TimeSeries]:
raise NotImplementedError()
converter_map = {
MinMaxSumCountAggregator: self.convert_from_min_max_sum_count,
SumAggregator: self.convert_from_sum,
HistogramAggregator: self.convert_from_histogram,
LastValueAggregator: self.convert_from_last_value,
ValueObserverAggregator: self.convert_from_last_value,
}
timeseries = []
for export_record in export_records:
aggregator_type = type(export_record.aggregator)
converter = converter_map.get(aggregator_type)
if not converter:
raise ValueError(
str(aggregator_type) + " conversion is not supported"
)
timeseries.extend(converter(export_record))
return timeseries

def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
raise NotImplementedError()
name = sum_record.instrument.name
value = sum_record.aggregator.checkpoint
return [self.create_timeseries(sum_record, name, value)]

def convert_from_min_max_sum_count(
self, min_max_sum_count_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
timeseries = []
agg_types = ["min", "max", "sum", "count"]
for agg_type in agg_types:
name = min_max_sum_count_record.instrument.name + "_" + agg_type
value = getattr(
min_max_sum_count_record.aggregator.checkpoint, agg_type
)
timeseries.append(
self.create_timeseries(min_max_sum_count_record, name, value)
)
return timeseries

def convert_from_histogram(
self, histogram_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
count = 0
timeseries = []
for bound in histogram_record.aggregator.checkpoint.keys():
bb = "+Inf" if bound == float("inf") else str(bound)
name = (
histogram_record.instrument.name + '_bucket{le="' + bb + '"}'
)
value = histogram_record.aggregator.checkpoint[bound]
timeseries.append(
self.create_timeseries(histogram_record, name, value)
)
count += value
name = histogram_record.instrument.name + "_count"
timeseries.append(
self.create_timeseries(histogram_record, name, float(count))
)
return timeseries

def convert_from_last_value(
self, last_value_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
name = last_value_record.instrument.name
value = last_value_record.aggregator.checkpoint
return [self.create_timeseries(last_value_record, name, value)]

def convert_from_value_observer(
self, value_observer_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
timeseries = []
agg_types = ["min", "max", "sum", "count", "last"]
for agg_type in agg_types:
name = value_observer_record.instrument.name + "_" + agg_type
value = getattr(
value_observer_record.aggregator.checkpoint, agg_type
)
timeseries.append(
self.create_timeseries(value_observer_record, name, value)
)
return timeseries

# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
def convert_from_quantile(
self, summary_record: ExportRecord

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For things you haven't written yet might want to throw NotImplementedError

) -> TimeSeries:
Expand All @@ -165,13 +232,37 @@ def convert_from_quantile(
def create_timeseries(
self, export_record: ExportRecord, name, value: float
) -> TimeSeries:
raise NotImplementedError()
timeseries = TimeSeries()
# Add name label, record labels and resource labels
timeseries.labels.append(self.create_label("__name__", name))
resource_attributes = export_record.resource.attributes
for label_name, label_value in resource_attributes.items():
timeseries.labels.append(
self.create_label(label_name, label_value)
)
for label in export_record.labels:
if label[0] not in resource_attributes.keys():
timeseries.labels.append(self.create_label(label[0], label[1]))
# Add sample
timeseries.samples.append(
self.create_sample(
export_record.aggregator.last_update_timestamp, value
)
)
return timeseries

def create_sample(self, timestamp: int, value: float) -> Sample:
raise NotImplementedError()
sample = Sample()
sample.timestamp = int(timestamp / 1000000)
sample.value = value
return sample

def create_label(self, name: str, value: str) -> Label:
raise NotImplementedError()
label = Label()
# Label name must contain only alphanumeric characters and underscores
label.name = re.sub("[^0-9a-zA-Z_]+", "_", name)
label.value = value
return label

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,25 @@
# limitations under the License.

import unittest
from unittest import mock

from opentelemetry.exporter.prometheus_remote_write import (
PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
TimeSeries,
)
from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics.export import ExportRecord, MetricsExportResult
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util import get_dict_as_key


class TestValidation(unittest.TestCase):
Expand Down Expand Up @@ -90,35 +105,151 @@ def test_invalid_conflicting_auth_param(self):
class TestConversion(unittest.TestCase):
# Initializes test data that is reused across tests
def setUp(self):
pass
self._test_metric = Counter(
"testname", "testdesc", "testunit", int, None
)
self._exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="/prom/test_endpoint"
)

def generate_record(aggregator_type):
return ExportRecord(
self._test_metric, None, aggregator_type(), Resource({}),
)

self._generate_record = generate_record

def converter_method(record, name, value):
return (type(record.aggregator), name, value)

self._converter_mock = mock.MagicMock(return_value=converter_method)

# Ensures conversion to timeseries function works with valid aggregation types
def test_valid_convert_to_timeseries(self):
pass
timeseries_mock_method = mock.Mock(return_value=["test_value"])
self._exporter.convert_from_sum = timeseries_mock_method
self._exporter.convert_from_min_max_sum_count = timeseries_mock_method
self._exporter.convert_from_histogram = timeseries_mock_method
self._exporter.convert_from_last_value = timeseries_mock_method
self._exporter.convert_from_value_observer = timeseries_mock_method
test_records = [
self._generate_record(SumAggregator),
self._generate_record(MinMaxSumCountAggregator),
self._generate_record(HistogramAggregator),
self._generate_record(LastValueAggregator),
self._generate_record(ValueObserverAggregator),
]
data = self._exporter.convert_to_timeseries(test_records)
self.assertEqual(len(data), 5)
for timeseries in data:
self.assertEqual(timeseries, "test_value")

no_type_records = [self._generate_record(lambda: None)]
with self.assertRaises(ValueError):
self._exporter.convert_to_timeseries(no_type_records)

# Ensures conversion to timeseries fails for unsupported aggregation types
def test_invalid_convert_to_timeseries(self):
pass
no_type_records = [self._generate_record(lambda: None)]
with self.assertRaises(ValueError):
self._exporter.convert_to_timeseries(no_type_records)

# Ensures sum aggregator is correctly converted to timeseries
def test_convert_from_sum(self):
pass
sum_record = self._generate_record(SumAggregator)
sum_record.aggregator.update(3)
sum_record.aggregator.update(2)
sum_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_sum(sum_record)
self.assertEqual(timeseries[0], (SumAggregator, "testname", 5))

# Ensures sum min_max_count aggregator is correctly converted to timeseries
def test_convert_from_min_max_sum_count(self):
pass
min_max_sum_count_record = self._generate_record(
MinMaxSumCountAggregator
)
min_max_sum_count_record.aggregator.update(5)
min_max_sum_count_record.aggregator.update(1)
min_max_sum_count_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_min_max_sum_count(
min_max_sum_count_record
)
self.assertEqual(
timeseries[0], (MinMaxSumCountAggregator, "testname_min", 1)
)
self.assertEqual(
timeseries[1], (MinMaxSumCountAggregator, "testname_max", 5)
)
self.assertEqual(
timeseries[2], (MinMaxSumCountAggregator, "testname_sum", 6)
)
self.assertEqual(
timeseries[3], (MinMaxSumCountAggregator, "testname_count", 2)
)

# Ensures histogram aggregator is correctly converted to timeseries
def test_convert_from_histogram(self):
pass
histogram_record = self._generate_record(HistogramAggregator)
histogram_record.aggregator.update(5)
histogram_record.aggregator.update(2)
histogram_record.aggregator.update(-1)
histogram_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_histogram(histogram_record)
self.assertEqual(
timeseries[0], (HistogramAggregator, 'testname_bucket{le="0"}', 1)
)
self.assertEqual(
timeseries[1],
(HistogramAggregator, 'testname_bucket{le="+Inf"}', 2),
)
self.assertEqual(
timeseries[2], (HistogramAggregator, "testname_count", 3)
)

# Ensures last value aggregator is correctly converted to timeseries
def test_convert_from_last_value(self):
pass
last_value_record = self._generate_record(LastValueAggregator)
last_value_record.aggregator.update(1)
last_value_record.aggregator.update(5)
last_value_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_last_value(last_value_record)
self.assertEqual(timeseries[0], (LastValueAggregator, "testname", 5))

# Ensures value observer aggregator is correctly converted to timeseries
def test_convert_from_value_observer(self):
pass
value_observer_record = self._generate_record(ValueObserverAggregator)
value_observer_record.aggregator.update(5)
value_observer_record.aggregator.update(1)
value_observer_record.aggregator.update(2)
value_observer_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_value_observer(
value_observer_record
)
self.assertEqual(
timeseries[0], (ValueObserverAggregator, "testname_min", 1)
)
self.assertEqual(
timeseries[1], (ValueObserverAggregator, "testname_max", 5)
)
self.assertEqual(
timeseries[2], (ValueObserverAggregator, "testname_sum", 8)
)
self.assertEqual(
timeseries[3], (ValueObserverAggregator, "testname_count", 3)
)
self.assertEqual(
timeseries[4], (ValueObserverAggregator, "testname_last", 2)
)

# Ensures quantile aggregator is correctly converted to timeseries
# TODO: Add test once method is implemented
Expand All @@ -127,7 +258,34 @@ def test_convert_from_quantile(self):

# Ensures timeseries produced contains appropriate sample and labels
def test_create_timeseries(self):
pass
sum_aggregator = SumAggregator()
sum_aggregator.update(5)
sum_aggregator.take_checkpoint()
sum_aggregator.last_update_timestamp = 10
export_record = ExportRecord(
self._test_metric,
get_dict_as_key({"record_name": "record_value"}),
sum_aggregator,
Resource({"resource_name": "resource_value"}),
)

expected_timeseries = TimeSeries()
expected_timeseries.labels.append(
self._exporter.create_label("__name__", "testname")
)
expected_timeseries.labels.append(
self._exporter.create_label("resource_name", "resource_value")
)
expected_timeseries.labels.append(
self._exporter.create_label("record_name", "record_value")
)
expected_timeseries.samples.append(
self._exporter.create_sample(10, 5.0),
)
timeseries = self._exporter.create_timeseries(
export_record, "testname", 5.0,
)
self.assertEqual(timeseries, expected_timeseries)


class TestExport(unittest.TestCase):
Expand Down