Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus Remote Write Exporter (4/7) #12

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@
((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206])
- Add conversion to TimeSeries methods
((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207])
- Add request methods
((#212)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/212])
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from typing import Dict, Sequence

import requests

import snappy
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
Expand All @@ -36,6 +40,8 @@
ValueObserverAggregator,
)

logger = logging.getLogger(__name__)


class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
"""
Expand Down Expand Up @@ -148,10 +154,13 @@ def headers(self, headers: Dict):
def export(
self, export_records: Sequence[ExportRecord]
) -> MetricsExportResult:
raise NotImplementedError()
timeseries = self.convert_to_timeseries(export_records)
message = self.build_message(timeseries)
headers = self.get_headers()
return self.send_message(message, headers)

def shutdown(self) -> None:
raise NotImplementedError()
pass

def convert_to_timeseries(
self, export_records: Sequence[ExportRecord]
Expand Down Expand Up @@ -280,12 +289,66 @@ def create_label(self, name: str, value: str) -> Label:
return label

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
write_request = WriteRequest()
write_request.timeseries.extend(timeseries)
serialized_message = write_request.SerializeToString()
return snappy.compress(serialized_message)

def get_headers(self) -> Dict:
raise NotImplementedError()
headers = {
"Content-Encoding": "snappy",
"Content-Type": "application/x-protobuf",
"X-Prometheus-Remote-Write-Version": "0.1.0",
}
if self.headers:
for header_name, header_value in self.headers.items():
headers[header_name] = header_value
return headers

def send_message(
self, message: bytes, headers: Dict
) -> MetricsExportResult:
raise NotImplementedError()
auth = None
if self.basic_auth:
basic_auth = self.basic_auth
if "password" in basic_auth:
auth = (basic_auth.username, basic_auth.password)
else:
with open(basic_auth.password_file) as file:
auth = (basic_auth.username, file.readline())

cert = None
verify = True
if self.tls_config:
if "ca_file" in self.tls_config:
verify = self.tls_config["ca_file"]
elif "insecure_skip_verify" in self.tls_config:
verify = self.tls_config["insecure_skip_verify"]

if (
"cert_file" in self.tls_config
and "key_file" in self.tls_config
):
cert = (
self.tls_config["cert_file"],
self.tls_config["key_file"],
)
response = requests.post(
self.endpoint,
data=message,
headers=headers,
auth=auth,
timeout=self.timeout,
proxies=self.proxies,
cert=cert,
verify=verify,
)
if response.status_code != 200:
logger.warning(
"POST request failed with status %s with reason: %s and content: %s",
str(response.status_code),
response.reason,
str(response.content),
)
return MetricsExportResult.FAILURE
return MetricsExportResult.SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -313,25 +313,65 @@ def test_create_timeseries(self):
self.assertEqual(timeseries, expected_timeseries)


class ResponseStub:
def __init__(self, status_code):
self.status_code = status_code
self.reason = "dummy_reason"
self.content = "dummy_content"


class TestExport(unittest.TestCase):
# Initializes test data that is reused across tests
def setUp(self):
pass
self._exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="/prom/test_endpoint"
)

# Ensures export is successful with valid export_records and config
def test_export(self):
pass

def test_valid_send_message(self):
pass

def test_invalid_send_message(self):
pass
@mock.patch("requests.post", return_value=ResponseStub(200))
def test_export(self, mock_post):
test_metric = Counter("testname", "testdesc", "testunit", int, None)
labels = {"environment": "testing"}
record = ExportRecord(
test_metric, labels, SumAggregator(), Resource({}),
)
result = self._exporter.export([record])
self.assertIs(result, MetricsExportResult.SUCCESS)
self.assertEqual(mock_post.call_count, 1)

@mock.patch("requests.post", return_value=ResponseStub(200))
def test_valid_send_message(self, mock_post):
result = self._exporter.send_message(bytes(), {})
self.assertEqual(mock_post.call_count, 1)
self.assertEqual(result, MetricsExportResult.SUCCESS)

@mock.patch("requests.post", return_value=ResponseStub(404))
def test_invalid_send_message(self, mock_post):
result = self._exporter.send_message(bytes(), {})
self.assertEqual(mock_post.call_count, 1)
self.assertEqual(result, MetricsExportResult.FAILURE)

# Verifies that build_message calls snappy.compress and returns SerializedString
def test_build_message(self):
pass
@mock.patch("snappy.compress", return_value=bytes())
def test_build_message(self, mock_compress):
test_timeseries = [
TimeSeries(),
TimeSeries(),
]
message = self._exporter.build_message(test_timeseries)
self.assertEqual(mock_compress.call_count, 1)
self.assertIsInstance(message, bytes)

# Ensure correct headers are added when valid config is provided
def test_get_headers(self):
pass
self._exporter.headers = {"Custom Header": "test_header"}

headers = self._exporter.get_headers()
self.assertEqual(headers.get("Content-Encoding", ""), "snappy")
self.assertEqual(
headers.get("Content-Type", ""), "application/x-protobuf"
)
self.assertEqual(
headers.get("X-Prometheus-Remote-Write-Version", ""), "0.1.0"
)
self.assertEqual(headers.get("Custom Header", ""), "test_header")