Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EQL rules and schema validation #297

Merged
merged 8 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions detection_rules/beats.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import os

import kql
import eql
import requests
import yaml

from .semver import Version
from .utils import unzip, load_etc_dump, save_etc_dump, get_etc_path
from .utils import unzip, load_etc_dump, save_etc_dump, get_etc_path, cached


def download_latest_beats_schema():
Expand Down Expand Up @@ -129,34 +130,16 @@ def get_beats_sub_schema(schema: dict, beat: str, module: str, *datasets: str):
return {field["name"]: field for field in sorted(flattened, key=lambda f: f["name"])}


SCHEMA = None


@cached
def read_beats_schema():
global SCHEMA

if SCHEMA is None:
beats_schemas = os.listdir(get_etc_path("beats_schemas"))
latest = max(beats_schemas, key=lambda b: Version(b.lstrip("v")))
beats_schemas = os.listdir(get_etc_path("beats_schemas"))
latest = max(beats_schemas, key=lambda b: Version(b.lstrip("v")))

SCHEMA = load_etc_dump("beats_schemas", latest)
return load_etc_dump("beats_schemas", latest)

return SCHEMA


def get_schema_for_query(tree: kql.ast, beats: list) -> dict:
def get_schema_from_datasets(beats, modules, datasets):
filtered = {}
modules = set()
datasets = set()

# extract out event.module and event.dataset from the query's AST
for node in tree:
if isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.module"):
modules.update(child.value for child in node.value if isinstance(child, kql.ast.String))

if isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.dataset"):
datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String))

beats_schema = read_beats_schema()

# infer the module if only a dataset are defined
Expand All @@ -173,3 +156,39 @@ def get_schema_for_query(tree: kql.ast, beats: list) -> dict:
filtered.update(get_beats_sub_schema(beats_schema, beat, module, *datasets))

return filtered


def get_schema_from_eql(tree: eql.ast.BaseNode, beats: list) -> dict:
modules = set()
datasets = set()

# extract out event.module and event.dataset from the query's AST
for node in tree:
if isinstance(node, eql.ast.Comparison) and node.comparator == node.EQ and \
isinstance(node.right, eql.ast.String):
if node.left == eql.ast.Field("event", ["module"]):
modules.add(node.right.render())
elif node.left == eql.ast.Field("event", ["dataset"]):
datasets.add(node.right.render())
elif isinstance(node, eql.ast.InSet):
if node.expression == eql.ast.Field("event", ["module"]):
modules.add(node.get_literals())
elif node.expression == eql.ast.Field("event", ["dataset"]):
datasets.add(node.get_literals())

return get_schema_from_datasets(beats, modules, datasets)


def get_schema_from_kql(tree: kql.ast.BaseNode, beats: list) -> dict:
modules = set()
datasets = set()

# extract out event.module and event.dataset from the query's AST
for node in tree:
if isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.module"):
modules.update(child.value for child in node.value if isinstance(child, kql.ast.String))

if isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.dataset"):
datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String))

return get_schema_from_datasets(beats, modules, datasets)
30 changes: 30 additions & 0 deletions detection_rules/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import json

import requests
import eql
import eql.types
import yaml

from .semver import Version
Expand Down Expand Up @@ -164,6 +166,34 @@ def flatten_multi_fields(schema):
return converted


class KqlSchema2Eql(eql.Schema):
type_mapping = {
"keyword": eql.types.TypeHint.String,
"ip": eql.types.TypeHint.String,
"float": eql.types.TypeHint.Numeric,
"double": eql.types.TypeHint.Numeric,
"long": eql.types.TypeHint.Numeric,
"short": eql.types.TypeHint.Numeric,
}

def __init__(self, kql_schema):
self.kql_schema = kql_schema
eql.Schema.__init__(self, {}, allow_any=True, allow_generic=False, allow_missing=False)

def validate_event_type(self, event_type):
# allow all event types to fill in X:
# `X` where ....
return True

def get_event_type_hint(self, event_type, path):
dotted = ".".join(path)
elasticsearch_type = self.kql_schema.get(dotted)
eql_hint = self.type_mapping.get(elasticsearch_type)

if eql_hint is not None:
return eql_hint, None


@cached
def get_kql_schema(version=None, indexes=None, beat_schema=None):
"""Get schema for KQL."""
Expand Down
47 changes: 44 additions & 3 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import click
import kql
import eql

from . import ecs, beats
from .attack import TACTICS, build_threat_map_entry, technique_lookup
Expand Down Expand Up @@ -152,18 +153,58 @@ def validate(self, as_rule=False, versioned=False, query=True):

schema_cls.validate(contents, role=self.type)

if query and self.query and self.contents['language'] == 'kuery':
if query and self.query is not None:
ecs_versions = self.metadata.get('ecs_version')
indexes = self.contents.get("index", [])
self._validate_kql(ecs_versions, indexes, self.query, self.name)

if self.contents['language'] == 'kuery':
self._validate_kql(ecs_versions, indexes, self.query, self.name)

if self.contents['language'] == 'eql':
self._validate_eql(ecs_versions, indexes, self.query, self.name)

@staticmethod
@cached
def _validate_eql(ecs_versions, indexes, query, name):
# validate against all specified schemas or the latest if none specified
parsed = eql.parse_query(query)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you change the parsed_kql property to

    def parsed_query(self):
        language = self.contents.get('language')
        if self.query and language in ('kuery', 'eql'):
            return kql.parse(self.query) if language == 'kuery' else eql.parse_query(self.query)

