advanced logging configuration #584

Merged 26 commits on May 13, 2024
Commits
53588c3: implement dictconfig (ekneg54, May 7, 2024)
1a0464d: change logging format and logger names (ekneg54, May 7, 2024)
b3caca4: shorten logger names (ekneg54, May 7, 2024)
6af7e71: add documentation and rewrite LoggerConfig (ekneg54, May 8, 2024)
51e8451: fix tests (ekneg54, May 8, 2024)
4c1efc0: fix changing DEFAULT_LOG_CONFIG by deepcopy (ekneg54, May 8, 2024)
a32c20b: refactor LoggerConfig (ekneg54, May 8, 2024)
411f9d4: increase max-instance-attributes because of attrs classes (ekneg54, May 8, 2024)
e304718: revert change for fine-grained opensearch and elasticsearch log levels (ekneg54, May 8, 2024)
9220717: update changelog (ekneg54, May 8, 2024)
b8f4b8a: rewrite logging config for http connector (ekneg54, May 8, 2024)
d1e9822: update documentation (ekneg54, May 8, 2024)
406bece: WIP: configure queuehandler through dictconfig (ekneg54, May 8, 2024)
275149b: fix duplicate log entries (ekneg54, May 8, 2024)
e5aac60: fix duplicate log entries (ekneg54, May 10, 2024)
74af69a: WIP (ekneg54, May 10, 2024)
01a4f25: fix auto rule corpus tests (ekneg54, May 10, 2024)
b351f0b: move logqueue to logging module (ekneg54, May 10, 2024)
e7e8de1: fix http and metrics tests (ekneg54, May 11, 2024)
e468b68: fix tests (ekneg54, May 11, 2024)
bfed9e4: fix acceptance tests (ekneg54, May 11, 2024)
6d65673: remove log levels from default config (ekneg54, May 11, 2024)
1974824: fix blocking tests (ekneg54, May 11, 2024)
8a5c512: Update logprep/util/configuration.py (ekneg54, May 13, 2024)
0c92b74: Update logprep/util/configuration.py (ekneg54, May 13, 2024)
8d183c1: link to external documentation for formatter attributes (ekneg54, May 13, 2024)
1 change: 1 addition & 0 deletions .gitignore
@@ -24,3 +24,4 @@ dist/
error_file
experiments
**/_static/*.xlsx
logprep.log
1 change: 1 addition & 0 deletions .pylintrc
@@ -13,6 +13,7 @@ disable=too-few-public-methods
[DESIGN]
min-public-methods=1
max-public-methods=40
max-attributes=12


[CLASSES]
9 changes: 7 additions & 2 deletions CHANGELOG.md
@@ -6,16 +6,21 @@

### Features

* add fine-grained logger configuration for OpenSearch/ElasticSearch libraries
* add gzip handling to `http_input` connector
* adds advanced logging configuration
* add configurable log format
* add configurable datetime format in logs
* makes `hostname` available in custom log formats
* add fine-grained log level configuration for every logger instance

### Improvements

* rename `logprep.event_generator` module to `logprep.generator`
* shorten logger instance names

### Bugfix

* fixes exposing OpenSearch/ElasticSearch stacktraces in log when errors happen
* fixes exposing OpenSearch/ElasticSearch stacktraces in log when errors happen by making loglevel configurable for loggers `opensearch` and `elasticsearch`

## 11.2.1

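The changelog features map onto standard library logging primitives. A minimal stdlib sketch of the same ideas (the `HostnameFilter` helper and the format strings are illustrative assumptions, not Logprep's actual implementation):

```python
import logging
import socket


class HostnameFilter(logging.Filter):
    """Expose the machine's hostname to format strings as %(hostname)s."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.hostname = socket.gethostname()
        return True


handler = logging.StreamHandler()
handler.addFilter(HostnameFilter())
handler.setFormatter(
    logging.Formatter(
        fmt="%(asctime)s %(hostname)s %(name)s %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",  # the configurable datetime format
    )
)
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)
# fine-grained levels for individual logger instances:
logging.getLogger("opensearch").setLevel(logging.ERROR)
```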
4 changes: 0 additions & 4 deletions logprep/connector/elasticsearch/output.py
@@ -104,9 +104,6 @@ class Config(Output.Config):
flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60)
"""(Optional) Timeout after :code:`message_backlog` is flushed if
:code:`message_backlog_size` is not reached."""
loglevel: Optional[str] = field(validator=validators.instance_of(str), default="INFO")
"""(Optional) Log level for the underlying library. Enables fine-grained control over the
logging, e.g. stacktraces can be activated or deactivated. Defaults to :code:`INFO`."""

__slots__ = ["_message_backlog", "_size_error_pattern"]

@@ -172,7 +169,6 @@ def _search_context(self) -> search.Elasticsearch:
elasticsearch.Elasticsearch
the elasticsearch context
"""
logging.getLogger("elasticsearch").setLevel(self._config.loglevel)
return search.Elasticsearch(
self._config.hosts,
scheme=self.schema,
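With the connector-level `loglevel` option removed, the same effect is achieved through the standard logging hierarchy; `elasticsearch` and `opensearch` are the logger names this PR targets. A sketch of the equivalent call:

```python
import logging

# Raising the level suppresses the libraries' stacktraces noted in the
# changelog; lowering it turns them back on for debugging.
logging.getLogger("elasticsearch").setLevel(logging.ERROR)
logging.getLogger("opensearch").setLevel(logging.ERROR)
```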
3 changes: 1 addition & 2 deletions logprep/connector/http/input.py
@@ -433,7 +433,6 @@ def setup(self):
raise FatalInputError(
self, "Necessary instance attribute `pipeline_index` could not be found."
)

self._logger.debug(
f"HttpInput Connector started on target {self.target} and "
f"queue {id(self.messages)} "
@@ -462,7 +461,7 @@ def setup(self):

app = self._get_asgi_app(endpoints_config)
self.http_server = http.ThreadingHTTPServer(
self._config.uvicorn_config, app, daemon=False, logger_name="Logprep HTTPServer"
self._config.uvicorn_config, app, daemon=False, logger_name="HTTPServer"
)
self.http_server.start()

1 change: 0 additions & 1 deletion logprep/connector/opensearch/output.py
@@ -100,7 +100,6 @@ class Config(ElasticsearchOutput.Config):

@cached_property
def _search_context(self):
logging.getLogger("opensearch").setLevel(self._config.loglevel)
return search.OpenSearch(
self._config.hosts,
scheme=self.schema,
2 changes: 1 addition & 1 deletion logprep/factory.py
@@ -19,7 +19,7 @@
class Factory:
"""Create components for logprep."""

_logger: "Logger" = logging.getLogger(__name__)
_logger: "Logger" = logging.getLogger("Factory")

@classmethod
def create(cls, configuration: dict, logger: "Logger") -> "Component":
16 changes: 4 additions & 12 deletions logprep/framework/pipeline.py
@@ -91,9 +91,6 @@ class Metrics(Component.Metrics):
_logprep_config: Configuration
""" the logprep configuration dict """

_log_queue: multiprocessing.Queue
""" the handler for the logs """

_continue_iterating: Value
""" a flag to signal if iterating continues """

@@ -160,15 +157,10 @@ def _input(self) -> Input:
return Factory.create(input_connector_config, self.logger)

def __init__(
self,
config: Configuration,
pipeline_index: int = None,
log_queue: multiprocessing.Queue = None,
lock: Lock = None,
self, config: Configuration, pipeline_index: int = None, lock: Lock = None
) -> None:
self._log_queue = log_queue
self.logger = logging.getLogger(f"Logprep Pipeline {pipeline_index}")
self.logger.addHandler(logging.handlers.QueueHandler(log_queue))
self.logger = logging.getLogger("Pipeline")
self.logger.name = f"Pipeline{pipeline_index}"
self._logprep_config = config
self._timeout = config.timeout
self._continue_iterating = Value(c_bool)
@@ -207,7 +199,7 @@ def _create_processor(self, entry: dict) -> "Processor":
self.logger.debug(f"Created '{processor}' processor")
return processor

def run(self) -> None:
def run(self) -> None: # pylint: disable=method-hidden
"""Start processing processors in the Pipeline."""
with self._continue_iterating.get_lock():
self._continue_iterating.value = True
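The pipeline no longer attaches a `QueueHandler` by hand; per the commit messages, the queue handler is now configured through `dictConfig`. A generic stdlib sketch of the queue-based pattern for multiprocess logging (an assumed shape, not the exact Logprep code):

```python
import logging
import logging.handlers
import multiprocessing


def configure_worker(queue: multiprocessing.Queue) -> None:
    """In a child process: push records to the shared queue, no local output."""
    root = logging.getLogger()
    root.handlers = [logging.handlers.QueueHandler(queue)]
    root.setLevel(logging.INFO)


def start_listener(queue: multiprocessing.Queue) -> logging.handlers.QueueListener:
    """In the main process: one listener owns the real handler and formats output."""
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s"))
    listener = logging.handlers.QueueListener(queue, console, respect_handler_level=True)
    listener.start()
    return listener
```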
42 changes: 2 additions & 40 deletions logprep/framework/pipeline_manager.py
@@ -17,16 +17,6 @@
from logprep.util.configuration import Configuration


def logger_process(queue: multiprocessing.queues.Queue, logger: logging.Logger):
"""Process log messages from a queue."""

while True:
message = queue.get()
if message is None:
break
logger.handle(message)


class PipelineManager:
"""Manage pipelines via multi-processing."""

@@ -60,9 +50,8 @@ class Metrics(Component.Metrics):

def __init__(self, configuration: Configuration):
self.metrics = self.Metrics(labels={"component": "manager"})
self._logger = logging.getLogger("Logprep PipelineManager")
self._logger = logging.getLogger("Manager")
if multiprocessing.current_process().name == "MainProcess":
self._start_multiprocess_logger()
self._set_http_input_queue(configuration)
self._pipelines: list[multiprocessing.Process] = []
self._configuration = configuration
@@ -86,25 +75,6 @@ def _set_http_input_queue(self, configuration):
message_backlog_size = input_config.get("message_backlog_size", 15000)
HttpConnector.messages = multiprocessing.Queue(maxsize=message_backlog_size)

def _start_multiprocess_logger(self):
self.log_queue = multiprocessing.Queue(-1)
self._log_process = multiprocessing.Process(
target=logger_process, args=(self.log_queue, self._logger), daemon=True
)
self._log_process.start()

def get_count(self) -> int:
"""Get the pipeline count.

Parameters
----------
count : int
The pipeline count will be incrementally changed until it reaches this value.

"""
self._logger.debug(f"Getting pipeline count: {len(self._pipelines)}")
return len(self._pipelines)

def set_count(self, count: int):
"""Set the pipeline count.

@@ -161,9 +131,6 @@ def stop(self):
self._decrease_to_count(0)
if self.prometheus_exporter:
self.prometheus_exporter.cleanup_prometheus_multiprocess_dir()
self.log_queue.put(None) # signal the logger process to stop
self._log_process.join()
self.log_queue.close()

def restart(self):
"""Restarts all pipelines"""
@@ -175,12 +142,7 @@ def restart(self):
self.prometheus_exporter.run()

def _create_pipeline(self, index) -> multiprocessing.Process:
pipeline = Pipeline(
pipeline_index=index,
config=self._configuration,
log_queue=self.log_queue,
lock=self._lock,
)
pipeline = Pipeline(pipeline_index=index, config=self._configuration, lock=self._lock)
self._logger.info("Created new pipeline")
process = multiprocessing.Process(target=pipeline.run, daemon=True)
process.stop = pipeline.stop
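Two commits in this PR fix duplicate log entries. A common cause, shown here as a hedged illustration rather than the confirmed root cause in Logprep: a logger that owns a handler while also propagating to a root handler emits every record twice.

```python
import logging

logger = logging.getLogger("Pipeline1")
logger.addHandler(logging.StreamHandler())  # emits the record once here...
# ...and the record still propagates up and is emitted again by root's handler.
# Cure: attach handlers in exactly one place, or stop propagation explicitly.
logger.propagate = False
```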
2 changes: 1 addition & 1 deletion logprep/metrics/exporter.py
@@ -15,7 +15,7 @@ class PrometheusExporter:

def __init__(self, configuration: MetricsConfig):
self.is_running = False
logger_name = "Prometheus Exporter"
logger_name = "Exporter"
self._logger = getLogger(logger_name)
self._logger.debug("Initializing Prometheus Exporter")
self.configuration = configuration
39 changes: 13 additions & 26 deletions logprep/run_logprep.py
@@ -1,6 +1,7 @@
# pylint: disable=logging-fstring-interpolation
"""This module can be used to start the logprep."""
import logging
import logging.config
import os
import signal
import sys
@@ -12,21 +13,17 @@
from logprep.generator.http.controller import Controller
from logprep.generator.kafka.run_load_tester import LoadTester
from logprep.runner import Runner
from logprep.util import defaults
from logprep.util.auto_rule_tester.auto_rule_corpus_tester import RuleCorpusTester
from logprep.util.auto_rule_tester.auto_rule_tester import AutoRuleTester
from logprep.util.configuration import Configuration, InvalidConfigurationError
from logprep.util.defaults import DEFAULT_LOG_CONFIG
from logprep.util.helper import get_versions_string, print_fcolor
from logprep.util.rule_dry_runner import DryRunner

warnings.simplefilter("always", DeprecationWarning)
logging.captureWarnings(True)

logging.getLogger("filelock").setLevel(logging.ERROR)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
logging.getLogger("elasticsearch").setLevel(logging.ERROR)


logging.config.dictConfig(DEFAULT_LOG_CONFIG)
logger = logging.getLogger("logprep")
EPILOG_STR = "Check out our docs at https://logprep.readthedocs.io/en/latest/"


@@ -35,17 +32,13 @@ def _print_version(config: "Configuration") -> None:
sys.exit(0)


def _get_logger(logger_config: dict) -> logging.Logger:
log_level = logger_config.get("level", "INFO")
logging.basicConfig(level=log_level, format=defaults.DEFAULT_LOG_FORMAT)
logger = logging.getLogger("Logprep")
logger.setLevel(log_level)
return logger


def _get_configuration(config_paths: list[str]) -> Configuration:
try:
return Configuration.from_sources(config_paths)
config = Configuration.from_sources(config_paths)
config.logger.setup_logging()
logger = logging.getLogger("root") # pylint: disable=redefined-outer-name
logger.info(f"Log level set to '{logging.getLevelName(logger.level)}'")
return config
except InvalidConfigurationError as error:
print(f"InvalidConfigurationError: {error}", file=sys.stderr)
sys.exit(1)
@@ -80,8 +73,6 @@ def run(configs: tuple[str], version=None) -> None:
configuration = _get_configuration(configs)
if version:
_print_version(configuration)
logger = _get_logger(configuration.logger)
logger.info(f"Log level set to '{logging.getLevelName(logger.level)}'")
for version in get_versions_string(configuration).split("\n"):
logger.info(version)
logger.debug(f"Metric export enabled: {configuration.metrics.enabled}")
@@ -150,7 +141,7 @@ def dry_run(configs: tuple[str], events: str, input_type: str, full_output: bool
"""
config = _get_configuration(configs)
json_input = input_type == "json"
dry_runner = DryRunner(events, config, full_output, json_input, logging.getLogger("DryRunner"))
dry_runner = DryRunner(events, config, full_output, json_input)
dry_runner.run()


@@ -270,7 +261,7 @@ def generate_kafka(config, file):
@click.option(
"--loglevel",
help="Sets the log level for the logger.",
type=click.Choice(logging._levelToName.values()),
type=click.Choice(logging._levelToName.values()), # pylint: disable=protected-access
required=False,
default="INFO",
)
@@ -286,12 +277,8 @@ def generate_http(**kwargs):
Generates events based on templated sample files stored inside a dataset directory.
The events will be sent to a http endpoint.
"""
log_level = kwargs.get("loglevel")
logging.basicConfig(
level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("generator")
logger.info(f"Log level set to '{log_level}'")
generator_logger = logging.getLogger("Generator")
generator_logger.info(f"Log level set to '{logging.getLevelName(generator_logger.level)}'")
generator = Controller(**kwargs)
generator.run()

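`run_logprep` now applies `DEFAULT_LOG_CONFIG` through `logging.config.dictConfig` at import time. The actual dict lives in `logprep.util.defaults` and is not shown in this diff; a minimal sketch of the shape such a config typically has (an illustrative stand-in, not the real default):

```python
import logging.config

DEFAULT_LOG_CONFIG = {  # hypothetical contents
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "logprep": {"format": "%(asctime)-15s %(name)-10s %(levelname)-8s: %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "logprep"},
    },
    "root": {"level": "INFO", "handlers": ["console"]},
}

logging.config.dictConfig(DEFAULT_LOG_CONFIG)
```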
12 changes: 7 additions & 5 deletions logprep/runner.py
@@ -130,7 +130,7 @@ def get_runner(configuration: Configuration) -> "Runner":
def __init__(self, configuration: Configuration) -> None:
self._configuration = configuration
self.metrics = self.Metrics(labels={"logprep": "unset", "config": "unset"})
self._logger = logging.getLogger("Logprep Runner")
self._logger = logging.getLogger("Runner")

self._manager = PipelineManager(configuration)
self.scheduler = Scheduler()
@@ -147,15 +147,17 @@ def start(self):
self._manager.restart()
self._logger.info("Startup complete")
self._logger.debug("Runner iterating")
self._iterate()
self._logger.info("Shutting down")
self._manager.stop()
self._logger.info("Shutdown complete")

def _iterate(self):
for _ in self._keep_iterating():
if self._exit_received:
break
self.scheduler.run_pending()
self._manager.restart_failed_pipeline()
self._logger.info("Shutting down")
self._logger.info("Initiated shutdown")
self._manager.stop()
self._logger.info("Shutdown complete")

def reload_configuration(self):
"""Reloads the configuration"""