Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add http input connector metrics #578

Merged
merged 5 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
* expose metrics via uvicorn webserver
* makes all uvicorn configuration options possible
* add security best practices to server configuration
* add following metrics to `http_input` connector
* `number_of_http_requests`
* `message_backlog_size`

### Improvements

Expand Down
61 changes: 48 additions & 13 deletions logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@
)

from logprep.abc.input import FatalInputError, Input
from logprep.metrics.metrics import CounterMetric, GaugeMetric
from logprep.util import http
from logprep.util.credentials import CredentialsFactory


def decorator_basic_auth(func: Callable):
def basic_auth(func: Callable):
"""Decorator to check basic authentication.
Will raise 401 on wrong credentials or missing Authorization-Header"""

Expand All @@ -120,7 +121,7 @@ async def func_wrapper(*args, **kwargs):
return func_wrapper


def decorator_request_exceptions(func: Callable):
def handle_request_exceptions(func: Callable):
"""Decorator to wrap http calls and raise exceptions"""

async def func_wrapper(*args, **kwargs):
Expand All @@ -143,7 +144,7 @@ async def func_wrapper(*args, **kwargs):
return func_wrapper


def decorator_add_metadata(func: Callable):
def add_metadata(func: Callable):
"""Decorator to add metadata to resulting http event.
Uses attribute collect_meta of endpoint class to decide over metadata collection
Uses attribute metafield_name to define key name for metadata
Expand Down Expand Up @@ -199,27 +200,34 @@ def __init__(
collect_meta: bool,
metafield_name: str,
credentials: dict,
metrics: "HttpConnector.Metrics",
) -> None:
self.messages = messages
self.collect_meta = collect_meta
self.metafield_name = metafield_name
self.credentials = credentials
self.metrics = metrics
if self.credentials:
self.basicauth_b64 = b64encode(
f"{self.credentials.username}:{self.credentials.password}".encode("utf-8")
).decode("utf-8")

def collect_metrics(self):
    """Increment the ``number_of_http_requests`` metric by one.

    The metrics object is the one handed to the endpoint in ``__init__``.
    The endpoint ``__call__`` methods invoke this once at the start of
    every handled request.
    """
    # In-place add on the metric object; the actual bookkeeping is done by
    # the metric type's operator overload (not visible here).
    self.metrics.number_of_http_requests += 1


class JSONHttpEndpoint(HttpEndpoint):
""":code:`json` endpoint to get json from request"""

_decoder = msgspec.json.Decoder()

@decorator_request_exceptions
@decorator_basic_auth
@decorator_add_metadata
@handle_request_exceptions
@basic_auth
@add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""json endpoint method"""
self.collect_metrics()
data = await req.stream.read()
data = data.decode("utf8")
metadata = kwargs.get("metadata", {})
Expand All @@ -233,11 +241,12 @@ class JSONLHttpEndpoint(HttpEndpoint):

_decoder = msgspec.json.Decoder()

