Skip to content

Commit

Permalink
Merge pull request ComplianceAsCode#11194 from yuumasato/jqfilters_on…
Browse files Browse the repository at this point in the history
…_add_rule_script

Rename `add_platform_rule.py` to `add_kubernetes_rule.py` and add support for `jq`
  • Loading branch information
rhmdnd authored Oct 30, 2023
2 parents f5af13c + 09d23a6 commit bd33b07
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 106 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ release_tools/*.log
release_tools/release_notes.txt
release_tools/artifacts

# Ignore the test profile that utils/add_platform_rule.py creates
# Ignore the test profile that utils/add_kubernetes_rule.py creates
ocp4/profiles/test.profile

# Ignore the build profiling files
Expand Down
13 changes: 8 additions & 5 deletions docs/manual/developer/06_contributing_with_content.md
Original file line number Diff line number Diff line change
Expand Up @@ -640,19 +640,22 @@ fixes with the following commands:
This utility requires an up-to-date JSON tree created by
`rule_dir_json.py`.

#### `utils/add_platform_rule.py`
#### `utils/add_kubernetes_rule.py`

This utility can be used to bootstrap and test Kubernetes/OpenShift
application checks. See the help output for more detailed usage examples
of each of the supported subcommands:

- `utils/add_platform_rule.py create --rule=<rule_name> <options>` -
creates files for a new rule.
- `utils/add_kubernetes_rule.py create platform --rule=<rule_name> <options>` -
creates files for a new platform rule.

- `utils/add_platform_rule.py test --rule=<rule_name> <options>` -
- `utils/add_kubernetes_rule.py create node --rule=<rule_name> <options>` -
creates files for a new node rule.

- `utils/add_kubernetes_rule.py test --rule=<rule_name> <options>` -
tests a rule against local files using an oscap container.

- `utils/add_platform_rule.py cluster-test --rule=<rule_name> <options>`
- `utils/add_kubernetes_rule.py cluster-test --rule=<rule_name> <options>`
- tests a rule against a running OCP4 cluster using
compliance-operator.

Expand Down
204 changes: 104 additions & 100 deletions utils/add_platform_rule.py → utils/add_kubernetes_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,25 @@
import sys
import textwrap
import os
import re
import time
import yaml

from ssg.utils import mkdir_p

class JinjaString(str):
pass

def jinja_string_representer(dumper, data):
# yuumasato: The Jinja syntax is not compatible with YAML as curly braces are special characters.
# So we use an exotic token to mark where opening and closing Jinja tags would be, they are substituted
# by actual `{{{` and `}}}` right before being written to file.
# A the moment this is restricted to macro calls, but could be expanded to conditionals as well.
sanitized = re.sub(r"{{{", "JiNjA_OpEn", data)
sanitized = re.sub(r"}}}", "JiNjA_ClOsE", sanitized)
return dumper.represent_scalar(u'tag:yaml.org,2002:str', sanitized, style="|")

yaml.add_representer(JinjaString, jinja_string_representer)

PROG_DESC = (''' Create and test content files for Kubernetes API checks.
Expand All @@ -27,15 +42,15 @@
Example workflow:
$ utils/add_platform_rule.py create --rule=ocp_proxy_has_ca \
$ utils/add_kubernetes_rule.py create --rule=ocp_proxy_has_ca \
--type="proxies.config" --name="cluster" \
--yamlpath=".spec.trustedCA.name" --match="[a-zA-Z0-9]*"
creating check for "/apis/config.openshift.io/v1/proxies/cluster" with yamlpath ".spec.trustedCA.name" satisfying match of "[a-zA-Z0-9]*"
wrote applications/openshift/ocp_proxy_has_ca/rule.yml
$ mkdir -p /tmp/apis/config.openshift.io/v1/proxies/
$ oc get proxies.config/cluster -o yaml > /tmp/apis/config.openshift.io/v1/proxies/cluster
$ utils/add_platform_rule.py test --rule=ocp_proxy_has_ca
$ utils/add_kubernetes_rule.py test --rule=ocp_proxy_has_ca
testing rule ocp_proxy_has_ca locally
Title
None
Expand All @@ -46,7 +61,7 @@
Result
pass
$ utils/add_platform_rule.py cluster-test --rule=ocp_proxy_has_ca
$ utils/add_kubernetes_rule.py cluster-test --rule=ocp_proxy_has_ca
testing rule ocp_proxy_has_ca in-cluster
deploying compliance-operator
pushing image build to cluster
Expand All @@ -72,83 +87,24 @@
version: 4.6.0-0.ci-2020-06-15-112708
''')

PLATFORM_RULE_BASE = ('''prodtype: ocp4
title: {TITLE}
description: {DESC}
rationale: TBD
identifiers: {{}}
severity: {SEV}
warnings:
- general: |-
{{{{{{ openshift_cluster_setting("{URL}") | indent(4) }}}}}}
''')

YAML_TEMPLATE = ('''template:
name: yamlfile_value
vars:
ocp_data: "true"{ENTITY_CHECK}{CHECK_EXISTENCE}
filepath: {URL}
yamlpath: "{YAMLPATH}"
values:
- value: "{MATCH}"{CHECK_TYPE}
''')

YAML_TEMPLATE_W_VARIABLE = ('''template:
name: yamlfile_value
vars:
ocp_data: "true"{ENTITY_CHECK}{CHECK_EXISTENCE}
filepath: {URL}
yamlpath: "{YAMLPATH}"
xccdf_variable: {VARIABLE}
''')

NODE_RULE_BASE = ('''prodtype: ocp4
title: {TITLE}
platform: ocp4-node
description: {DESC}
rationale: TBD
identifiers: {{}}
severity: {SEV}
template:
name: {TEMPLATE_NAME}
vars:
{TEMPLATE_VARS}
''')


def operation_value(value):
def set_operation_value(value, template_vars):
if value:
return '\n operation: "pattern match"\n type: "string"'
else:
return ''
template_vars['operation'] = 'pattern match'
template_vars['type'] = 'string'


def entity_value(value):
def set_entity_value(value, template_vars):
if value is not None:
return '\n entity_check: "%s"' % value
else:
return ''
template_vars['entity_check'] = value

def check_existence_value(value):
def set_check_existence_value(value, template_vars):
if value is not None:
return '\n check_existence: "%s"' % value
else:
return ''
template_vars['check_existence'] = value

def set_template_vars(value, template_vars):
for var in value.split(","):
key, value = var.strip().split(":")
template_vars[key.strip()] = value.strip()

PROFILE_TEMPLATE = ('''documentation_complete: true
Expand Down Expand Up @@ -211,6 +167,40 @@ def which(program):
return None


def create_base_rule(args, url=None, node_rule=False):
rule_yaml = dict()
rule_yaml['documentation_complete'] = True
rule_yaml['prodtype'] = 'ocp4'
rule_yaml['title'] = args.title
if node_rule:
rule_yaml['platform'] = 'ocp4-node'
rule_yaml['description'] = args.description
rule_yaml['rationale'] = 'TBD'
rule_yaml['identifiers'] = dict()
rule_yaml['severity'] = args.severity
if args.jqfilter:
rule_yaml['warnings'] = [{'general': JinjaString("{{{ openshift_filtered_cluster_setting({'%s': '%s'}) | indent(4) }}}" % (url, args.jqfilter))}]
elif url:
rule_yaml['warnings'] = [{'general': JinjaString('{{{ openshift_cluster_setting("%s") | indent(4) }}}' % (url))}]
rule_yaml['template'] = dict()

return rule_yaml


def save_rule(rule_yaml_path, rule_yaml):
with open(rule_yaml_path, 'w') as f:
yaml_contents = yaml.dump(rule_yaml, None, indent=2, sort_keys=False, canonical=False, default_flow_style=False, width=120)
# Adds a blank line between keys
formatted_yaml_contents = re.sub(r"\n(\w+:.*)", r"\n\n\1", yaml_contents)

# Replace placeholders for CaC/content Jinja2 expressions
formatted_yaml_contents = re.sub(r"JiNjA_OpEn", r"{{{", formatted_yaml_contents)
formatted_yaml_contents = re.sub(r"JiNjA_ClOsE", r"}}}", formatted_yaml_contents)

f.write(formatted_yaml_contents)
print('* Wrote ' + rule_yaml_path)


def createNodeRuleFunc(args):
group_path = os.path.join(OCP_RULE_DIR, args.group)
if args.group:
Expand All @@ -223,15 +213,16 @@ def createNodeRuleFunc(args):
rule_yaml_path = os.path.join(rule_path, 'rule.yml')

mkdir_p(rule_path)
with open(rule_yaml_path, 'w') as f:
var_list = [ f" {k.strip()}" for k in args.template_vars.split(",")]
processed_template_vars = "\n".join(var_list)
f.write(NODE_RULE_BASE.format(TITLE=args.title, SEV=args.severity, IDENT=args.identifiers,
DESC=args.description, TEMPLATE_NAME=args.template,
TEMPLATE_VARS=processed_template_vars
))

print('* Wrote ' + rule_yaml_path)
rule_yaml = create_base_rule(args)

template = rule_yaml['template']
template['name'] = args.template

template['vars'] = dict()
template_vars = set_template_vars(args.template_vars, template['vars'])

save_rule(rule_yaml_path, rule_yaml)
return 0


Expand Down Expand Up @@ -295,24 +286,35 @@ def createPlatformRuleFunc(args):
rule_yaml_path = os.path.join(rule_path, 'rule.yml')

mkdir_p(rule_path)
with open(rule_yaml_path, 'w') as f:
f.write(PLATFORM_RULE_BASE.format(URL=url, TITLE=args.title, SEV=args.severity, IDENT=args.identifiers,
DESC=args.description, YAMLPATH=args.yamlpath,
))
if args.match:
f.write(YAML_TEMPLATE.format(URL=url, YAMLPATH=args.yamlpath, MATCH=args.match,
NEGATE=str(args.negate).lower(),
CHECK_TYPE=operation_value(args.regex),
CHECK_EXISTENCE=check_existence_value(args.check_existence),
ENTITY_CHECK=entity_value(args.match_entity)))
else:
f.write(YAML_TEMPLATE_W_VARIABLE.format(URL=url, YAMLPATH=args.yamlpath,
VARIABLE=(args.variable),
NEGATE=str(args.negate).lower(),
CHECK_TYPE=operation_value(args.regex),
CHECK_EXISTENCE=check_existence_value(args.check_existence),
ENTITY_CHECK=entity_value(args.match_entity)))
print('* Wrote ' + rule_yaml_path)

rule_yaml = create_base_rule(args, url)

template = rule_yaml['template']
template['name'] = 'yamlfile_value'

template['vars'] = dict()
template_vars = template['vars']
template_vars['ocp_data'] = "true"
if args.jqfilter:
template_vars['filepath'] = JinjaString("{{{ openshift_filtered_path('%s', '%s') }}}" % (url, args.jqfilter))
else:
template_vars['filepath'] = url
template_vars['yamlpath'] = args.yamlpath

set_entity_value(args.match_entity, template_vars)
set_check_existence_value(args.check_existence, template_vars)

if args.match:
value_dict = dict()
value_dict['value'] = args.match
set_operation_value(args.regex, value_dict)

template_vars['values'] = [value_dict]
else:
template_vars['xccdf_variable'] = args.variable

save_rule(rule_yaml_path, rule_yaml)

return 0


Expand Down Expand Up @@ -425,7 +427,7 @@ def testFunc(args):

def main():
parser = argparse.ArgumentParser(
prog="add_platform_rule.py",
prog="add_kubernetes_rule.py",
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent(PROG_DESC))
subparser = parser.add_subparsers(
Expand Down Expand Up @@ -475,6 +477,8 @@ def main():
'--check-existence', help='check_existence` value for the `yamlfilecontent_test`.')
platform_parser.add_argument(
'--negate', default=False, action="store_true", help='negate the given matching criteria (does NOT match). Default is false.')
platform_parser.add_argument(
'--jqfilter', default="", help='A JQ filter to select the data passed down for OVAL evaluation.')
platform_parser.set_defaults(func=createPlatformRuleFunc)

node_parser = type_parser.add_parser('node', help='Creates a Node rule', parents=[common_rule_args])
Expand Down

0 comments on commit bd33b07

Please sign in to comment.