Skip to content

Commit

Permalink
resolve case insensitive regex opencybersecurityalliance#1569
Browse files Browse the repository at this point in the history
  • Loading branch information
subbyte committed Sep 5, 2023
1 parent 6e5feb0 commit 44cc61a
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def _format_match(value) -> str:
value = re.sub(r"([^\\])\\S", r"\1[^ \t\n\r\f\v]", value)
value = re.sub(r"([^\\])\\w", r"\1[a-zA-Z0-9_]", value)
value = re.sub(r"([^\\])\\W", r"\1[^a-zA-Z0-9_]", value)
value = _unfold_case_insensitive_regex(value)
return '/{}/'.format(value)

@staticmethod
Expand Down Expand Up @@ -304,6 +305,51 @@ def _format_translated_queries(query_array):
return formatted_queries


def _unfold_case_insensitive_regex(regex_pattern):
    """Expand an inline ``(?i)`` flag into explicit ``[xX]`` character classes.

    Patterns without ``(?i)`` are returned unchanged.  Otherwise every
    ``(?i)`` marker found outside a character class is removed, and the text
    that follows the first marker (again, outside character classes) is passed
    through ``_unfold_case_insensitive_char``.  Text before the first marker
    and the contents of ``[...]`` classes are copied through as they are.

    This function should be executed after \\s unfolding.

    Raises:
        RuntimeError: if the number of unescaped ``[`` differs from the number
            of unescaped ``]``.
    """
    flag = "(?i)"
    if flag not in regex_pattern:
        return regex_pattern

    # Stand-ins without letters or brackets, so that neither the bracket
    # splitting nor the letter unfolding below can touch them.
    backslash_mask = "==##==###%###==##=="
    open_bracket_mask = "===%####===%####"
    close_bracket_mask = "====%###====%###"
    segment_separator = "==%%##%%=="

    # Escaped backslashes are hidden first so that "\\[" is read as an
    # escaped backslash followed by a real bracket, not as "\" + "\[".
    masked = (
        regex_pattern
        .replace(r"\\", backslash_mask)
        .replace(r"\[", open_bracket_mask)
        .replace(r"\]", close_bracket_mask)
    )

    if masked.count("[") != masked.count("]"):
        raise RuntimeError(f"regex /{regex_pattern}/ has odd number of brackets.")

    # Splitting on every bracket puts text outside classes at even indexes
    # and the class bodies at odd indexes.
    pieces = re.split(r"[\[\]]", masked)
    class_bodies = pieces[1::2]

    # Join the outside text into a single string so that one (?i) marker
    # affects every later segment, then cut it back into segments.
    outside_text = segment_separator.join(pieces[0::2])
    before_flag, *after_flag = outside_text.split(flag)
    unfolded_text = before_flag + _unfold_case_insensitive_char("".join(after_flag))

    pieces[0::2] = unfolded_text.split(segment_separator)
    pieces[1::2] = [f"[{body}]" for body in class_bodies]

    result = "".join(pieces)
    restorations = (
        (close_bracket_mask, r"\]"),
        (open_bracket_mask, r"\["),
        (backslash_mask, r"\\"),
    )
    for mask, escape_sequence in restorations:
        result = result.replace(mask, escape_sequence)
    return result


def _unfold_case_insensitive_char(ascii_regex_str):
    """Rewrite each cased letter ``x`` of a regex fragment as the class ``[xX]``.

    Args:
        ascii_regex_str: Regex text to unfold; it is processed character by
            character, so it should not contain character classes.

    Returns:
        The fragment with every unescaped letter that has distinct
        single-character lower and upper forms replaced by ``[<lower><upper>]``.
        All other characters, including a backslash and the character it
        escapes, are copied verbatim.
    """
    unfolded = []
    escaped = False
    for char in ascii_regex_str:
        if escaped:
            # This character completes a backslash escape.  Wrapping it would
            # turn e.g. "\d" into "\[dD]" (an escaped "[" followed by "dD]"),
            # so the escape sequence is kept exactly as written.
            unfolded.append(char)
            escaped = False
            continue
        if char == "\\":
            unfolded.append(char)
            escaped = True
            continue

        lower, upper = char.lower(), char.upper()
        # Letters without a distinct one-character case pair (e.g. CJK, or
        # "ß" whose upper form is "SS") gain nothing from a class and would
        # otherwise yield "[xx]" or a class matching unrelated characters.
        has_case_pair = lower != upper and len(lower) == 1 and len(upper) == 1
        if char.isalpha() and has_case_pair:
            unfolded.append(f"[{lower}{upper}]")
        else:
            unfolded.append(char)
    return "".join(unfolded)


