Skip to content

Commit

Permalink
OpenSearch indexes partitioned to daily partitions
Browse files Browse the repository at this point in the history
- allow to match them through pattern matching
  • Loading branch information
Tomáš Daniel committed Nov 7, 2023
1 parent 2261751 commit ba7f05b
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 11 deletions.
2 changes: 1 addition & 1 deletion example/apps/test_security/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_call_command(*args, **kwargs):

def assert_equal_logstash(logstash_output, expected_index, expected_version, expected_logger_id, expected_data):
prefix_and_index, version, logger_id, data = logstash_output.split(' ', 3)
assert_equal(prefix_and_index, f'INFO:security.logstash:{expected_index}')
assert_equal(prefix_and_index, f'INFO:security.logstash:{expected_index}*')
assert_equal(version, str(expected_version))
assert_equal(logger_id, str(expected_logger_id))
parsed_data = json.loads(data)
Expand Down
2 changes: 1 addition & 1 deletion logstash.example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ filter {
output {
elasticsearch {
ecs_compatibility => "v1"
hosts => "elasticsearch:9200"
hosts => "elasticsearch:9200-%{+YYYY-MM-dd}"
index => "%{[@metadata][index]}"
document_id => "%{[@metadata][id]}"
version => "%{[@metadata][version]}"
Expand Down
6 changes: 5 additions & 1 deletion security/backends/elasticsearch/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ def init_documents(self):
)

for document in InputRequestLog, OutputRequestLog, CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog:
document.init()
if isinstance(document_class, PartitionedLog):
logs = document._index.as_template(document._index._name.replace("*", ""), order=0)
logs.save()
else:
document.init()


connection = ConnectionHandler()
Expand Down
42 changes: 35 additions & 7 deletions security/backends/elasticsearch/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json

from datetime import datetime
from elasticsearch import NotFoundError

from django.db import router
Expand All @@ -23,11 +23,15 @@
from .connection import set_connection


def get_index_name(logger_name):
return '{}-{}-log'.format(
def get_index_name(logger_name, partitioned=True):
    """Build the index name used for the given logger.

    The name has the form ``<prefix>-<logger_name.value>-log`` where the
    prefix is read from ``settings.ELASTICSEARCH_DATABASE['prefix']`` and
    falls back to ``'security'``. When ``partitioned`` is true a trailing
    ``*`` is appended, turning the name into a wildcard pattern.
    """
    prefix = settings.ELASTICSEARCH_DATABASE.get('prefix', 'security')
    base_name = '{}-{}-log'.format(prefix, logger_name.value)
    return (base_name + '*') if partitioned else base_name


class JSONTextField(CustomField):
Expand Down Expand Up @@ -111,7 +115,31 @@ def update(
)


class RequestLog(Log):
class PartitionedLog(Log):
    """Log document that is written to and read from per-day indexes.

    The class-level index name is expected to be a wildcard pattern (ending
    with ``*``). Concrete operations are redirected to an index named
    ``<pattern without "*">-<day>``, where the day is rendered with
    ``DAY_FORMAT``.
    """

    # strftime pattern of the day suffix appended to the base index name
    DAY_FORMAT = "%Y-%m-%d"

    def save(self, **kwargs):
        """Save the document into the daily index matching ``self.start``.

        ``self.start`` is set to the current time when it is empty. Any
        ``index`` keyword supplied by the caller is replaced by the computed
        daily index; all keyword arguments are forwarded to the parent
        ``save`` and its result is returned.
        """
        # assign now if no timestamp given
        if not self.start:
            self.start = datetime.now()

        # override the index to go to the proper timeslot
        kwargs['index'] = self._format_index_name(self.start)
        return super().save(**kwargs)

    @classmethod
    def get(cls, *args, **kwargs):
        """Fetch a document from the daily index of the current day.

        Any ``index`` keyword supplied by the caller is replaced; remaining
        arguments are forwarded to the parent ``get``.

        NOTE(review): only today's partition is addressed, so a document
        saved on an earlier day presumably cannot be fetched through this
        method -- confirm this is intended.
        """
        kwargs['index'] = cls._format_index_name(datetime.now())
        return super().get(*args, **kwargs)

    @classmethod
    def _format_index_name(cls, dt):
        """Return the concrete daily index name for the datetime ``dt``."""
        # Must be a classmethod: the index name lives on the class. The
        # previous @staticmethod referenced ``cls`` without receiving it and
        # raised NameError on every call.
        base_name = cls._index._name.replace("*", "")
        # Format only the date part so a literal "%" in the configured
        # prefix is never interpreted as a strftime directive.
        return f'{base_name}-{dt.strftime(cls.DAY_FORMAT)}'


class RequestLog(PartitionedLog):

host = Keyword()
method = Keyword()
Expand Down Expand Up @@ -152,7 +180,7 @@ class Index:
name = get_index_name(LoggerName.OUTPUT_REQUEST)


class CommandLog(CommandLogStrMixin, Log):
class CommandLog(CommandLogStrMixin, PartitionedLog):

name = Keyword()
input = Text()
Expand All @@ -166,7 +194,7 @@ class Index:
name = get_index_name(LoggerName.COMMAND)


class CeleryTaskInvocationLog(CeleryTaskInvocationLogStrMixin, Log):
class CeleryTaskInvocationLog(CeleryTaskInvocationLogStrMixin, PartitionedLog):

celery_task_id = Keyword()
name = Keyword()
Expand Down Expand Up @@ -195,7 +223,7 @@ class Index:
name = get_index_name(LoggerName.CELERY_TASK_INVOCATION)


class CeleryTaskRunLog(CeleryTaskRunLogStrMixin, Log):
class CeleryTaskRunLog(CeleryTaskRunLogStrMixin, PartitionedLog):

celery_task_id = Keyword()
state = EnumField(enum=CeleryTaskRunLogState)
Expand Down
6 changes: 5 additions & 1 deletion security/backends/elasticsearch/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ def enable(self):
for document_class in (CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog,
InputRequestLog, OutputRequestLog):
document_class._index._name = f'{uuid}.{document_class._index._name}'
document_class.init()
if isinstance(document_class, PartitionedLog):
logs = document_class._index.as_template(document_class._index._name.split("*")[0], order=0)
logs.save()
else:
document_class.init()

def disable(self):
for document_class in (CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog,
Expand Down

0 comments on commit ba7f05b

Please sign in to comment.