Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: case-insensitive bug for items in brackets for elastic_ecs. #1606

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,15 @@ def _unfold_case_insensitive_regex(regex_pattern):

# xsb: xs inside bracket
xsb = xs[1::2]
xsb[ci_index//2:] = ["[" + _unfold_ci_chars(x, True) + "]" for x in xsb[ci_index//2:]]
xsb[ci_index//2:] = [_unfold_ci_chars_in_bracket(x) for x in xsb[ci_index//2:]]
xs[1::2] = xsb

# xsob: xs outside bracket
xsob = xs[0::2]
xsob_s_h, *xsob_s_t = xsob[ci_index//2].split("(?i)")
xsob_s_t = [_unfold_ci_chars(x, False) for x in xsob_s_t]
xsob_s_t = [_unfold_plaintext_ci_chars(x) for x in xsob_s_t]
xsob[ci_index//2] = "".join([xsob_s_h] + xsob_s_t)
xsob[ci_index//2+1:] = [_unfold_ci_chars(x.replace("(?i)", ""), False) for x in xsob[ci_index//2+1:]]
xsob[ci_index//2+1:] = [_unfold_plaintext_ci_chars(x.replace("(?i)", "")) for x in xsob[ci_index//2+1:]]
xs[0::2] = xsob

p_unfolded = "".join(xs)
Expand All @@ -361,19 +361,47 @@ def _unfold_case_insensitive_regex(regex_pattern):
return regex_pattern


def _unfold_ci_chars(regex_pattern_segment, if_set):
# if_set: if all chars are in the square bracket of regex
def char_mapper(c):
if if_set:
return c.lower() + c.upper()
def _unfold_plaintext_ci_chars(regex_pattern_segment):
return "".join([f"[{x.lower()}{x.upper()}]" if x.isascii() and x.isalpha() else x for x in regex_pattern_segment])


def _unfold_ci_chars_in_bracket(regex_pattern_in_bracket):
# split segments
segs = [""]
# effective i that knows skipped indexes/chars
ie = 0
for i, x in enumerate(regex_pattern_in_bracket):
if i < ie:
continue
if i < len(regex_pattern_in_bracket)-2:
if x.isascii():
ahead1 = regex_pattern_in_bracket[i+1]
ahead2 = regex_pattern_in_bracket[i+2]
if ahead1 == "-" and ahead2.isascii():
segs.append(regex_pattern_in_bracket[i:i+3])
segs.append("")
ie = i+3
else:
segs[-1] = segs[-1] + x
else:
segs[-1] = segs[-1] + x
else:
segs.append(regex_pattern_in_bracket[i:len(regex_pattern_in_bracket)])
break
segs_new = []
for seg in segs:
if len(seg) == 3 and seg[1] == "-" and seg[0].isascii() and seg[0].isalpha() and seg[2].isascii() and seg[2].isalpha():
lower = f"{seg[0].lower()}-{seg[2].lower()}"
if lower not in segs_new:
segs_new.append(lower)
upper = f"{seg[0].upper()}-{seg[2].upper()}"
if upper not in segs_new:
segs_new.append(upper)
else:
return f"[{c.lower()}{c.upper()}]"
xs = list(regex_pattern_segment)
s = "".join([char_mapper(x) if x.isascii() and x.isalpha() else x for x in xs])
if if_set:
# dedup for items inside square bracket
s = "".join(sorted(set(s)))
return s
new = "".join([x.lower()+x.upper() if x.isascii() and x.isalpha() else x for x in seg])
if new not in segs_new:
segs_new.append(new)
return "[" + "".join(segs_new) + "]"


def translate_pattern(pattern: Pattern, data_model_mapping, options):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from stix_shifter.stix_translation import stix_translation
from stix_shifter_utils.utils.error_response import ErrorCode
from stix_shifter_modules.elastic_ecs.stix_translation.query_constructor import (
_unfold_ci_chars,
_unfold_plaintext_ci_chars,
_unfold_ci_chars_in_bracket,
_unfold_case_insensitive_regex,
)
import unittest
Expand Down Expand Up @@ -41,26 +42,35 @@ def _remove_timestamp_from_query(queries):
class TestStixtoQuery(unittest.TestCase, object):

def test_case_insensitive_unfold_chars(self):
    """Check letter unfolding for regex text outside square brackets.

    Each pair is (input segment, expected unfolded segment) for
    ``_unfold_plaintext_ci_chars``.
    """
    input_output_pairs = [
        ("a", "[aA]"),
        ("ab", "[aA][bB]"),
        ("ab7#*c((D))", "[aA][bB]7#*[cC](([dD]))"),
    ]
    for segment, expected in input_output_pairs:
        # subTest reports every failing pair instead of stopping at the first.
        with self.subTest(segment=segment):
            self.assertEqual(expected, _unfold_plaintext_ci_chars(segment))

def test_unfold_ci_chars_in_bracket(self):
    """Check case unfolding of character-class content.

    Each pair is (text inside the brackets, expected full class) for
    ``_unfold_ci_chars_in_bracket``; the cases cover plain letters, ranges,
    a leading or trailing literal ``-`` and already-duplicated ranges.
    """
    iopairs = [
        ("abD", "[aAbBdD]"),
        ("a-z0-9", "[a-zA-Z0-9]"),
        ("-ef-z", "[-eEf-zF-Z]"),
        ("ab-", "[aAbB-]"),
        ("a-zA-Z0-9", "[a-zA-Z0-9]"),
    ]
    for content, expected in iopairs:
        # subTest reports every failing pair instead of stopping at the first.
        with self.subTest(content=content):
            self.assertEqual(expected, _unfold_ci_chars_in_bracket(content))

def test_case_insensitive_unfold_regex(self):
    """Check end-to-end unfolding of ``(?i)`` in a full regex pattern.

    Each pair is (input pattern, expected pattern) for
    ``_unfold_case_insensitive_regex``. Letters inside brackets are expected
    in their original order, each followed by its other case, and ranges are
    expected to stay intact.
    """
    iopairs = [
        ("http://z[abc]83m li", "http://z[abc]83m li"),
        ("(?i)virus", "[vV][iI][rR][uU][sS]"),
        ("(?i)virus[ s]", "[vV][iI][rR][uU][sS][ sS]"),
        ("(?i)virus[ s] bin [c3b]", "[vV][iI][rR][uU][sS][ sS] [bB][iI][nN] [cC3bB]"),
        (r"(?i)virus\[ s\]", r"[vV][iI][rR][uU][sS]\[ [sS]\]"),
        (r"(?i)virus\\[ s\\]", r"[vV][iI][rR][uU][sS]\\[ sS\\]"),
        ("(?i)http://z83m li", "[hH][tT][tT][pP]://[zZ]83[mM] [lL][iI]"),
        ("(?i)http://z[abc]83m li", "[hH][tT][tT][pP]://[zZ][aAbBcC]83[mM] [lL][iI]"),
        ("http://(?i)z[abc]83m li", "http://[zZ][aAbBcC]83[mM] [lL][iI]"),
        ("http://(?i)z[a-z]83m li", "http://[zZ][a-zA-Z]83[mM] [lL][iI]"),
        ("http://(?i)z[a-z0-9A-Z]83m li", "http://[zZ][a-zA-Z0-9]83[mM] [lL][iI]"),
    ]
    for pattern, expected in iopairs:
        # subTest reports every failing pair instead of stopping at the first.
        with self.subTest(pattern=pattern):
            self.assertEqual(expected, _unfold_case_insensitive_regex(pattern))
Expand Down