Skip to content

Commit

Permalink
Adding exporter methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Azfaar Qureshi committed Nov 23, 2020
1 parent a6732c4 commit dbcb381
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 16 deletions.
2 changes: 2 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ readme-renderer~=24.0
grpcio-tools==1.29.0
mypy-protobuf>=1.23
protobuf>=3.13.0
snappy==0.5.4
requests==2.25.0
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
from typing import Dict, Sequence

import requests

import snappy
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
Expand All @@ -35,6 +39,8 @@
ValueObserverAggregator,
)

logger = logging.getLogger(__name__)


class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
"""
Expand Down Expand Up @@ -133,7 +139,10 @@ def headers(self, headers: Dict):
def export(
self, export_records: Sequence[ExportRecord]
) -> MetricsExportResult:
raise NotImplementedError()
timeseries = self.convert_to_timeseries(export_records)
message = self.build_message(timeseries)
headers = self.get_headers()
return self.send_message(message, headers)

def shutdown(self) -> None:
raise NotImplementedError()
Expand Down Expand Up @@ -265,12 +274,49 @@ def create_label(self, name: str, value: str) -> Label:
return label

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
write_request = WriteRequest()
write_request.timeseries.extend(timeseries)
serialized_message = write_request.SerializeToString()
return snappy.compress(serialized_message)

def get_headers(self) -> Dict:
raise NotImplementedError()
headers = {
"Content-Encoding": "snappy",
"Content-Type": "application/x-protobuf",
"X-Prometheus-Remote-Write-Version": "0.1.0",
}
if hasattr(self, "headers"):
for header_name, header_value in self.headers.items():
headers[header_name] = header_value

if "Authorization" not in headers:
if hasattr(self, "bearer_token"):
headers["Authorization"] = "Bearer " + self.bearer_token
elif hasattr(self, "bearer_token_file"):
with open(self.bearer_token_file) as file:
headers["Authorization"] = "Bearer " + file.readline()
return headers

def send_message(
self, message: bytes, headers: Dict
) -> MetricsExportResult:
raise NotImplementedError()
auth = None
if hasattr(self, "basic_auth"):
basic_auth = self.basic_auth
if "password" in basic_auth:
auth = (basic_auth.username, basic_auth.password)
else:
with open(basic_auth.password_file) as file:
auth = (basic_auth.username, file.readline())
response = requests.post(
self.endpoint, data=message, headers=headers, auth=auth
)
if response.status_code != 200:
logger.warning(
"POST request failed with status %s with reason: %s and content: %s",
str(response.status_code),
response.reason,
str(response.content),
)
return MetricsExportResult.FAILURE
return MetricsExportResult.SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,68 @@ def test_create_timeseries(self):
self.assertEqual(timeseries, expected_timeseries)


class ResponseStub:
def __init__(self, status_code):
self.status_code = status_code
self.reason = "dummy_reason"
self.content = "dummy_content"


class TestExport(unittest.TestCase):
# Initializes test data that is reused across tests
def setUp(self):
pass
self._exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="/prom/test_endpoint"
)

# Ensures export is successful with valid export_records and config
def test_export(self):
pass

def test_valid_send_message(self):
pass

def test_invalid_send_message(self):
pass
@mock.patch("requests.post", return_value=ResponseStub(200))
def test_export(self, mock_post):
test_metric = Counter("testname", "testdesc", "testunit", int, None)
labels = {"environment": "testing"}
record = ExportRecord(
test_metric, labels, SumAggregator(), Resource({}),
)
result = self._exporter.export([record])
self.assertIs(result, MetricsExportResult.SUCCESS)
self.assertEqual(mock_post.call_count, 1)

@mock.patch("requests.post", return_value=ResponseStub(200))
def test_valid_send_message(self, mock_post):
result = self._exporter.send_message(bytes(), {})
self.assertEqual(mock_post.call_count, 1)
self.assertEqual(result, MetricsExportResult.SUCCESS)

@mock.patch("requests.post", return_value=ResponseStub(404))
def test_invalid_send_message(self, mock_post):
result = self._exporter.send_message(bytes(), {})
self.assertEqual(mock_post.call_count, 1)
self.assertEqual(result, MetricsExportResult.FAILURE)

# Verifies that build_message calls snappy.compress and returns SerializedString
def test_build_message(self):
pass
@mock.patch("snappy.compress", return_value=bytes())
def test_build_message(self, mock_compress):
test_timeseries = [
TimeSeries(),
TimeSeries(),
]
message = self._exporter.build_message(test_timeseries)
self.assertEqual(mock_compress.call_count, 1)
self.assertIsInstance(message, bytes)

# Ensure correct headers are added when valid config is provided
def test_get_headers(self):
pass
self._exporter.headers = {"Custom Header": "test_header"}
self._exporter.headers = {"Custom Header": "test_header"}
self._exporter.bearer_token = "test_token"

headers = self._exporter.get_headers()
self.assertEqual(headers.get("Content-Encoding", ""), "snappy")
self.assertEqual(
headers.get("Content-Type", ""), "application/x-protobuf"
)
self.assertEqual(
headers.get("X-Prometheus-Remote-Write-Version", ""), "0.1.0"
)
self.assertEqual(headers.get("Authorization", ""), "Bearer test_token")
self.assertEqual(headers.get("Custom Header", ""), "test_header")

0 comments on commit dbcb381

Please sign in to comment.