
[FR] Add Integration Schema Query Validation #2470

Merged 53 commits on Feb 2, 2023
Changes from 12 commits
4308f9a
add method to get packge integration window
Mikaayenson Jan 15, 2023
f50fa7f
update get_packaged_integrations func name
Mikaayenson Jan 15, 2023
06bcc3f
Loosen restrictions for some edge cases
Mikaayenson Jan 15, 2023
b0f828e
Add method to build flatten integration schemas based on pulled inte…
Mikaayenson Jan 16, 2023
1d609e1
Add a method to load compressed integration schemas from disk
Mikaayenson Jan 16, 2023
ce3b82e
update schema
Mikaayenson Jan 16, 2023
85da15f
update compressed schema to include integration key and logic to vali…
Mikaayenson Jan 16, 2023
38ab887
add logic to validate eql rules against integration schemas
Mikaayenson Jan 17, 2023
1775e3f
update schema name
Mikaayenson Jan 17, 2023
e8f05bb
Add todo
Mikaayenson Jan 17, 2023
2d49789
check for field key before continue
Mikaayenson Jan 17, 2023
faec3dd
update a delta of the integration schema based on manifest
Mikaayenson Jan 17, 2023
48c83a5
expose find_compatible_version_window, small cleanup, and move integr…
Mikaayenson Jan 19, 2023
c8630d2
refactor logic to use latest integration versions
Mikaayenson Jan 20, 2023
f4abcf0
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 20, 2023
4c74cf3
update cli command to reflect latest version changes
Mikaayenson Jan 20, 2023
f3509c5
print rule name before verbose integration output
Mikaayenson Jan 20, 2023
7c35e61
Only prod rules
Mikaayenson Jan 25, 2023
8942784
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Jan 25, 2023
171e1ee
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 25, 2023
3460f4a
Update integrations.py
Mikaayenson Jan 25, 2023
ff7f29a
move maturity check up
Mikaayenson Jan 26, 2023
8cda74c
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Jan 26, 2023
a6f33cb
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 26, 2023
6d3c475
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 27, 2023
d1312b7
warn once for available integration upgrades
Mikaayenson Jan 27, 2023
a02edf2
validate rules with endgame index and add non-ecs-mapping to schema
Mikaayenson Jan 27, 2023
3eefb93
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 27, 2023
4de416f
lint cleanup
Mikaayenson Jan 27, 2023
d6ebfe4
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Jan 27, 2023
57ff704
refactor endgame check within available loop
Mikaayenson Jan 27, 2023
d184621
return endgame schema
Mikaayenson Jan 30, 2023
baa23d7
rebuild manifest
Mikaayenson Jan 30, 2023
407eea8
rebuild schema without prerelease packages
Mikaayenson Jan 30, 2023
ba8b7b1
add in validation to raise error
Mikaayenson Jan 30, 2023
5002d8e
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 30, 2023
e4caab4
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 31, 2023
75f2750
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 31, 2023
9ba8f33
check unknown fields aginst the combined schemas per stack version
Mikaayenson Feb 1, 2023
12347f5
Catch missing tags in integration unit test
Mikaayenson Feb 1, 2023
5e5b484
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Feb 1, 2023
04cdae1
Update manifests and schemas
Mikaayenson Feb 1, 2023
e9267e3
Update integration tag unit test to catch missing index and integrati…
Mikaayenson Feb 1, 2023
58eb52f
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Feb 1, 2023
9ceabc3
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Feb 1, 2023
888caea
only show the notice for later integraiotn once
Mikaayenson Feb 1, 2023
3ef46e7
update notice message
Mikaayenson Feb 1, 2023
9aa6ca9
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Feb 2, 2023
5cbfb54
update schemas
Mikaayenson Feb 2, 2023
ff6a64f
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Feb 2, 2023
ca83ae1
notify only if env variable set.
Mikaayenson Feb 2, 2023
011511a
get the data nofify field
Mikaayenson Feb 2, 2023
cc70eb5
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Feb 2, 2023
12 changes: 12 additions & 0 deletions detection_rules/beats.py
@@ -117,6 +117,14 @@ def _flatten_schema(schema: list, prefix="") -> list:
# it's probably not perfect, but we can fix other bugs as we run into them later
if len(schema) == 1 and nested_prefix.startswith(prefix + prefix):
nested_prefix = s["name"] + "."
if "field" in s:
# integrations sometimes have a group with a single field
flattened.extend(_flatten_schema(s["field"], prefix=nested_prefix))
continue
elif "fields" not in s:
# integrations sometimes have a group with no fields
continue

flattened.extend(_flatten_schema(s["fields"], prefix=nested_prefix))
elif "fields" in s:
flattened.extend(_flatten_schema(s["fields"], prefix=prefix))
@@ -131,6 +139,10 @@ def _flatten_schema(schema: list, prefix="") -> list:
return flattened


def flatten_ecs_schema(schema: dict) -> dict:
return _flatten_schema(schema)


def get_field_schema(base_directory, prefix="", include_common=False):
base_directory = base_directory.get("folders", {}).get("_meta", {}).get("files", {})
flattened = []
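The `beats.py` hunk above extends `_flatten_schema` to tolerate integration groups that nest a single `field` entry or carry no `fields` at all. A standalone sketch of the core recursion (simplified; the real function also special-cases doubled prefixes and the new `field`/missing-`fields` branches):

```python
# Simplified sketch of the recursion behind flatten_ecs_schema/_flatten_schema.
# Assumption: edge cases from the diff (single "field" key, empty groups,
# doubled prefixes) are omitted to show only the dotted-name construction.
def flatten_schema(schema: list, prefix: str = "") -> list:
    flattened = []
    for s in schema:
        if s.get("type") == "group" and "fields" in s:
            # groups contribute their name to the dotted prefix
            flattened.extend(flatten_schema(s["fields"], prefix=prefix + s["name"] + "."))
        elif "fields" in s:
            # anonymous containers pass the prefix through unchanged
            flattened.extend(flatten_schema(s["fields"], prefix=prefix))
        elif "name" in s:
            entry = dict(s)
            entry["name"] = prefix + s["name"]
            flattened.append(entry)
    return flattened

nested = [{"name": "process", "type": "group",
           "fields": [{"name": "pid", "type": "long"},
                      {"name": "name", "type": "keyword"}]}]
flat = flatten_schema(nested)  # dotted names: process.pid, process.name
```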
25 changes: 21 additions & 4 deletions detection_rules/devtools.py
@@ -33,7 +33,7 @@
from .endgame import EndgameSchemaManager
from .eswrap import CollectEvents, add_range_to_dsl
from .ghwrap import GithubClient, update_gist
from .integrations import build_integrations_manifest
from .integrations import build_integrations_manifest, build_integrations_schemas
from .main import root
from .misc import PYTHON_LICENSE, add_client, client_error
from .packaging import (CURRENT_RELEASE_PATH, PACKAGE_FILE, RELEASE_DIR,
@@ -1174,10 +1174,27 @@ def integrations_group():
def build_integration_manifests(overwrite: bool):
"""Builds consolidated integrations manifests file."""
click.echo("loading rules to determine all integration tags")

def flatten(tag_list: List[str]) -> List[str]:
return list(set([tag for tags in tag_list for tag in (flatten(tags) if isinstance(tags, list) else [tags])]))

rules = RuleCollection.default()
integration_tags = list(set([r.contents.metadata.integration for r in rules if r.contents.metadata.integration]))
click.echo(f"integration tags identified: {integration_tags}")
build_integrations_manifest(overwrite, integration_tags)
integration_tags = [r.contents.metadata.integration for r in rules if r.contents.metadata.integration]
unique_integration_tags = flatten(integration_tags)
click.echo(f"integration tags identified: {unique_integration_tags}")
build_integrations_manifest(overwrite, unique_integration_tags)


@integrations_group.command('build-schemas')
@click.option('--overwrite', '-o', is_flag=True, help="Overwrite the entire integrations-schema.json.gz file")
def build_integration_schemas(overwrite: bool):
"""Builds consolidated integrations schemas file."""
click.echo("Building integration schemas...")

start_time = time.perf_counter()
build_integrations_schemas(overwrite)
end_time = time.perf_counter()
click.echo(f"Time taken to generate schemas: {(end_time - start_time)/60:.2f} seconds")


@dev_group.group('schemas')
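The new `flatten` helper in `build_integration_manifests` exists because rule metadata may store `integration` as either a single string or a nested list of tags. A minimal reproduction of that helper and its de-duplicating behavior:

```python
# Reproduction of the nested-tag flatten helper added in the devtools.py diff:
# tags may arrive as strings or arbitrarily nested lists, and the result is
# de-duplicated through a set before being passed to build_integrations_manifest.
from typing import List


def flatten(tag_list: List) -> List[str]:
    return list(set(
        tag for tags in tag_list
        for tag in (flatten(tags) if isinstance(tags, list) else [tags])
    ))


tags = sorted(flatten(["aws", ["azure", ["gcp"]], "aws"]))
```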
Binary file modified detection_rules/etc/integration-manifests.json.gz
Binary file not shown.
Binary file added detection_rules/etc/integration-schemas.json.gz
Binary file not shown.
183 changes: 180 additions & 3 deletions detection_rules/integrations.py
@@ -4,20 +4,29 @@
# 2.0.

"""Functions to support and interact with Kibana integrations."""
import glob
import gzip
import io
import json
import os
import re
import zipfile
from collections import OrderedDict
from pathlib import Path
from typing import Generator

import requests
import yaml
from marshmallow import EXCLUDE, Schema, fields, post_load

import kql

from . import ecs
from .beats import flatten_ecs_schema
from .semver import Version
from .utils import cached, get_etc_path, read_gzip

MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz'))


@cached
@@ -26,11 +35,18 @@ def load_integrations_manifests() -> dict:
return json.loads(read_gzip(get_etc_path('integration-manifests.json.gz')))


@cached
def load_integrations_schemas() -> dict:
"""Load the consolidated integrations schemas."""
return json.loads(read_gzip(get_etc_path('integration-schemas.json.gz')))


class IntegrationManifestSchema(Schema):
name = fields.Str(required=True)
version = fields.Str(required=True)
release = fields.Str(required=True)
description = fields.Str(required=True)
download = fields.Str(required=True)
conditions = fields.Dict(required=True)
policy_templates = fields.List(fields.Dict, required=True)
owner = fields.Dict(required=False)
@@ -44,8 +60,8 @@ def transform_policy_template(self, data, **kwargs):
def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
"""Builds a new local copy of manifest.yaml from integrations Github."""
if overwrite:
if os.path.exists(MANIFEST_FILE_PATH):
os.remove(MANIFEST_FILE_PATH)
if MANIFEST_FILE_PATH.exists():
MANIFEST_FILE_PATH.unlink()

final_integration_manifests = {integration: {} for integration in rule_integrations}

@@ -62,6 +78,63 @@ def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
print(f"final integrations manifests dumped: {MANIFEST_FILE_PATH}")


def build_integrations_schemas(overwrite: bool) -> None:
"""Builds a new local copy of integration-schemas.json.gz from EPR integrations."""

final_integration_schemas = {}
saved_integration_schemas = {}

# Check if the file already exists and handle accordingly
if overwrite and SCHEMA_FILE_PATH.exists():
SCHEMA_FILE_PATH.unlink()
elif SCHEMA_FILE_PATH.exists():
saved_integration_schemas = load_integrations_schemas()

# Load the integration manifests
integration_manifests = load_integrations_manifests()

# Loop through the packages and versions
for package, versions in integration_manifests.items():
print(f"processing {package}")
final_integration_schemas.setdefault(package, {})
for version, manifest in versions.items():
if package in saved_integration_schemas and version in saved_integration_schemas[package]:
continue

# Download the zip file
download_url = f"https://epr.elastic.co{manifest['download']}"
response = requests.get(download_url)
response.raise_for_status()
zip_file = io.BytesIO(response.content)

# Update the final integration schemas
final_integration_schemas[package].update({version: {}})

# Open the zip file
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
for file in zip_ref.namelist():
# Check if the file is a match
if glob.fnmatch.fnmatch(file, '*/fields/*.yml'):
integration_name = Path(file).parent.parent.name
final_integration_schemas[package][version].setdefault(integration_name, {})
file_data = zip_ref.read(file)
schema_fields = yaml.load(file_data, Loader=yaml.FullLoader)

# Parse the schema and add to the integration_manifests
data = flatten_ecs_schema(schema_fields)
flat_data = {field['name']: field.get('type', 'keyword') for field in data}

final_integration_schemas[package][version][integration_name].update(flat_data)

del file_data

# Write the final integration schemas to disk
with gzip.open(SCHEMA_FILE_PATH, "w") as schema_file:
schema_file_bytes = json.dumps(final_integration_schemas).encode("utf-8")
schema_file.write(schema_file_bytes)
print(f"final integrations manifests dumped: {SCHEMA_FILE_PATH}")


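The write path at the end of `build_integrations_schemas` mirrors the read path in `load_integrations_schemas`: the nested `{package: {version: {integration: {field: type}}}}` mapping is JSON-encoded and gzip-compressed. A round-trip sketch (the temp path is illustrative; the real file is `detection_rules/etc/integration-schemas.json.gz`):

```python
# Round-trip sketch of the schema persistence in build_integrations_schemas /
# load_integrations_schemas. The path and sample content are illustrative.
import gzip
import json
import os
import tempfile

schemas = {"aws": {"1.17.0": {"cloudtrail": {"aws.cloudtrail.error_code": "keyword"}}}}
path = os.path.join(tempfile.gettempdir(), "integration-schemas-demo.json.gz")

# write: JSON-encode, then gzip-compress (the PR writes encoded bytes in binary mode)
with gzip.open(path, "wt", encoding="utf-8") as fh:
    json.dump(schemas, fh)

# read back, as load_integrations_schemas does via read_gzip + json.loads
with gzip.open(path, "rt", encoding="utf-8") as fh:
    loaded = json.load(fh)
```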
def find_least_compatible_version(package: str, integration: str,
current_stack_version: str, packages_manifest: dict) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
@@ -89,6 +162,49 @@ def find_least_compatible_version(package: str, integration: str,
raise ValueError(f"no compatible version for integration {package}:{integration}")


def find_compatible_version_window(package: str, integration: str,
rule_stack_version: str, packages_manifest: dict) -> Generator[str, None, None]:
"""Finds least compatible version for specified integration based on stack version supplied."""

if not package:
raise ValueError("Package must be specified")

package_manifest = packages_manifest.get(package)
if package_manifest is None:
raise ValueError(f"Package {package} not found in manifest.")

integration_manifests = sorted(package_manifest.items(), key=lambda x: Version(str(x[0])))

# iterates through ascending integration manifests
# returns latest major version that is least compatible
for version, manifest in integration_manifests:
kibana_conditions = manifest.get("conditions", {}).get("kibana", {})
version_requirement = kibana_conditions.get("version")
if not version_requirement:
raise ValueError(f"Manifest for {package}:{integration} version {version} is missing conditions.")

compatible_versions = re.sub(r"\>|\<|\=|\^", "", manifest["conditions"]["kibana"]["version"]).split(" || ")

if not compatible_versions:
raise ValueError(f"Manifest for {package}:{integration} version {version} is missing compatible versions")

if len(compatible_versions) > 1:
highest_compatible_version = max(compatible_versions, key=lambda x: Version(x))

if Version(highest_compatible_version) > Version(rule_stack_version):
print(f"Integration {package}-{integration} {version=} has multiple stack version requirements.",
f"Consider updating min_stack version to the latest {compatible_versions}.")

for kibana_ver in compatible_versions:
if Version(kibana_ver) > Version(rule_stack_version):
print(f"Integration {package}-{integration} version {version} has a higher stack version requirement.",
f"Consider updating min_stack version to {kibana_ver} to support this version.")
elif int(kibana_ver[0]) == int(rule_stack_version[0]):
# check versions have the same major
if Version(kibana_ver) <= Version(rule_stack_version):
yield version


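Inside `find_compatible_version_window`, Kibana version requirements such as `"^8.2.0 || ^7.17.0"` are stripped of range operators and split on `||` before each candidate is compared against the rule's `min_stack_version`. The constraint parsing in isolation:

```python
# The constraint-parsing step from find_compatible_version_window, extracted:
# strip semver range operators (>, <, =, ^), then split alternatives on " || ".
import re


def parse_kibana_versions(version_requirement: str) -> list:
    return re.sub(r"\>|\<|\=|\^", "", version_requirement).split(" || ")


versions = parse_kibana_versions("^8.2.0 || ^7.17.0")  # ["8.2.0", "7.17.0"]
```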
def get_integration_manifests(integration: str) -> list:
"""Iterates over specified integrations from package-storage and combines manifests per version."""
epr_search_url = "https://epr.elastic.co/search"
@@ -106,3 +222,64 @@
print(f"loaded {integration} manifests from the following package versions: "
f"{[manifest['version'] for manifest in manifests]}")
return manifests


def get_integration_schema_data(data, meta) -> Generator[dict, None, None]:
"""Iterates over specified integrations from package-storage and combines schemas per version."""

# lazy import to avoid circular import
from .rule import ( # pylint: disable=import-outside-toplevel
QueryRuleData, RuleMeta, TOMLRuleContents)

data: QueryRuleData
meta: RuleMeta

packages_manifest = load_integrations_manifests()
integrations_schemas = load_integrations_schemas()

# validate the query against related integration fields
if isinstance(data, QueryRuleData) and data.language != 'lucene':
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)

if not package_integrations:
return

for stack_version, mapping in meta.get_validation_stack_versions().items():
ecs_version = mapping['ecs']

ecs_schema = ecs.flatten_multi_fields(ecs.get_schema(ecs_version, name='ecs_flat'))

for pk_int in package_integrations:
package = pk_int["package"]
integration = pk_int["integration"]

if meta.min_stack_version is None and meta.maturity == "development":
continue

package_version_window = find_compatible_version_window(package=package,
integration=integration,
rule_stack_version=meta.min_stack_version,
packages_manifest=packages_manifest)

for package_version in package_version_window:
schema = {}
if integration is None:
# Use all fields from each dataset
for dataset in integrations_schemas[package][package_version]:
schema.update(integrations_schemas[package][package_version][dataset])
else:
if integration not in integrations_schemas[package][package_version]:
print(f"Error: Integration {integration} not found in package {package} "
f"version {package_version}")

# TODO: uncomment this once all rules are triaged and remove the print above
# raise ValueError(f"Integration {integration} not found in package {package} "
# f"version {package_version}")
continue
schema = integrations_schemas[package][package_version][integration]
schema.update(ecs_schema)
integration_schema = {k: kql.parser.elasticsearch_type_family(v) for k, v in schema.items()}
data = {"schema": integration_schema, "package": package, "integration": integration,
"stack_version": stack_version, "ecs_version": ecs_version,
"package_version": package_version}
yield data
13 changes: 9 additions & 4 deletions detection_rules/rule.py
@@ -412,6 +412,9 @@ def validator(self) -> Optional[QueryValidator]:
def validate_query(self, meta: RuleMeta) -> None:
validator = self.validator
if validator is not None:
# TODO: Decide on whether to only validate_integrations or also validate
if meta.integration:
return validator.validate_integration(self, meta)
return validator.validate(self, meta)

@cached_property
@@ -847,7 +850,7 @@ def _add_related_integrations(self, obj: dict) -> None:

if self.check_restricted_field_version(field_name):
if isinstance(self.data, QueryRuleData) and self.data.language != 'lucene':
package_integrations = self._get_packaged_integrations(packages_manifest)
package_integrations = self.get_packaged_integrations(self.data, self.metadata, packages_manifest)

if not package_integrations:
return
@@ -947,11 +950,13 @@ def compare_field_versions(min_stack: Version, max_stack: Version) -> bool:
max_stack = max_stack or current_version
return Version(min_stack) <= current_version >= Version(max_stack)

def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[dict]]:
@classmethod
def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta,
package_manifest: dict) -> Optional[List[dict]]:
packaged_integrations = []
datasets = set()

for node in self.data.get('ast', []):
for node in data.get('ast', []):
if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset':
datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal)))
elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset':
Expand All @@ -960,7 +965,7 @@ def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[di
if not datasets:
# windows and endpoint integration do not have event.dataset fields in queries
# integration is None to remove duplicate references upstream in Kibana
rule_integrations = self.metadata.get("integration", [])
rule_integrations = meta.get("integration", [])
if rule_integrations:
for integration in rule_integrations:
if integration in ["windows", "endpoint", "apm"]:
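`get_packaged_integrations` resolves `event.dataset` values such as `aws.cloudtrail` into `(package, integration)` pairs, and for rules whose queries carry no dataset it falls back to package-only entries (`integration` set to `None`) for `windows`, `endpoint`, and `apm`. A simplified model of that behavior, with the EQL/KQL AST walk stubbed out as a plain list of dataset strings:

```python
# Simplified model of get_packaged_integrations; the real method walks the
# query AST for event.dataset comparisons, which is stubbed here.
from typing import List


def packaged_integrations(datasets: List[str], rule_integrations: List[str]) -> List[dict]:
    if not datasets:
        # windows/endpoint/apm queries carry no event.dataset field;
        # integration is None to avoid duplicate references upstream in Kibana
        return [{"package": tag, "integration": None}
                for tag in rule_integrations if tag in ("windows", "endpoint", "apm")]
    pairs = []
    for dataset in datasets:
        package, _, integration = dataset.partition(".")
        pairs.append({"package": package, "integration": integration or None})
    return pairs


pairs = packaged_integrations(["aws.cloudtrail"], [])
fallback = packaged_integrations([], ["windows"])
```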