@decorator_request_exceptions
@decorator_basic_auth
@decorator_add_metadata
@handle_request_exceptions
@basic_auth
@add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""jsonl endpoint method"""
self.collect_metrics()
data = await req.stream.read()
data = data.decode("utf8")
event = kwargs.get("metadata", {})
Expand All @@ -252,11 +261,12 @@ class PlaintextHttpEndpoint(HttpEndpoint):
""":code:`plaintext` endpoint to get the body from request
and put it in :code:`message` field"""

@decorator_request_exceptions
@decorator_basic_auth
@decorator_add_metadata
@handle_request_exceptions
@basic_auth
@add_metadata
async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ
"""plaintext endpoint method"""
self.collect_metrics()
data = await req.stream.read()
metadata = kwargs.get("metadata", {})
event = {"message": data.decode("utf8")}
Expand All @@ -266,6 +276,26 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff
class HttpConnector(Input):
"""Connector to accept log messages as http post requests"""

@define(kw_only=True)
class Metrics(Input.Metrics):
    """Tracks statistics about this connector.

    Extends the base input metrics with two http-specific metrics. Each
    field is built through a ``factory`` so that the metric object is
    created when a ``Metrics`` instance is created rather than once at
    class-definition time.
    """

    # Incremented once per handled request by ``HttpEndpoint.collect_metrics``.
    number_of_http_requests: CounterMetric = field(
        factory=lambda: CounterMetric(
            description="Number of incoming requests",
            name="number_of_http_requests",
        )
    )
    """Number of incoming requests"""

    # Updated from ``self.messages.qsize()`` whenever ``_get_event`` runs.
    message_backlog_size: GaugeMetric = field(
        factory=lambda: GaugeMetric(
            description="Size of the message backlog queue",
            name="message_backlog_size",
        )
    )
    """Size of the message backlog queue"""

@define(kw_only=True)
class Config(Input.Config):
"""Config for HTTPInput"""
Expand Down Expand Up @@ -398,7 +428,11 @@ def setup(self):
endpoint_class = self._endpoint_registry.get(endpoint_type)
credentials = cred_factory.from_endpoint(endpoint_path)
endpoints_config[endpoint_path] = endpoint_class(
self.messages, collect_meta, metafield_name, credentials
self.messages,
collect_meta,
metafield_name,
credentials,
self.metrics,
)

app = self._get_asgi_app(endpoints_config)
Expand All @@ -417,6 +451,7 @@ def _get_asgi_app(endpoints_config: dict) -> falcon.asgi.App:

def _get_event(self, timeout: float) -> Tuple:
"""Returns the first message from the queue"""
self.metrics.message_backlog_size += self.messages.qsize()
try:
message = self.messages.get(timeout=timeout)
raw_message = str(message).encode("utf8")
Expand Down
5 changes: 2 additions & 3 deletions logprep/connector/s3/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
EndpointConnectionError,
)

from logprep.abc.output import Output, FatalOutputError
from logprep.metrics.metrics import Metric, CounterMetric
from logprep.abc.output import FatalOutputError, Output
from logprep.metrics.metrics import CounterMetric, Metric
from logprep.util.helper import get_dotted_field_value
from logprep.util.time import TimeParser

Expand Down Expand Up @@ -264,7 +264,6 @@ def store(self, document: dict):
Document to store.
"""
self.metrics.number_of_processed_events += 1

prefix_value = get_dotted_field_value(document, self._config.prefix_field)
if prefix_value is None:
document = self._build_no_prefix_document(
Expand Down
26 changes: 25 additions & 1 deletion tests/unit/connector/test_http_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init
import multiprocessing
import random
import re
from copy import deepcopy
from unittest import mock

import falcon
import pytest
import requests
import uvicorn
Expand Down Expand Up @@ -73,6 +73,12 @@ def setup_method(self):
},
}

# Metric names for this connector: everything listed by the base input test
# case plus the two http-specific metrics.
# NOTE(review): presumably consumed by shared base-class tests that check the
# exposed metric names — confirm against BaseInputTestCase.
expected_metrics = [
    *BaseInputTestCase.expected_metrics,
    "logprep_message_backlog_size",
    "logprep_number_of_http_requests",
]

def teardown_method(self):
while not self.object.messages.empty():
self.object.messages.get(timeout=0.001)
Expand Down Expand Up @@ -368,3 +374,21 @@ def test_sets_target_to_http_schema_if_no_ssl_options(self):
connector_config = deepcopy(self.CONFIG)
connector = Factory.create({"test connector": connector_config}, logger=self.logger)
assert connector.target.startswith("http://")

def test_get_event_sets_message_backlog_size_metric(self):
    """After one ``get_next`` call the backlog metric equals the number of
    messages that were queued before the call."""
    connector = self.object
    connector.metrics.message_backlog_size = 0
    queued_count = random.randint(1, 100)
    for index in range(queued_count):
        event = {"message": f"my message{index}"}
        connector.messages.put(event)
    connector.get_next(0.001)
    observed_backlog = connector.metrics.message_backlog_size
    assert observed_backlog == queued_count

def test_enpoints_count_requests(self):
    """The request counter must equal the number of POSTs sent to the
    ``/json`` endpoint after the connector has been set up."""
    connector = self.object
    connector.metrics.number_of_http_requests = 0
    connector.setup()
    request_count = random.randint(1, 100)
    index = 0
    while index < request_count:
        payload = {"message": f"my message{index}"}
        requests.post(url=f"{self.target}/json", json=payload, timeout=0.5)
        index += 1
    assert connector.metrics.number_of_http_requests == request_count
Loading