[FR] Add Integration Schema Query Validation #2470

Merged
53 commits merged into main from 1994-add-integration-specific-query-validation on Feb 2, 2023

Changes from all commits (53 commits)
4308f9a
add method to get package integration window
Mikaayenson Jan 15, 2023
f50fa7f
update get_packaged_integrations func name
Mikaayenson Jan 15, 2023
06bcc3f
Loosen restrictions for some edge cases
Mikaayenson Jan 15, 2023
b0f828e
Add method to build flattened integration schemas based on pulled inte…
Mikaayenson Jan 16, 2023
1d609e1
Add a method to load compressed integration schemas from disk
Mikaayenson Jan 16, 2023
ce3b82e
update schema
Mikaayenson Jan 16, 2023
85da15f
update compressed schema to include integration key and logic to vali…
Mikaayenson Jan 16, 2023
38ab887
add logic to validate eql rules against integration schemas
Mikaayenson Jan 17, 2023
1775e3f
update schema name
Mikaayenson Jan 17, 2023
e8f05bb
Add todo
Mikaayenson Jan 17, 2023
2d49789
check for field key before continue
Mikaayenson Jan 17, 2023
faec3dd
update a delta of the integration schema based on manifest
Mikaayenson Jan 17, 2023
48c83a5
expose find_compatible_version_window, small cleanup, and move integr…
Mikaayenson Jan 19, 2023
c8630d2
refactor logic to use latest integration versions
Mikaayenson Jan 20, 2023
f4abcf0
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 20, 2023
4c74cf3
update cli command to reflect latest version changes
Mikaayenson Jan 20, 2023
f3509c5
print rule name before verbose integration output
Mikaayenson Jan 20, 2023
7c35e61
Only prod rules
Mikaayenson Jan 25, 2023
8942784
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Jan 25, 2023
171e1ee
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 25, 2023
3460f4a
Update integrations.py
Mikaayenson Jan 25, 2023
ff7f29a
move maturity check up
Mikaayenson Jan 26, 2023
8cda74c
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Jan 26, 2023
a6f33cb
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 26, 2023
6d3c475
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 27, 2023
d1312b7
warn once for available integration upgrades
Mikaayenson Jan 27, 2023
a02edf2
validate rules with endgame index and add non-ecs-mapping to schema
Mikaayenson Jan 27, 2023
3eefb93
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 27, 2023
4de416f
lint cleanup
Mikaayenson Jan 27, 2023
d6ebfe4
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Jan 27, 2023
57ff704
refactor endgame check within available loop
Mikaayenson Jan 27, 2023
d184621
return endgame schema
Mikaayenson Jan 30, 2023
baa23d7
rebuild manifest
Mikaayenson Jan 30, 2023
407eea8
rebuild schema without prerelease packages
Mikaayenson Jan 30, 2023
ba8b7b1
add in validation to raise error
Mikaayenson Jan 30, 2023
5002d8e
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 30, 2023
e4caab4
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 31, 2023
75f2750
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Jan 31, 2023
9ba8f33
check unknown fields against the combined schemas per stack version
Mikaayenson Feb 1, 2023
12347f5
Catch missing tags in integration unit test
Mikaayenson Feb 1, 2023
5e5b484
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Feb 1, 2023
04cdae1
Update manifests and schemas
Mikaayenson Feb 1, 2023
e9267e3
Update integration tag unit test to catch missing index and integrati…
Mikaayenson Feb 1, 2023
58eb52f
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Feb 1, 2023
9ceabc3
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Feb 1, 2023
888caea
only show the notice for later integration once
Mikaayenson Feb 1, 2023
3ef46e7
update notice message
Mikaayenson Feb 1, 2023
9aa6ca9
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Feb 2, 2023
5cbfb54
update schemas
Mikaayenson Feb 2, 2023
ff6a64f
Merge branch 'main' into 1994-add-integration-specific-query-validation
Mikaayenson Feb 2, 2023
ca83ae1
notify only if env variable set.
Mikaayenson Feb 2, 2023
011511a
get the data notify field
Mikaayenson Feb 2, 2023
cc70eb5
Merge branch '1994-add-integration-specific-query-validation' of gith…
Mikaayenson Feb 2, 2023
12 changes: 12 additions & 0 deletions detection_rules/beats.py
@@ -117,6 +117,14 @@ def _flatten_schema(schema: list, prefix="") -> list:
# it's probably not perfect, but we can fix other bugs as we run into them later
if len(schema) == 1 and nested_prefix.startswith(prefix + prefix):
nested_prefix = s["name"] + "."
if "field" in s:
# integrations sometimes have a group with a single field
flattened.extend(_flatten_schema(s["field"], prefix=nested_prefix))
continue
elif "fields" not in s:
# integrations sometimes have a group with no fields
continue

flattened.extend(_flatten_schema(s["fields"], prefix=nested_prefix))
elif "fields" in s:
flattened.extend(_flatten_schema(s["fields"], prefix=prefix))
@@ -131,6 +139,10 @@ def _flatten_schema(schema: list, prefix="") -> list:
return flattened


def flatten_ecs_schema(schema: dict) -> dict:
return _flatten_schema(schema)


def get_field_schema(base_directory, prefix="", include_common=False):
base_directory = base_directory.get("folders", {}).get("_meta", {}).get("files", {})
flattened = []
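
A quick sketch of what the flattening produces, using an assumed nested group of the shape these integration field files contain (names and types here are illustrative, not from the PR):

nested = [
    {"name": "aws", "type": "group", "fields": [
        {"name": "cloudtrail", "type": "group", "fields": [
            {"name": "event_name", "type": "keyword"},
        ]},
    ]},
]
# flatten_ecs_schema(nested) returns flattened entries such as:
# [{"name": "aws.cloudtrail.event_name", "type": "keyword"}]
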
47 changes: 43 additions & 4 deletions detection_rules/devtools.py
@@ -33,7 +33,8 @@
from .endgame import EndgameSchemaManager
from .eswrap import CollectEvents, add_range_to_dsl
from .ghwrap import GithubClient, update_gist
from .integrations import build_integrations_manifest
from .integrations import (build_integrations_manifest, build_integrations_schemas, find_latest_compatible_version,
load_integrations_manifests)
from .main import root
from .misc import PYTHON_LICENSE, add_client, client_error
from .packaging import (CURRENT_RELEASE_PATH, PACKAGE_FILE, RELEASE_DIR,
@@ -1174,10 +1175,48 @@ def integrations_group():
def build_integration_manifests(overwrite: bool):
"""Builds consolidated integrations manifests file."""
click.echo("loading rules to determine all integration tags")

def flatten(tag_list: List[str]) -> List[str]:
return list(set([tag for tags in tag_list for tag in (flatten(tags) if isinstance(tags, list) else [tags])]))

rules = RuleCollection.default()
integration_tags = list(set([r.contents.metadata.integration for r in rules if r.contents.metadata.integration]))
click.echo(f"integration tags identified: {integration_tags}")
build_integrations_manifest(overwrite, integration_tags)
integration_tags = [r.contents.metadata.integration for r in rules if r.contents.metadata.integration]
unique_integration_tags = flatten(integration_tags)
click.echo(f"integration tags identified: {unique_integration_tags}")
build_integrations_manifest(overwrite, unique_integration_tags)
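
The recursive flatten above normalizes rule integration metadata, which may be a single tag or a list of tags per rule; a quick illustration (set-based, so output order is arbitrary):

# flatten(["azure", ["aws", "gcp"], "aws"]) -> ["aws", "azure", "gcp"] (in any order)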


@integrations_group.command('build-schemas')
@click.option('--overwrite', '-o', is_flag=True, help="Overwrite the entire integrations-schema.json.gz file")
def build_integration_schemas(overwrite: bool):
"""Builds consolidated integrations schemas file."""
click.echo("Building integration schemas...")

start_time = time.perf_counter()
build_integrations_schemas(overwrite)
end_time = time.perf_counter()
click.echo(f"Time taken to generate schemas: {(end_time - start_time)/60:.2f} minutes")


@integrations_group.command('show-latest-compatible')
@click.option('--package', '-p', help='Name of package')
@click.option('--stack_version', '-s', required=True, help='Rule stack version')
def show_latest_compatible_version(package: str, stack_version: str) -> None:
"""Prints the latest integration compatible version for specified package based on stack version supplied."""

packages_manifest = None
try:
packages_manifest = load_integrations_manifests()
except Exception as e:
click.echo(f"Error loading integrations manifests: {str(e)}")
return

try:
version = find_latest_compatible_version(package, "", stack_version, packages_manifest)
click.echo(f"Compatible integration {version=}")
except Exception as e:
click.echo(f"Error finding compatible version: {str(e)}")
return
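
An example invocation, assuming the repo's standard CLI entry point (package and version values are illustrative):

python -m detection_rules dev integrations show-latest-compatible --package aws --stack_version 8.3.0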


@dev_group.group('schemas')
Binary file modified detection_rules/etc/integration-manifests.json.gz
Binary file not shown.
Binary file added detection_rules/etc/integration-schemas.json.gz
Binary file not shown.
183 changes: 178 additions & 5 deletions detection_rules/integrations.py
@@ -4,20 +4,27 @@
# 2.0.

"""Functions to support and interact with Kibana integrations."""
import glob
import gzip
import json
import os
import re
from collections import OrderedDict
from pathlib import Path
from typing import Generator, Tuple, Union

import requests
import yaml
from marshmallow import EXCLUDE, Schema, fields, post_load

import kql

from . import ecs
from .beats import flatten_ecs_schema
from .semver import Version
from .utils import cached, get_etc_path, read_gzip
from .utils import cached, get_etc_path, read_gzip, unzip

MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz'))


@cached
@@ -26,11 +33,18 @@ def load_integrations_manifests() -> dict:
return json.loads(read_gzip(get_etc_path('integration-manifests.json.gz')))


@cached
def load_integrations_schemas() -> dict:
"""Load the consolidated integrations schemas."""
return json.loads(read_gzip(get_etc_path('integration-schemas.json.gz')))


class IntegrationManifestSchema(Schema):
name = fields.Str(required=True)
version = fields.Str(required=True)
release = fields.Str(required=True)
description = fields.Str(required=True)
download = fields.Str(required=True)
conditions = fields.Dict(required=True)
policy_templates = fields.List(fields.Dict, required=True)
owner = fields.Dict(required=False)
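
# An illustrative manifest entry this schema validates (all values assumed, not taken from the PR):
# {"name": "aws", "version": "1.17.1", "release": "ga", "description": "AWS integration",
#  "download": "/epr/aws/aws-1.17.1.zip",
#  "conditions": {"kibana": {"version": "^7.16.0 || ^8.0.0"}},
#  "policy_templates": [{"name": "cloudtrail", ...}]}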
@@ -44,8 +58,8 @@ def transform_policy_template(self, data, **kwargs):
def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
"""Builds a new local copy of manifest.yaml from integrations Github."""
if overwrite:
if os.path.exists(MANIFEST_FILE_PATH):
os.remove(MANIFEST_FILE_PATH)
if MANIFEST_FILE_PATH.exists():
MANIFEST_FILE_PATH.unlink()

final_integration_manifests = {integration: {} for integration in rule_integrations}

@@ -62,6 +76,63 @@ def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
print(f"final integrations manifests dumped: {MANIFEST_FILE_PATH}")


def build_integrations_schemas(overwrite: bool) -> None:
"""Builds a new local copy of integration-schemas.json.gz from EPR integrations."""

final_integration_schemas = {}
saved_integration_schemas = {}

# Check if the file already exists and handle accordingly
if overwrite and SCHEMA_FILE_PATH.exists():
SCHEMA_FILE_PATH.unlink()
elif SCHEMA_FILE_PATH.exists():
saved_integration_schemas = load_integrations_schemas()

# Load the integration manifests
integration_manifests = load_integrations_manifests()

# Loop through the packages and versions
for package, versions in integration_manifests.items():
print(f"processing {package}")
final_integration_schemas.setdefault(package, {})
for version, manifest in versions.items():
if package in saved_integration_schemas and version in saved_integration_schemas[package]:
continue

# Download the zip file
download_url = f"https://epr.elastic.co{manifest['download']}"
response = requests.get(download_url)
response.raise_for_status()

# Update the final integration schemas
final_integration_schemas[package].update({version: {}})

# Open the zip file
with unzip(response.content) as zip_ref:
for file in zip_ref.namelist():
# Check if the file is a match
if glob.fnmatch.fnmatch(file, '*/fields/*.yml'):
integration_name = Path(file).parent.parent.name
final_integration_schemas[package][version].setdefault(integration_name, {})
file_data = zip_ref.read(file)
schema_fields = yaml.safe_load(file_data)

# Parse the schema and add to the integration_manifests
data = flatten_ecs_schema(schema_fields)
flat_data = {field['name']: field['type'] for field in data}

final_integration_schemas[package][version][integration_name].update(flat_data)

del file_data

# Write the final integration schemas to disk
with gzip.open(SCHEMA_FILE_PATH, "w") as schema_file:
schema_file_bytes = json.dumps(final_integration_schemas).encode("utf-8")
schema_file.write(schema_file_bytes)

print(f"final integrations schemas dumped: {SCHEMA_FILE_PATH}")
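
# The resulting integration-schemas.json.gz maps package -> version -> integration
# -> {flattened field name: type}; an illustrative (assumed) entry:
# {"aws": {"1.17.1": {"cloudtrail": {"aws.cloudtrail.event_name": "keyword"}}}}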


def find_least_compatible_version(package: str, integration: str,
current_stack_version: str, packages_manifest: dict) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
@@ -89,12 +160,54 @@ def find_least_compatible_version(package: str, integration: str,
raise ValueError(f"no compatible version for integration {package}:{integration}")


def find_latest_compatible_version(package: str, integration: str,
rule_stack_version: str, packages_manifest: dict) -> Union[None, Tuple[str, str]]:
"""Finds the latest compatible version for the specified integration based on the stack version supplied."""

if not package:
raise ValueError("Package must be specified")

package_manifest = packages_manifest.get(package)
if package_manifest is None:
raise ValueError(f"Package {package} not found in manifest.")

# Converts the dict keys (version numbers) to Version objects for proper sorting (descending)
integration_manifests = sorted(package_manifest.items(), key=lambda x: Version(str(x[0])), reverse=True)
notice = ""

for version, manifest in integration_manifests:
kibana_conditions = manifest.get("conditions", {}).get("kibana", {})
version_requirement = kibana_conditions.get("version")
if not version_requirement:
raise ValueError(f"Manifest for {package}:{integration} version {version} is missing conditions.")

compatible_versions = re.sub(r"\>|\<|\=|\^", "", version_requirement).split(" || ")

if not compatible_versions:
raise ValueError(f"Manifest for {package}:{integration} version {version} is missing compatible versions")

highest_compatible_version = max(compatible_versions, key=lambda x: Version(x))

if Version(highest_compatible_version) > Version(rule_stack_version):
# generate notice message that a later integration version is available
integration = f" {integration.strip()}" if integration else ""

notice = (f"There is a new integration {package}{integration} version {version} available!",
f"Update the rule min_stack version from {rule_stack_version} to "
f"{highest_compatible_version} if using new features in this latest version.")

elif int(highest_compatible_version[0]) == int(rule_stack_version[0]):
return version, notice

raise ValueError(f"no compatible version for integration {package}:{integration}")
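
# A sketch of the constraint parsing above, assuming a kibana version requirement
# of "^7.16.0 || ^8.0.0" (operators are stripped, the highest listed version wins;
# note the major-version check compares only the first character of each version
# string, which assumes single-digit majors):
# >>> re.sub(r"\>|\<|\=|\^", "", "^7.16.0 || ^8.0.0").split(" || ")
# ['7.16.0', '8.0.0']
# >>> max(['7.16.0', '8.0.0'], key=lambda x: Version(x))
# '8.0.0'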


def get_integration_manifests(integration: str) -> list:
"""Iterates over specified integrations from package-storage and combines manifests per version."""
epr_search_url = "https://epr.elastic.co/search"

# link for search parameters - https://github.com/elastic/package-registry
epr_search_parameters = {"package": f"{integration}", "prerelease": "true",
epr_search_parameters = {"package": f"{integration}", "prerelease": "false",
"all": "true", "include_policy_templates": "true"}
epr_search_response = requests.get(epr_search_url, params=epr_search_parameters)
epr_search_response.raise_for_status()
@@ -106,3 +219,63 @@ def get_integration_manifests(integration: str) -> list:
print(f"loaded {integration} manifests from the following package versions: "
f"{[manifest['version'] for manifest in manifests]}")
return manifests


def get_integration_schema_data(data, meta, package_integrations: dict) -> Generator[dict, None, None]:
"""Iterates over specified integrations from package-storage and combines schemas per version."""

# lazy import to avoid circular import
from .rule import ( # pylint: disable=import-outside-toplevel
QueryRuleData, RuleMeta
)

data: QueryRuleData = data
meta: RuleMeta = meta

packages_manifest = load_integrations_manifests()
integrations_schemas = load_integrations_schemas()

# validate the query against related integration fields
if isinstance(data, QueryRuleData) and data.language != 'lucene' and meta.maturity == "production":
Reviewer comment (Contributor): Nit: Maybe data.language should use data.get()?

Reviewer comment (Contributor): Then again, this case (where data.language is invalid/not set) might have already been caught in schema validation.

# flag to only warn once per integration for available upgrades
notify_update_available = True

for stack_version, mapping in meta.get_validation_stack_versions().items():
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']

ecs_schema = ecs.flatten_multi_fields(ecs.get_schema(ecs_version, name='ecs_flat'))

for pk_int in package_integrations:
package = pk_int["package"]
integration = pk_int["integration"]

package_version, notice = find_latest_compatible_version(package=package,
integration=integration,
rule_stack_version=meta.min_stack_version,
packages_manifest=packages_manifest)

if notify_update_available and notice and data.get("notify", False):
# Notify for now, as to not lock rule stacks to integrations
notify_update_available = False
print(f"\n{data.get('name')}")
print(*notice)

schema = {}
if integration is None:
# Use all fields from each dataset
for dataset in integrations_schemas[package][package_version]:
schema.update(integrations_schemas[package][package_version][dataset])
else:
if integration not in integrations_schemas[package][package_version]:
raise ValueError(f"Integration {integration} not found in package {package} "
f"version {package_version}")
schema = integrations_schemas[package][package_version][integration]
schema.update(ecs_schema)
integration_schema = {k: kql.parser.elasticsearch_type_family(v) for k, v in schema.items()}

data = {"schema": integration_schema, "package": package, "integration": integration,
"stack_version": stack_version, "ecs_version": ecs_version,
"package_version": package_version, "endgame_version": endgame_version}
yield data
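
A minimal sketch of consuming this generator during validation; the loop body and query_fields are assumptions for illustration, not the PR's validator code:

for item in get_integration_schema_data(data, meta, package_integrations):
    for field in query_fields:  # query_fields: assumed list of fields parsed from the rule query
        if field not in item["schema"]:
            raise ValueError(f"Unknown field {field} in {item['package']} "
                             f"version {item['package_version']}")
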
16 changes: 11 additions & 5 deletions detection_rules/rule.py
@@ -240,6 +240,10 @@ def get_restricted_fields(self) -> Optional[Dict[str, tuple]]:
def data_validator(self) -> Optional['DataValidator']:
return DataValidator(is_elastic_rule=self.is_elastic_rule, **self.to_dict())

@cached_property
def notify(self) -> bool:
return os.environ.get('DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE') is not None
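# Opt-in via the environment; shell example (illustrative):
#   export DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE=1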

@cached_property
def parsed_note(self) -> Optional[MarkoDocument]:
dv = self.data_validator
@@ -847,7 +851,7 @@ def _add_related_integrations(self, obj: dict) -> None:

if self.check_restricted_field_version(field_name):
if isinstance(self.data, QueryRuleData) and self.data.language != 'lucene':
package_integrations = self._get_packaged_integrations(packages_manifest)
package_integrations = self.get_packaged_integrations(self.data, self.metadata, packages_manifest)

if not package_integrations:
return
@@ -947,11 +951,13 @@ def compare_field_versions(min_stack: Version, max_stack: Version) -> bool:
max_stack = max_stack or current_version
return Version(min_stack) <= current_version >= Version(max_stack)

def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[dict]]:
@classmethod
def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta,
package_manifest: dict) -> Optional[List[dict]]:
packaged_integrations = []
datasets = set()

for node in self.data.get('ast', []):
for node in data.get('ast', []):
if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset':
datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal)))
elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset':
@@ -960,10 +966,10 @@ def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[dict]]:
if not datasets:
# windows and endpoint integration do not have event.dataset fields in queries
# integration is None to remove duplicate references upstream in Kibana
rule_integrations = self.metadata.get("integration", [])
rule_integrations = meta.get("integration", [])
if rule_integrations:
for integration in rule_integrations:
if integration in ["windows", "endpoint", "apm"]:
if integration in definitions.NON_DATASET_PACKAGES:
packaged_integrations.append({"package": integration, "integration": None})

for value in sorted(datasets):