Then you can use that here (and more consistently as needed)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can't use it here because this is a static method and needs to be for caching to work.

but i did update the method regardless, even though it's never used

beat_types = [index.split("-")[0] for index in indexes if "beat-*" in index]
beat_schema = beats.get_schema_from_kql(parsed, beat_types) if beat_types else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be get_schema_from_eql?


ecs_versions = ecs_versions or [ecs_versions]
schemas = []

for version in ecs_versions:
try:
schemas.append(ecs.get_kql_schema(indexes=indexes, beat_schema=beat_schema, version=version))
except KeyError:
raise KeyError('Unknown ecs schema version: {} in rule {}.\n'
'Do you need to update schemas?'.format(version, name)) from None

for schema in schemas:
try:
with ecs.KqlSchema2Eql(schema):
eql.parse_query(query)

except eql.EqlTypeMismatchError:
raise

except eql.EqlParseError as exc:
message = exc.error_msg
trailer = None
if "Unknown field" in message and beat_types:
trailer = "\nTry adding event.module and event.dataset to specify beats module"

raise type(exc)(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None

@staticmethod
@cached
def _validate_kql(ecs_versions, indexes, query, name):
# validate against all specified schemas or the latest if none specified
parsed = kql.parse(query)
beat_types = [index.split("-")[0] for index in indexes if "beat-*" in index]
beat_schema = beats.get_schema_for_query(parsed, beat_types) if beat_types else None
beat_schema = beats.get_schema_from_kql(parsed, beat_types) if beat_types else None

if not ecs_versions:
kql.parse(query, schema=ecs.get_kql_schema(indexes=indexes, beat_schema=beat_schema))
Expand Down
8 changes: 5 additions & 3 deletions detection_rules/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from ..semver import Version

# import all of the schema versions
from .v78 import ApiSchema78
from .v79 import ApiSchema79
from .v7_8 import ApiSchema78
from .v7_9 import ApiSchema79
from .v7_10 import ApiSchema710

__all__ = (
"all_schemas",
Expand All @@ -21,9 +22,10 @@
all_schemas = [
ApiSchema78,
ApiSchema79,
ApiSchema710,
]

CurrentSchema = max(all_schemas, key=lambda cls: Version(cls.STACK_VERSION))
CurrentSchema = all_schemas[-1]


def downgrade(api_contents: dict, target_version: str):
Expand Down
36 changes: 36 additions & 0 deletions detection_rules/schemas/v7_10.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.

"""Definitions for rule metadata and schemas."""

import jsl
from .v7_9 import ApiSchema79


# rule types
EQL = "eql"


class ApiSchema710(ApiSchema79):
"""Schema for siem rule in API format."""

STACK_VERSION = "7.10"
RULE_TYPES = ApiSchema79.RULE_TYPES + [EQL]

type = jsl.StringField(enum=RULE_TYPES, required=True)

# there might be a bug in jsl that requires us to redefine these here
query_scope = ApiSchema79.query_scope
saved_id_scope = ApiSchema79.saved_id_scope
ml_scope = ApiSchema79.ml_scope
threshold_scope = ApiSchema79.threshold_scope

with jsl.Scope(EQL) as eql_scope:
eql_scope.index = jsl.ArrayField(jsl.StringField(), required=False)
eql_scope.query = jsl.StringField(required=True)
eql_scope.language = jsl.StringField(enum=[EQL], required=True)
rw-access marked this conversation as resolved.
Show resolved Hide resolved
eql_scope.type = jsl.StringField(enum=[EQL], required=True)
rw-access marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rw-access I believe this is what you wanted 👀 on, schema-wise? I can confirm that these are the EQL-specific fields and their correct types 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, thanks!


with jsl.Scope(jsl.DEFAULT_ROLE) as default_scope:
default_scope.type = type
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""Definitions for rule metadata and schemas."""

import jsl
from .v78 import ApiSchema78
from .v7_8 import ApiSchema78


OPERATORS = ['equals']
Expand Down
34 changes: 34 additions & 0 deletions tests/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""Test stack versioned schemas."""
import unittest
import uuid
import eql

from detection_rules.rule import Rule
from detection_rules.schemas import downgrade, CurrentSchema
Expand Down Expand Up @@ -106,3 +107,36 @@ def test_threshold_downgrade(self):

with self.assertRaisesRegex(ValueError, "Unsupported rule type"):
downgrade(api_contents, "7.8")

def test_eql_validation(self):
base_fields = {
"author": ["Elastic"],
"description": "test description",
"index": ["filebeat-*"],
"language": "eql",
"license": "Elastic License",
"name": "test rule",
"risk_score": 21,
"rule_id": str(uuid.uuid4()),
"severity": "low",
"type": "eql"
}

Rule("test.toml", dict(base_fields, query="""
process where process.name == "cmd.exe"
"""))

with self.assertRaises(eql.EqlSyntaxError):
Rule("test.toml", dict(base_fields, query="""
process where process.name == this!is$not#v@lid
"""))

with self.assertRaises(eql.EqlSemanticError):
Rule("test.toml", dict(base_fields, query="""
process where process.invalid_field == "hello world"
"""))

with self.assertRaises(eql.EqlTypeMismatchError):
Rule("test.toml", dict(base_fields, query="""
process where process.pid == "some string field"
"""))