From dbcb38110c09cc65ce736943a0e55324a67995cc Mon Sep 17 00:00:00 2001 From: Azfaar Qureshi Date: Fri, 20 Nov 2020 13:54:29 -0500 Subject: [PATCH] Adding exporter methods --- dev-requirements.txt | 2 + .../prometheus_remote_write/__init__.py | 54 +++++++++++++-- .../test_prometheus_remote_write_exporter.py | 67 +++++++++++++++---- 3 files changed, 107 insertions(+), 16 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 4ef8070934..ea1e8a666a 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -12,3 +12,5 @@ readme-renderer~=24.0 grpcio-tools==1.29.0 mypy-protobuf>=1.23 protobuf>=3.13.0 +snappy==0.5.4 +requests==2.25.0 \ No newline at end of file diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py index b841c6600a..4e10beed26 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py @@ -11,9 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging import re from typing import Dict, Sequence +import requests + +import snappy from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( WriteRequest, ) @@ -35,6 +39,8 @@ ValueObserverAggregator, ) +logger = logging.getLogger(__name__) + class PrometheusRemoteWriteMetricsExporter(MetricsExporter): """ @@ -133,7 +139,10 @@ def headers(self, headers: Dict): def export( self, export_records: Sequence[ExportRecord] ) -> MetricsExportResult: - raise NotImplementedError() + timeseries = self.convert_to_timeseries(export_records) + message = self.build_message(timeseries) + headers = self.get_headers() + return self.send_message(message, headers) def shutdown(self) -> None: raise NotImplementedError() @@ -265,12 +274,49 @@ def create_label(self, name: str, value: str) -> Label: return label def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: - raise NotImplementedError() + write_request = WriteRequest() + write_request.timeseries.extend(timeseries) + serialized_message = write_request.SerializeToString() + return snappy.compress(serialized_message) def get_headers(self) -> Dict: - raise NotImplementedError() + headers = { + "Content-Encoding": "snappy", + "Content-Type": "application/x-protobuf", + "X-Prometheus-Remote-Write-Version": "0.1.0", + } + if hasattr(self, "headers"): + for header_name, header_value in self.headers.items(): + headers[header_name] = header_value + + if "Authorization" not in headers: + if hasattr(self, "bearer_token"): + headers["Authorization"] = "Bearer " + self.bearer_token + elif hasattr(self, "bearer_token_file"): + with open(self.bearer_token_file) as file: + headers["Authorization"] = "Bearer " + file.readline() + return headers def send_message( self, message: bytes, headers: Dict ) -> MetricsExportResult: - raise NotImplementedError() + auth = None + if hasattr(self, "basic_auth"): + basic_auth = self.basic_auth + if "password" in basic_auth: + auth = (basic_auth.username, basic_auth.password) + else: + with open(basic_auth.password_file) as file: + auth = (basic_auth.username, file.readline()) + response = requests.post( + self.endpoint, data=message, headers=headers, auth=auth + ) + if response.status_code != 200: + logger.warning( + "POST request failed with status %s with reason: %s and content: %s", + str(response.status_code), + response.reason, + str(response.content), + ) + return MetricsExportResult.FAILURE + return MetricsExportResult.SUCCESS diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py index 8cdd3b46b4..f7cd3db233 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py @@ -288,25 +288,68 @@ def test_create_timeseries(self): self.assertEqual(timeseries, expected_timeseries) +class ResponseStub: + def __init__(self, status_code): + self.status_code = status_code + self.reason = "dummy_reason" + self.content = "dummy_content" + + class TestExport(unittest.TestCase): # Initializes test data that is reused across tests def setUp(self): - pass + self._exporter = PrometheusRemoteWriteMetricsExporter( + endpoint="/prom/test_endpoint" + ) # Ensures export is successful with valid export_records and config - def test_export(self): - pass - - def test_valid_send_message(self): - pass - - def test_invalid_send_message(self): - pass + @mock.patch("requests.post", return_value=ResponseStub(200)) + def test_export(self, mock_post): + test_metric = Counter("testname", "testdesc", "testunit", int, None) + labels = {"environment": "testing"} + record = ExportRecord( + test_metric, labels, SumAggregator(), Resource({}), + ) + result = self._exporter.export([record]) + self.assertIs(result, MetricsExportResult.SUCCESS) + self.assertEqual(mock_post.call_count, 1) + + @mock.patch("requests.post", return_value=ResponseStub(200)) + def test_valid_send_message(self, mock_post): + result = self._exporter.send_message(bytes(), {}) + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(result, MetricsExportResult.SUCCESS) + + @mock.patch("requests.post", return_value=ResponseStub(404)) + def test_invalid_send_message(self, mock_post): + result = self._exporter.send_message(bytes(), {}) + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(result, MetricsExportResult.FAILURE) # Verifies that build_message calls snappy.compress and returns SerializedString - def test_build_message(self): - pass + @mock.patch("snappy.compress", return_value=bytes()) + def test_build_message(self, mock_compress): + test_timeseries = [ + TimeSeries(), + TimeSeries(), + ] + message = self._exporter.build_message(test_timeseries) + self.assertEqual(mock_compress.call_count, 1) + self.assertIsInstance(message, bytes) # Ensure correct headers are added when valid config is provided def test_get_headers(self): - pass + self._exporter.headers = {"Custom Header": "test_header"} + self._exporter.headers = {"Custom Header": "test_header"} + self._exporter.bearer_token = "test_token" + + headers = self._exporter.get_headers() + self.assertEqual(headers.get("Content-Encoding", ""), "snappy") + self.assertEqual( + headers.get("Content-Type", ""), "application/x-protobuf" + ) + self.assertEqual( + headers.get("X-Prometheus-Remote-Write-Version", ""), "0.1.0" + ) + self.assertEqual(headers.get("Authorization", ""), "Bearer test_token") + self.assertEqual(headers.get("Custom Header", ""), "test_header")