def translate_pattern(pattern: Pattern, data_model_mapping, options):
# Added size parameter in transmission module
#result_limit = options['result_limit']
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from stix_shifter.stix_translation import stix_translation
from stix_shifter_utils.utils.error_response import ErrorCode
from stix_shifter_modules.elastic_ecs.stix_translation.query_constructor import (
_unfold_case_insensitive_char,
_unfold_case_insensitive_regex,
)
import unittest
import datetime
import re
Expand Down Expand Up @@ -36,6 +40,28 @@ def _remove_timestamp_from_query(queries):

class TestStixtoQuery(unittest.TestCase, object):

def test_case_insensitive_unfold_char(self):
    """Each letter of a plain fragment becomes ``[xX]``; other characters stay."""
    input_output_pairs = [
        ("a", "[aA]"),
        ("ab", "[aA][bB]"),
        ("ab7#*c((D))", "[aA][bB]7#*[cC](([dD]))"),
    ]
    for fragment, expected in input_output_pairs:
        # subTest reports every failing pair by its input instead of
        # stopping at the first bare-assert failure.
        with self.subTest(fragment=fragment):
            self.assertEqual(expected, _unfold_case_insensitive_char(fragment))

def test_case_insensitive_unfold_regex(self):
    """``(?i)`` is unfolded outside character classes and escapes are kept."""
    iopairs = [
        # No (?i) flag: the pattern is returned unchanged.
        ("http://z[abc]83m li", "http://z[abc]83m li"),
        ("(?i)virus", "[vV][iI][rR][uU][sS]"),
        # Character class contents are left as written.
        ("(?i)virus[ s]", "[vV][iI][rR][uU][sS][ s]"),
        ("(?i)virus[ s] bin [c3b]", "[vV][iI][rR][uU][sS][ s] [bB][iI][nN] [c3b]"),
        # Escaped brackets are literals, so the text between them is unfolded.
        (r"(?i)virus\[ s\]", r"[vV][iI][rR][uU][sS]\[ [sS]\]"),
        # An escaped backslash followed by a bracket still opens a class.
        (r"(?i)virus\\[ s\\]", r"[vV][iI][rR][uU][sS]\\[ s\\]"),
        ("(?i)http://z83m li", "[hH][tT][tT][pP]://[zZ]83[mM] [lL][iI]"),
        ("(?i)http://z[abc]83m li", "[hH][tT][tT][pP]://[zZ][abc]83[mM] [lL][iI]"),
        # Text before the flag is not unfolded.
        ("http://(?i)z[abc]83m li", "http://[zZ][abc]83[mM] [lL][iI]"),
    ]
    for pattern, expected in iopairs:
        # subTest reports every failing pair by its input instead of
        # stopping at the first bare-assert failure.
        with self.subTest(pattern=pattern):
            self.assertEqual(expected, _unfold_case_insensitive_regex(pattern))

def test_ipv4_query(self):
stix_pattern = "[ipv4-addr:value = '192.168.122.83' OR ipv4-addr:value = '192.168.122.84']"
translated_query = translation.translate('elastic_ecs', 'query', '{}', stix_pattern)
Expand Down Expand Up @@ -278,6 +304,13 @@ def test_match_operator_escaped(self):
test_query = ['(process.name : /cmd\\.exe/ OR process.parent.name : /cmd\\.exe/)']
_test_query_assertions(translated_query, test_query)

def test_match_operator_case_sensitive(self):
    """A MATCHES value starting with ``(?i)`` is translated to ``[xX]`` classes.

    Despite the method name, the pattern under test carries the
    case-insensitive flag.
    """
    test_query = ['(process.name : /[vV][iI][rR][uU][sS]\\.[eE][xX][eE]/ OR process.parent.name : /[vV][iI][rR][uU][sS]\\.[eE][xX][eE]/)']
    stix_pattern = r"[process:name MATCHES '(?i)virus\\.exe']"

    translated_query = translation.translate('elastic_ecs', 'query', '{}', stix_pattern)
    # Strip the timestamp part of each query before comparing.
    queries_without_timestamp = _remove_timestamp_from_query(translated_query['queries'])
    translated_query['queries'] = queries_without_timestamp

    _test_query_assertions(translated_query, test_query)

def test_match_operator_with_backslash(self):
# STIX uses backslash as escape, so to match a literal . in RE you need double-backslash
stix_pattern = r"[process:name MATCHES '^cmd\\.exe .*']"
Expand Down

0 comments on commit 44cc61a

Please sign in to comment.