Skip to content

Commit

Permalink
Merge pull request #4 from skip-pay/tda/feat/index_pattern_matching
Browse files Browse the repository at this point in the history
OpenSearch indexes partitioned to daily partitions
  • Loading branch information
xdaniel3 authored Nov 8, 2023
2 parents 2261751 + 071ed96 commit e4742b8
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 12 deletions.
2 changes: 1 addition & 1 deletion example/apps/test_security/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_call_command(*args, **kwargs):

def assert_equal_logstash(logstash_output, expected_index, expected_version, expected_logger_id, expected_data):
prefix_and_index, version, logger_id, data = logstash_output.split(' ', 3)
assert_equal(prefix_and_index, f'INFO:security.logstash:{expected_index}')
assert_equal(prefix_and_index, f'INFO:security.logstash:{expected_index}*')
assert_equal(version, str(expected_version))
assert_equal(logger_id, str(expected_logger_id))
parsed_data = json.loads(data)
Expand Down
2 changes: 1 addition & 1 deletion logstash.example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ output {
elasticsearch {
ecs_compatibility => "v1"
hosts => "elasticsearch:9200"
index => "%{[@metadata][index]}"
index => "%{[@metadata][index]}-%{+YYYY-MM-dd}"
document_id => "%{[@metadata][id]}"
version => "%{[@metadata][version]}"
version_type => "external"
Expand Down
8 changes: 6 additions & 2 deletions security/backends/elasticsearch/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ def connect(self, init_documents):

def init_documents(self):
from security.backends.elasticsearch.models import (
InputRequestLog, OutputRequestLog, CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog
InputRequestLog, OutputRequestLog, CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog, PartitionedLog
)

for document in InputRequestLog, OutputRequestLog, CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog:
document.init()
if issubclass(document, PartitionedLog):
template = document.get_template()
template.save()
else:
document.init()


connection = ConnectionHandler()
Expand Down
47 changes: 40 additions & 7 deletions security/backends/elasticsearch/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json

from datetime import datetime
from elasticsearch import NotFoundError

from django.db import router
Expand All @@ -23,11 +23,15 @@
from .connection import set_connection


def get_index_name(logger_name):
return '{}-{}-log'.format(
def get_index_name(logger_name, partitioned=True):
index_name = '{}-{}-log'.format(
settings.ELASTICSEARCH_DATABASE.get('prefix', 'security'),
logger_name.value,
)
if partitioned:
index_name += '*'

return index_name


class JSONTextField(CustomField):
Expand Down Expand Up @@ -111,7 +115,36 @@ def update(
)


class RequestLog(Log):
class PartitionedLog(Log):

DAY_FORMAT = "%Y-%m-%d"

def save(self, **kwargs):
# assign now if no timestamp given
if not self.start:
self.start = datetime.now()

# override the index to go to the proper timeslot
kwargs['index'] = self._format_index_name(self.start)
return super().save(**kwargs)

@classmethod
def get(cls, *args, **kwargs):
now = datetime.now()
kwargs['index'] = cls._format_index_name(now)
return super().get(*args, **kwargs)

@classmethod
def _format_index_name(cls, dt):
return dt.strftime(f'{cls._index._name.replace("*", "")}-%Y-%m-%d')

@classmethod
def get_template(cls):
index_name = cls._index._name.split("*")[0]
return cls._index.as_template(index_name, order=0)


class RequestLog(PartitionedLog):

host = Keyword()
method = Keyword()
Expand Down Expand Up @@ -152,7 +185,7 @@ class Index:
name = get_index_name(LoggerName.OUTPUT_REQUEST)


class CommandLog(CommandLogStrMixin, Log):
class CommandLog(CommandLogStrMixin, PartitionedLog):

name = Keyword()
input = Text()
Expand All @@ -166,7 +199,7 @@ class Index:
name = get_index_name(LoggerName.COMMAND)


class CeleryTaskInvocationLog(CeleryTaskInvocationLogStrMixin, Log):
class CeleryTaskInvocationLog(CeleryTaskInvocationLogStrMixin, PartitionedLog):

celery_task_id = Keyword()
name = Keyword()
Expand Down Expand Up @@ -195,7 +228,7 @@ class Index:
name = get_index_name(LoggerName.CELERY_TASK_INVOCATION)


class CeleryTaskRunLog(CeleryTaskRunLogStrMixin, Log):
class CeleryTaskRunLog(CeleryTaskRunLogStrMixin, PartitionedLog):

celery_task_id = Keyword()
state = EnumField(enum=CeleryTaskRunLogState)
Expand Down
7 changes: 6 additions & 1 deletion security/backends/elasticsearch/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.test.utils import override_settings

from .models import CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog, InputRequestLog, OutputRequestLog
from .models import PartitionedLog


class store_elasticsearch_log(override_settings):
Expand All @@ -21,7 +22,11 @@ def enable(self):
for document_class in (CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog,
InputRequestLog, OutputRequestLog):
document_class._index._name = f'{uuid}.{document_class._index._name}'
document_class.init()
if issubclass(document_class, PartitionedLog):
template = document_class.get_template()
template.save()
else:
document_class.init()

def disable(self):
for document_class in (CommandLog, CeleryTaskRunLog, CeleryTaskInvocationLog,
Expand Down

0 comments on commit e4742b8

Please sign in to comment.