Skip to content

Commit

Permalink
add http input connector metrics (#578)
Browse files Browse the repository at this point in the history
* add queue size metric
* add setting number_http_requests
* add tests for metrics
* update changelog
  • Loading branch information
ekneg54 authored Apr 29, 2024
1 parent 0e5198d commit 1f08a51
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
* expose metrics via uvicorn webserver
* makes all uvicorn configuration options possible
* add security best practices to server configuration
* add following metrics to `http_input` connector
* `nummer_of_http_requests`
* `message_backlog_size`

### Improvements

Expand Down
61 changes: 48 additions & 13 deletions logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@
)

from logprep.abc.input import FatalInputError, Input
from logprep.metrics.metrics import CounterMetric, GaugeMetric
from logprep.util import http
from logprep.util.credentials import CredentialsFactory


def decorator_basic_auth(func: Callable):
def basic_auth(func: Callable):
"""Decorator to check basic authentication.
Will raise 401 on wrong credentials or missing Authorization-Header"""

Expand All @@ -120,7 +121,7 @@ async def func_wrapper(*args, **kwargs):
return func_wrapper


def decorator_request_exceptions(func: Callable):
def handle_request_exceptions(func: Callable):
"""Decorator to wrap http calls and raise exceptions"""

async def func_wrapper(*args, **kwargs):
Expand All @@ -143,7 +144,7 @@ async def func_wrapper(*args, **kwargs):
return func_wrapper


def decorator_add_metadata(func: Callable):
def add_metadata(func: Callable):
"""Decorator to add metadata to resulting http event.
Uses attribute collect_meta of endpoint class to decide over metadata collection
Uses attribute metafield_name to define key name for metadata
Expand Down Expand Up @@ -199,27 +200,34 @@ def __init__(
collect_meta: bool,
metafield_name: str,
credentials: dict,
metrics: "HttpConnector.Metrics",
) -> None:
self.messages = messages
self.collect_meta = collect_meta
self.metafield_name = metafield_name
self.credentials = credentials
self.metrics = metrics
if self.credentials:
self.basicauth_b64 = b64encode(
f"{self.credentials.username}:{self.credentials.password}".encode("utf-8")
).decode("utf-8")

def collect_metrics(self):
"""Increment number of requests"""
self.metrics.number_of_http_requests += 1


class JSONHttpEndpoint(HttpEndpoint):
""":code:`json` endpoint to get json from request"""

_decoder = msgspec.json.Decoder()

@decorator_request_exceptions
@decorator_basic_auth
@decorator_add_metadata
@handle_request_exceptions
@basic_auth
@add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""json endpoint method"""
self.collect_metrics()
data = await req.stream.read()
data = data.decode("utf8")
metadata = kwargs.get("metadata", {})
Expand All @@ -233,11 +241,12 @@ class JSONLHttpEndpoint(HttpEndpoint):

_decoder = msgspec.json.Decoder()

@decorator_request_exceptions
@decorator_basic_auth
@decorator_add_metadata
@handle_request_exceptions
@basic_auth
@add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""jsonl endpoint method"""
self.collect_metrics()
data = await req.stream.read()
data = data.decode("utf8")
event = kwargs.get("metadata", {})
Expand All @@ -252,11 +261,12 @@ class PlaintextHttpEndpoint(HttpEndpoint):
""":code:`plaintext` endpoint to get the body from request
and put it in :code:`message` field"""

@decorator_request_exceptions
@decorator_basic_auth
@decorator_add_metadata
@handle_request_exceptions
@basic_auth
@add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""plaintext endpoint method"""
self.collect_metrics()
data = await req.stream.read()
metadata = kwargs.get("metadata", {})
event = {"message": data.decode("utf8")}
Expand All @@ -266,6 +276,26 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff
class HttpConnector(Input):
"""Connector to accept log messages as http post requests"""

@define(kw_only=True)
class Metrics(Input.Metrics):
"""Tracks statistics about this connector"""

number_of_http_requests: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of incomming requests",
name="number_of_http_requests",
)
)
"""Number of incomming requests"""

message_backlog_size: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="Size of the message backlog queue",
name="message_backlog_size",
)
)
"""Size of the message backlog queue"""

@define(kw_only=True)
class Config(Input.Config):
"""Config for HTTPInput"""
Expand Down Expand Up @@ -398,7 +428,11 @@ def setup(self):
endpoint_class = self._endpoint_registry.get(endpoint_type)
credentials = cred_factory.from_endpoint(endpoint_path)
endpoints_config[endpoint_path] = endpoint_class(
self.messages, collect_meta, metafield_name, credentials
self.messages,
collect_meta,
metafield_name,
credentials,
self.metrics,
)

app = self._get_asgi_app(endpoints_config)
Expand All @@ -417,6 +451,7 @@ def _get_asgi_app(endpoints_config: dict) -> falcon.asgi.App:

def _get_event(self, timeout: float) -> Tuple:
"""Returns the first message from the queue"""
self.metrics.message_backlog_size += self.messages.qsize()
try:
message = self.messages.get(timeout=timeout)
raw_message = str(message).encode("utf8")
Expand Down
5 changes: 2 additions & 3 deletions logprep/connector/s3/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
EndpointConnectionError,
)

from logprep.abc.output import Output, FatalOutputError
from logprep.metrics.metrics import Metric, CounterMetric
from logprep.abc.output import FatalOutputError, Output
from logprep.metrics.metrics import CounterMetric, Metric
from logprep.util.helper import get_dotted_field_value
from logprep.util.time import TimeParser

Expand Down Expand Up @@ -264,7 +264,6 @@ def store(self, document: dict):
Document to store.
"""
self.metrics.number_of_processed_events += 1

prefix_value = get_dotted_field_value(document, self._config.prefix_field)
if prefix_value is None:
document = self._build_no_prefix_document(
Expand Down
26 changes: 25 additions & 1 deletion tests/unit/connector/test_http_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init
import multiprocessing
import random
import re
from copy import deepcopy
from unittest import mock

import falcon
import pytest
import requests
import uvicorn
Expand Down Expand Up @@ -73,6 +73,12 @@ def setup_method(self):
},
}

expected_metrics = [
*BaseInputTestCase.expected_metrics,
"logprep_message_backlog_size",
"logprep_number_of_http_requests",
]

def teardown_method(self):
while not self.object.messages.empty():
self.object.messages.get(timeout=0.001)
Expand Down Expand Up @@ -368,3 +374,21 @@ def test_sets_target_to_http_schema_if_no_ssl_options(self):
connector_config = deepcopy(self.CONFIG)
connector = Factory.create({"test connector": connector_config}, logger=self.logger)
assert connector.target.startswith("http://")

def test_get_event_sets_message_backlog_size_metric(self):
self.object.metrics.message_backlog_size = 0
random_number = random.randint(1, 100)
for number in range(random_number):
self.object.messages.put({"message": f"my message{number}"})
self.object.get_next(0.001)
assert self.object.metrics.message_backlog_size == random_number

def test_enpoints_count_requests(self):
self.object.metrics.number_of_http_requests = 0
self.object.setup()
random_number = random.randint(1, 100)
for number in range(random_number):
requests.post(
url=f"{self.target}/json", json={"message": f"my message{number}"}, timeout=0.5
)
assert self.object.metrics.number_of_http_requests == random_number

0 comments on commit 1f08a51

Please sign in to comment.