From 6d9d995452a3dc1583141f263ca421fcc95d4ed4 Mon Sep 17 00:00:00 2001 From: Peter Mazarovich Date: Thu, 28 Jan 2021 19:04:21 +0300 Subject: [PATCH] Logstash persistent queue improvement --- CHANGELOG.md | 1 + .../analytics/docker-compose.analytics.yml | 2 +- components/analytics/logstash/Dockerfile | 5 +++-- components/analytics/logstash/logstash.conf | 21 +++++++++++++++++-- components/analytics/logstash/logstash.yml | 3 +++ cvat/requirements/base.txt | 2 +- cvat/settings/base.py | 17 +++++++++++++-- 7 files changed, 43 insertions(+), 8 deletions(-) create mode 100644 components/analytics/logstash/logstash.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index e7b08362d5e6..2f0c41dbecd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Updated HTTPS install README section (cleanup and described more robust deploy) - Logstash is improved for using with configurable elasticsearch outputs () +- Persistent queue added to logstash () ### Deprecated diff --git a/components/analytics/docker-compose.analytics.yml b/components/analytics/docker-compose.analytics.yml index e7890ab16e3d..ac2ff82879d1 100644 --- a/components/analytics/docker-compose.analytics.yml +++ b/components/analytics/docker-compose.analytics.yml @@ -79,7 +79,7 @@ services: cvat: environment: DJANGO_LOG_SERVER_HOST: logstash - DJANGO_LOG_SERVER_PORT: 5000 + DJANGO_LOG_SERVER_PORT: 8080 DJANGO_LOG_VIEWER_HOST: kibana DJANGO_LOG_VIEWER_PORT: 5601 CVAT_ANALYTICS: 1 diff --git a/components/analytics/logstash/Dockerfile b/components/analytics/logstash/Dockerfile index ad012ccfc8fa..f7c869875b56 100644 --- a/components/analytics/logstash/Dockerfile +++ b/components/analytics/logstash/Dockerfile @@ -3,5 +3,6 @@ FROM docker.elastic.co/logstash/logstash-oss:${ELK_VERSION} RUN logstash-plugin install logstash-input-http logstash-filter-aggregate \ logstash-filter-prune logstash-output-email -COPY logstash.conf /usr/share/logstash/pipeline/ -EXPOSE 5000 +COPY logstash.yml /usr/share/logstash/config/ +COPY logstash.conf /usr/share/logstash/pipeline/ +EXPOSE 8080 diff --git a/components/analytics/logstash/logstash.conf b/components/analytics/logstash/logstash.conf index 02b3a5f29f61..5afd6e3fce2c 100644 --- a/components/analytics/logstash/logstash.conf +++ b/components/analytics/logstash/logstash.conf @@ -1,11 +1,22 @@ input { - tcp { - port => 5000 + http { + port => 8080 codec => json } } filter { + mutate { + add_field => {"logger_name" => ""} + add_field => {"path" =>""} + } + mutate { + copy => {"[extra][logger_name]" => "logger_name" } + copy => {"[extra][path]"=>"path"} + } + prune { + blacklist_names => ["type","logsource","extra","program","pid","headers"] + } if [logger_name] =~ /cvat.client/ { # 1. Decode the event from json in 'message' field # 2. Remove unnecessary field from it @@ -14,6 +25,9 @@ filter { mutate { rename => { "message" => "source_message" } } + mutate { + add_field => {"[@metadata][target_index_client]" => "cvat.client.%{+YYYY}.%{+MM}"} + } json { source => "source_message" @@ -77,6 +91,9 @@ filter { # 2. Remove unnecessary field from it # 3. Type it as server if [logger_name] =~ /cvat\.server\.task_[0-9]+/ { + mutate { + add_field => {"[@metadata][target_index_server]" => "cvat.server.%{+YYYY}.%{+MM}"} + } mutate { rename => { "logger_name" => "task_id" } gsub => [ "task_id", "cvat.server.task_", "" ] diff --git a/components/analytics/logstash/logstash.yml b/components/analytics/logstash/logstash.yml new file mode 100644 index 000000000000..73f412c139ed --- /dev/null +++ b/components/analytics/logstash/logstash.yml @@ -0,0 +1,3 @@ +queue.type: persisted +queue.max_bytes: 1gb +queue.checkpoint.writes: 20 diff --git a/cvat/requirements/base.txt b/cvat/requirements/base.txt index fa5b41f3c6ad..89675b73508d 100644 --- a/cvat/requirements/base.txt +++ b/cvat/requirements/base.txt @@ -20,7 +20,7 @@ rq-scheduler==0.10.0 sqlparse==0.3.1 django-sendfile==0.3.11 dj-pagination==2.5.0 -python-logstash==0.4.6 +python-logstash-async==2.2.0 django-revproxy==0.10.0 rules==2.2 GitPython==3.1.8 diff --git a/cvat/settings/base.py b/cvat/settings/base.py index 57db88b1ecb6..f7a694415614 100644 --- a/cvat/settings/base.py +++ b/cvat/settings/base.py @@ -336,7 +336,10 @@ def add_ssh_keys(): os.makedirs(STATIC_ROOT, exist_ok=True) DATA_ROOT = os.path.join(BASE_DIR, 'data') +LOGSTASH_DB = os.path.join(DATA_ROOT,'logstash.db') os.makedirs(DATA_ROOT, exist_ok=True) +if not os.path.exists(LOGSTASH_DB): + os.mknod(LOGSTASH_DB) MEDIA_DATA_ROOT = os.path.join(DATA_ROOT, 'data') os.makedirs(MEDIA_DATA_ROOT, exist_ok=True) @@ -366,6 +369,11 @@ def add_ssh_keys(): 'version': 1, 'disable_existing_loggers': False, 'formatters': { + 'logstash': { + '()': 'logstash_async.formatter.DjangoLogstashFormatter', + 'message_type': 'python-logstash', + 'fqdn': False, # Fully qualified domain name. Default value: false. + }, 'standard': { 'format': '[%(asctime)s] %(levelname)s %(name)s: %(message)s' } @@ -386,11 +394,16 @@ def add_ssh_keys(): }, 'logstash': { 'level': 'INFO', - 'class': 'logstash.TCPLogstashHandler', + 'class': 'logstash_async.handler.AsynchronousLogstashHandler', + 'formatter': 'logstash', + 'transport': 'logstash_async.transport.HttpTransport', + 'ssl_enable': False, + 'ssl_verify': False, 'host': os.getenv('DJANGO_LOG_SERVER_HOST', 'localhost'), - 'port': os.getenv('DJANGO_LOG_SERVER_PORT', 5000), + 'port': os.getenv('DJANGO_LOG_SERVER_PORT', 8080), 'version': 1, 'message_type': 'django', + 'database_path': LOGSTASH_DB, } }, 'loggers': {