[FR] Add Integration Schema Query Validation (#2470)
(cherry picked from commit 1784429)
Mikaayenson authored and github-actions[bot] committed Feb 2, 2023
1 parent a604c81 commit b15d8a2
Showing 54 changed files with 559 additions and 166 deletions.
12 changes: 12 additions & 0 deletions detection_rules/beats.py
@@ -117,6 +117,14 @@ def _flatten_schema(schema: list, prefix="") -> list:
# it's probably not perfect, but we can fix other bugs as we run into them later
if len(schema) == 1 and nested_prefix.startswith(prefix + prefix):
nested_prefix = s["name"] + "."
if "field" in s:
# integrations sometimes have a group with a single field
flattened.extend(_flatten_schema(s["field"], prefix=nested_prefix))
continue
elif "fields" not in s:
# integrations sometimes have a group with no fields
continue

flattened.extend(_flatten_schema(s["fields"], prefix=nested_prefix))
elif "fields" in s:
flattened.extend(_flatten_schema(s["fields"], prefix=prefix))
Expand All @@ -131,6 +139,10 @@ def _flatten_schema(schema: list, prefix="") -> list:
return flattened


def flatten_ecs_schema(schema: list) -> list:
return _flatten_schema(schema)


def get_field_schema(base_directory, prefix="", include_common=False):
base_directory = base_directory.get("folders", {}).get("_meta", {}).get("files", {})
flattened = []
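The two new branches above handle irregular shapes that appear in integration field files: a group carrying a single `field` entry instead of a `fields` list, and a group with no fields at all, which is now skipped instead of raising. A minimal sketch of the flattening behavior, assuming groups prefix their children's names as elsewhere in this module; the package and field names are illustrative:

```python
from detection_rules.beats import flatten_ecs_schema

# Illustrative field-file content; "aws" and the nested names are made up.
schema = [
    {"name": "aws", "type": "group", "fields": [
        {"name": "cloudtrail.event_name", "type": "keyword"},
        # a group carrying a single "field" entry instead of "fields" (new branch)
        {"name": "s3", "type": "group", "field": [{"name": "bucket", "type": "keyword"}]},
        # a group with neither "field" nor "fields" is now skipped (new branch)
        {"name": "empty_group", "type": "group"},
    ]},
]

for entry in flatten_ecs_schema(schema):
    print(entry["name"], entry["type"])
# expected under these assumptions:
# aws.cloudtrail.event_name keyword
# aws.s3.bucket keyword
```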
47 changes: 43 additions & 4 deletions detection_rules/devtools.py
@@ -33,7 +33,8 @@
from .endgame import EndgameSchemaManager
from .eswrap import CollectEvents, add_range_to_dsl
from .ghwrap import GithubClient, update_gist
from .integrations import build_integrations_manifest
from .integrations import (build_integrations_manifest, build_integrations_schemas, find_latest_compatible_version,
load_integrations_manifests)
from .main import root
from .misc import PYTHON_LICENSE, add_client, client_error
from .packaging import (CURRENT_RELEASE_PATH, PACKAGE_FILE, RELEASE_DIR,
@@ -1174,10 +1175,48 @@ def integrations_group():
def build_integration_manifests(overwrite: bool):
"""Builds consolidated integrations manifests file."""
click.echo("loading rules to determine all integration tags")

def flatten(tag_list: List[str]) -> List[str]:
return list(set([tag for tags in tag_list for tag in (flatten(tags) if isinstance(tags, list) else [tags])]))

rules = RuleCollection.default()
integration_tags = list(set([r.contents.metadata.integration for r in rules if r.contents.metadata.integration]))
click.echo(f"integration tags identified: {integration_tags}")
build_integrations_manifest(overwrite, integration_tags)
integration_tags = [r.contents.metadata.integration for r in rules if r.contents.metadata.integration]
unique_integration_tags = flatten(integration_tags)
click.echo(f"integration tags identified: {unique_integration_tags}")
build_integrations_manifest(overwrite, unique_integration_tags)


@integrations_group.command('build-schemas')
@click.option('--overwrite', '-o', is_flag=True, help="Overwrite the entire integrations-schema.json.gz file")
def build_integration_schemas(overwrite: bool):
"""Builds consolidated integrations schemas file."""
click.echo("Building integration schemas...")

start_time = time.perf_counter()
build_integrations_schemas(overwrite)
end_time = time.perf_counter()
click.echo(f"Time taken to generate schemas: {(end_time - start_time)/60:.2f} minutes")


@integrations_group.command('show-latest-compatible')
@click.option('--package', '-p', help='Name of package')
@click.option('--stack_version', '-s', required=True, help='Rule stack version')
def show_latest_compatible_version(package: str, stack_version: str) -> None:
"""Prints the latest integration compatible version for specified package based on stack version supplied."""

packages_manifest = None
try:
packages_manifest = load_integrations_manifests()
except Exception as e:
click.echo(f"Error loading integrations manifests: {str(e)}")
return

try:
version = find_latest_compatible_version(package, "", stack_version, packages_manifest)
click.echo(f"Compatible integration {version=}")
except Exception as e:
click.echo(f"Error finding compatible version: {str(e)}")
return


@dev_group.group('schemas')
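The nested `flatten` helper replaces the earlier flat `set` comprehension because `metadata.integration` may now be a string or a list of strings; recursion plus a `set` yields a de-duplicated tag list regardless of nesting. A standalone sketch of its behavior (the function body is copied from the command above; the tag values are illustrative):

```python
from typing import List

def flatten(tag_list: List[str]) -> List[str]:
    # recursively flattens nested lists and de-duplicates via set()
    return list(set([tag for tags in tag_list for tag in (flatten(tags) if isinstance(tags, list) else [tags])]))

print(sorted(flatten(["aws", ["azure", "gcp"], ["aws"]])))
# ['aws', 'azure', 'gcp']
```

With the two new subcommands registered, the schemas file would be rebuilt with something like `python -m detection_rules dev integrations build-schemas --overwrite`, and compatibility checked with `python -m detection_rules dev integrations show-latest-compatible -p aws -s 8.3.0` (invocation paths assumed from the existing `integrations` dev group; `aws` and `8.3.0` are example values).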
Binary file modified detection_rules/etc/integration-manifests.json.gz
Binary file not shown.
Binary file added detection_rules/etc/integration-schemas.json.gz
Binary file not shown.
183 changes: 178 additions & 5 deletions detection_rules/integrations.py
@@ -4,20 +4,27 @@
# 2.0.

"""Functions to support and interact with Kibana integrations."""
import glob
import gzip
import json
import os
import re
from collections import OrderedDict
from pathlib import Path
from typing import Generator, Tuple, Union

import requests
import yaml
from marshmallow import EXCLUDE, Schema, fields, post_load

import kql

from . import ecs
from .beats import flatten_ecs_schema
from .semver import Version
from .utils import cached, get_etc_path, read_gzip
from .utils import cached, get_etc_path, read_gzip, unzip

MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz'))


@cached
Expand All @@ -26,11 +33,18 @@ def load_integrations_manifests() -> dict:
return json.loads(read_gzip(get_etc_path('integration-manifests.json.gz')))


@cached
def load_integrations_schemas() -> dict:
"""Load the consolidated integrations schemas."""
return json.loads(read_gzip(get_etc_path('integration-schemas.json.gz')))


class IntegrationManifestSchema(Schema):
name = fields.Str(required=True)
version = fields.Str(required=True)
release = fields.Str(required=True)
description = fields.Str(required=True)
download = fields.Str(required=True)
conditions = fields.Dict(required=True)
policy_templates = fields.List(fields.Dict, required=True)
owner = fields.Dict(required=False)
Expand All @@ -44,8 +58,8 @@ def transform_policy_template(self, data, **kwargs):
def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
"""Builds a new local copy of manifest.yaml from integrations Github."""
if overwrite:
if os.path.exists(MANIFEST_FILE_PATH):
os.remove(MANIFEST_FILE_PATH)
if MANIFEST_FILE_PATH.exists():
MANIFEST_FILE_PATH.unlink()

final_integration_manifests = {integration: {} for integration in rule_integrations}

Expand All @@ -62,6 +76,63 @@ def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> Non
print(f"final integrations manifests dumped: {MANIFEST_FILE_PATH}")


def build_integrations_schemas(overwrite: bool) -> None:
"""Builds a new local copy of integration-schemas.json.gz from EPR integrations."""

final_integration_schemas = {}
saved_integration_schemas = {}

# Check if the file already exists and handle accordingly
if overwrite and SCHEMA_FILE_PATH.exists():
SCHEMA_FILE_PATH.unlink()
elif SCHEMA_FILE_PATH.exists():
saved_integration_schemas = load_integrations_schemas()

# Load the integration manifests
integration_manifests = load_integrations_manifests()

# Loop through the packages and versions
for package, versions in integration_manifests.items():
print(f"processing {package}")
final_integration_schemas.setdefault(package, {})
for version, manifest in versions.items():
if package in saved_integration_schemas and version in saved_integration_schemas[package]:
continue

# Download the zip file
download_url = f"https://epr.elastic.co{manifest['download']}"
response = requests.get(download_url)
response.raise_for_status()

# Update the final integration schemas
final_integration_schemas[package].update({version: {}})

# Open the zip file
with unzip(response.content) as zip_ref:
for file in zip_ref.namelist():
# Check if the file is a match
if glob.fnmatch.fnmatch(file, '*/fields/*.yml'):
integration_name = Path(file).parent.parent.name
final_integration_schemas[package][version].setdefault(integration_name, {})
file_data = zip_ref.read(file)
schema_fields = yaml.safe_load(file_data)

# Parse the schema and add to the integration_manifests
data = flatten_ecs_schema(schema_fields)
flat_data = {field['name']: field['type'] for field in data}

final_integration_schemas[package][version][integration_name].update(flat_data)

del file_data

# Write the final integration schemas to disk
with gzip.open(SCHEMA_FILE_PATH, "w") as schema_file:
schema_file_bytes = json.dumps(final_integration_schemas).encode("utf-8")
schema_file.write(schema_file_bytes)

print(f"final integrations manifests dumped: {SCHEMA_FILE_PATH}")


def find_least_compatible_version(package: str, integration: str,
current_stack_version: str, packages_manifest: dict) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
@@ -89,12 +160,54 @@ def find_least_compatible_version(package: str, integration: str,
raise ValueError(f"no compatible version for integration {package}:{integration}")


def find_latest_compatible_version(package: str, integration: str,
rule_stack_version: str, packages_manifest: dict) -> Union[None, Tuple[str, str]]:
"""Finds least compatible version for specified integration based on stack version supplied."""

if not package:
raise ValueError("Package must be specified")

package_manifest = packages_manifest.get(package)
if package_manifest is None:
raise ValueError(f"Package {package} not found in manifest.")

# Converts the dict keys (version numbers) to Version objects for proper sorting (descending)
integration_manifests = sorted(package_manifest.items(), key=lambda x: Version(str(x[0])), reverse=True)
notice = ""

for version, manifest in integration_manifests:
kibana_conditions = manifest.get("conditions", {}).get("kibana", {})
version_requirement = kibana_conditions.get("version")
if not version_requirement:
raise ValueError(f"Manifest for {package}:{integration} version {version} is missing conditions.")

compatible_versions = re.sub(r"\>|\<|\=|\^", "", version_requirement).split(" || ")

if not compatible_versions:
raise ValueError(f"Manifest for {package}:{integration} version {version} is missing compatible versions")

highest_compatible_version = max(compatible_versions, key=lambda x: Version(x))

if Version(highest_compatible_version) > Version(rule_stack_version):
# generate notice message that a later integration version is available
integration = f" {integration.strip()}" if integration else ""

notice = (f"There is a new integration {package}{integration} version {version} available!",
f"Update the rule min_stack version from {rule_stack_version} to "
f"{highest_compatible_version} if using new features in this latest version.")

elif int(highest_compatible_version[0]) == int(rule_stack_version[0]):
return version, notice

raise ValueError(f"no compatible version for integration {package}:{integration}")


def get_integration_manifests(integration: str) -> list:
"""Iterates over specified integrations from package-storage and combines manifests per version."""
epr_search_url = "https://epr.elastic.co/search"

# link for search parameters - https://github.com/elastic/package-registry
epr_search_parameters = {"package": f"{integration}", "prerelease": "true",
epr_search_parameters = {"package": f"{integration}", "prerelease": "false",
"all": "true", "include_policy_templates": "true"}
epr_search_response = requests.get(epr_search_url, params=epr_search_parameters)
epr_search_response.raise_for_status()
Expand All @@ -106,3 +219,63 @@ def get_integration_manifests(integration: str) -> list:
print(f"loaded {integration} manifests from the following package versions: "
f"{[manifest['version'] for manifest in manifests]}")
return manifests
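The same search endpoint can be exercised directly; note the diff also flips `prerelease` to `"false"`, so prerelease package builds are now excluded. A standalone sketch mirroring the request above, with `aws` as an example package:

```python
import requests

# Illustrative query matching get_integration_manifests' parameters.
response = requests.get(
    "https://epr.elastic.co/search",
    params={"package": "aws", "prerelease": "false", "all": "true",
            "include_policy_templates": "true"},
)
response.raise_for_status()
print([manifest["version"] for manifest in response.json()][:3])
```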


def get_integration_schema_data(data, meta, package_integrations: list) -> Generator[dict, None, None]:
"""Iterates over specified integrations from package-storage and combines schemas per version."""

# lazy import to avoid circular import
from .rule import ( # pylint: disable=import-outside-toplevel
QueryRuleData, RuleMeta
)

data: QueryRuleData = data
meta: RuleMeta = meta

packages_manifest = load_integrations_manifests()
integrations_schemas = load_integrations_schemas()

# validate the query against related integration fields
if isinstance(data, QueryRuleData) and data.language != 'lucene' and meta.maturity == "production":

# flag to only warn once per integration for available upgrades
notify_update_available = True

for stack_version, mapping in meta.get_validation_stack_versions().items():
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']

ecs_schema = ecs.flatten_multi_fields(ecs.get_schema(ecs_version, name='ecs_flat'))

for pk_int in package_integrations:
package = pk_int["package"]
integration = pk_int["integration"]

package_version, notice = find_latest_compatible_version(package=package,
integration=integration,
rule_stack_version=meta.min_stack_version,
packages_manifest=packages_manifest)

if notify_update_available and notice and data.get("notify", False):
# Notify for now, as to not lock rule stacks to integrations
notify_update_available = False
print(f"\n{data.get('name')}")
print(*notice)

schema = {}
if integration is None:
# Use all fields from each dataset
for dataset in integrations_schemas[package][package_version]:
schema.update(integrations_schemas[package][package_version][dataset])
else:
if integration not in integrations_schemas[package][package_version]:
raise ValueError(f"Integration {integration} not found in package {package} "
f"version {package_version}")
schema = integrations_schemas[package][package_version][integration]
schema.update(ecs_schema)
integration_schema = {k: kql.parser.elasticsearch_type_family(v) for k, v in schema.items()}

data = {"schema": integration_schema, "package": package, "integration": integration,
"stack_version": stack_version, "ecs_version": ecs_version,
"package_version": package_version, "endgame_version": endgame_version}
yield data
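A hedged sketch of how a caller might drive this generator for one rule; `RuleCollection` and the attribute paths are taken from this package, while the package/integration pair is illustrative and the rule is assumed to be a production, non-Lucene query rule (otherwise nothing is yielded):

```python
from detection_rules.integrations import get_integration_schema_data
from detection_rules.rule_loader import RuleCollection

rule = RuleCollection.default().rules[0]  # assumes the default collection is non-empty
package_integrations = [{"package": "aws", "integration": "cloudtrail"}]  # illustrative

for item in get_integration_schema_data(rule.contents.data, rule.contents.metadata,
                                        package_integrations):
    print(item["package"], item["package_version"], item["stack_version"],
          len(item["schema"]), "fields")
```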
16 changes: 11 additions & 5 deletions detection_rules/rule.py
@@ -240,6 +240,10 @@ def get_restricted_fields(self) -> Optional[Dict[str, tuple]]:
def data_validator(self) -> Optional['DataValidator']:
return DataValidator(is_elastic_rule=self.is_elastic_rule, **self.to_dict())

@cached_property
def notify(self) -> bool:
return os.environ.get('DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE') is not None

@cached_property
def parsed_note(self) -> Optional[MarkoDocument]:
dv = self.data_validator
@@ -847,7 +851,7 @@ def _add_related_integrations(self, obj: dict) -> None:

if self.check_restricted_field_version(field_name):
if isinstance(self.data, QueryRuleData) and self.data.language != 'lucene':
package_integrations = self._get_packaged_integrations(packages_manifest)
package_integrations = self.get_packaged_integrations(self.data, self.metadata, packages_manifest)

if not package_integrations:
return
@@ -947,11 +951,13 @@ def compare_field_versions(min_stack: Version, max_stack: Version) -> bool:
max_stack = max_stack or current_version
return Version(min_stack) <= current_version >= Version(max_stack)

def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[dict]]:
@classmethod
def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta,
package_manifest: dict) -> Optional[List[dict]]:
packaged_integrations = []
datasets = set()

for node in self.data.get('ast', []):
for node in data.get('ast', []):
if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset':
datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal)))
elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset':
Expand All @@ -960,10 +966,10 @@ def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[di
if not datasets:
# windows and endpoint integration do not have event.dataset fields in queries
# integration is None to remove duplicate references upstream in Kibana
rule_integrations = self.metadata.get("integration", [])
rule_integrations = meta.get("integration", [])
if rule_integrations:
for integration in rule_integrations:
if integration in ["windows", "endpoint", "apm"]:
if integration in definitions.NON_DATASET_PACKAGES:
packaged_integrations.append({"package": integration, "integration": None})

for value in sorted(datasets):
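Two details worth noting in this hunk: the new `notify` property gates the upgrade notices behind the `DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE` environment variable (set it to any value to opt in), and the tail of `get_packaged_integrations` is truncated above. A sketch of how the sorted `event.dataset` values presumably map to package/integration pairs, assuming dataset names split on the first dot as in `aws.cloudtrail`:

```python
datasets = {"aws.cloudtrail", "okta.system"}  # illustrative values pulled from a query's AST

packaged_integrations = []
for value in sorted(datasets):
    package, _, integration = value.partition(".")  # assumed split; not shown in the diff
    packaged_integrations.append({"package": package, "integration": integration})

print(packaged_integrations)
# [{'package': 'aws', 'integration': 'cloudtrail'}, {'package': 'okta', 'integration': 'system'}]
```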