From 23c186fb27ec2db7aba627c08727cea71ec9f3f8 Mon Sep 17 00:00:00 2001 From: bcaller Date: Wed, 31 Oct 2018 17:57:24 +0000 Subject: [PATCH] Simplify trigger file for sink argument propagation This changes the schema of the trigger file. Previously there were too many options and it was confusing. My fault, sorry. This meant that `db.execute(query, **TAINT)` was marked as a vulnerability whereas `db.execute(text=query, **TAINT)` wasn't. Neither are vulnerabilities, so this gave a FALSE POSITIVE. Now we have `arg_dict` which is a dictionary of keyword to argument position. E.g. for `def f(a, b, *, c)` we can specify the arg_dict as: ``` {"a": 0, "b": 1, "c": null} ``` if we want them all to propagate or not propagate depending on the `unlisted_args_propagate` value. This way, we can more easily define db.execute as: ``` "execute(": { "unlisted_args_propagate": false, "arg_dict": { "text": 0 } }, ``` --- .../trigger_definitions_parser.py | 27 +++++++++++-------- pyt/vulnerabilities/vulnerabilities.py | 20 +++++++------- .../test_positions.pyt | 21 +++++---------- tests/vulnerabilities/vulnerabilities_test.py | 1 + 4 files changed, 33 insertions(+), 36 deletions(-) diff --git a/pyt/vulnerabilities/trigger_definitions_parser.py b/pyt/vulnerabilities/trigger_definitions_parser.py index ab737928..4bfd3a15 100644 --- a/pyt/vulnerabilities/trigger_definitions_parser.py +++ b/pyt/vulnerabilities/trigger_definitions_parser.py @@ -16,33 +16,38 @@ class Sink: def __init__( self, trigger, *, - unlisted_args_propagate=True, unlisted_kwargs_propagate=True, - arg_list=None, kwarg_list=None, - sanitisers=None + unlisted_args_propagate=True, + arg_dict=None, + sanitisers=None, ): self._trigger = trigger self.sanitisers = sanitisers or [] self.arg_list_propagates = not unlisted_args_propagate - self.kwarg_list_propagates = not unlisted_kwargs_propagate if trigger[-1] != '(': - if self.arg_list_propagates or self.kwarg_list_propagates or arg_list or kwarg_list: + if self.arg_list_propagates or arg_dict: raise ValueError("Propagation options specified, but trigger word isn't a function call") - self.arg_list = set(arg_list or ()) - self.kwarg_list = set(kwarg_list or ()) + arg_dict = {} if arg_dict is None else arg_dict + self.arg_position_to_kwarg = { + position: name for name, position in arg_dict.items() if position is not None + } + self.kwarg_list = set(arg_dict.keys()) def arg_propagates(self, index): - in_list = index in self.arg_list - return self.arg_list_propagates == in_list + kwarg = self.get_kwarg_from_position(index) + return self.kwarg_propagates(kwarg) def kwarg_propagates(self, keyword): in_list = keyword in self.kwarg_list - return self.kwarg_list_propagates == in_list + return self.arg_list_propagates == in_list + + def get_kwarg_from_position(self, index): + return self.arg_position_to_kwarg.get(index) @property def all_arguments_propagate_taint(self): - if self.arg_list or self.kwarg_list: + if self.kwarg_list: return False return True diff --git a/pyt/vulnerabilities/vulnerabilities.py b/pyt/vulnerabilities/vulnerabilities.py index 47986ed8..0daca2cd 100644 --- a/pyt/vulnerabilities/vulnerabilities.py +++ b/pyt/vulnerabilities/vulnerabilities.py @@ -243,29 +243,27 @@ def get_sink_args(cfg_node): def get_sink_args_which_propagate(sink, ast_node): sink_args_with_positions = CallVisitor.get_call_visit_results(sink.trigger.call, ast_node) sink_args = [] + kwargs_present = set() for i, vars in enumerate(sink_args_with_positions.args): - if sink.trigger.arg_propagates(i): + kwarg = sink.trigger.get_kwarg_from_position(i) + if kwarg: + kwargs_present.add(kwarg) + if sink.trigger.kwarg_propagates(kwarg): sink_args.extend(vars) - if ( - # Either any unspecified arg propagates - not sink.trigger.arg_list_propagates or - # or there are some propagating args which weren't passed positionally - any(1 for position in sink.trigger.arg_list if position >= len(sink_args_with_positions.args)) - ): - sink_args.extend(sink_args_with_positions.unknown_args) - for keyword, vars in sink_args_with_positions.kwargs.items(): + kwargs_present.add(keyword) if sink.trigger.kwarg_propagates(keyword): sink_args.extend(vars) if ( # Either any unspecified kwarg propagates - not sink.trigger.kwarg_list_propagates or + not sink.trigger.arg_list_propagates or # or there are some propagating kwargs which have not been passed by keyword - sink.trigger.kwarg_list - set(sink_args_with_positions.kwargs.keys()) + sink.trigger.kwarg_list - kwargs_present ): + sink_args.extend(sink_args_with_positions.unknown_args) sink_args.extend(sink_args_with_positions.unknown_kwargs) return sink_args diff --git a/pyt/vulnerability_definitions/test_positions.pyt b/pyt/vulnerability_definitions/test_positions.pyt index 48e276fe..ddbc20a8 100644 --- a/pyt/vulnerability_definitions/test_positions.pyt +++ b/pyt/vulnerability_definitions/test_positions.pyt @@ -7,22 +7,15 @@ "normal(": {}, "execute(": { "unlisted_args_propagate": false, - "arg_list": [ - 0 - ], - "unlisted_kwargs_propagate": false, - "kwarg_list": [ - "text" - ] + "arg_dict": { + "text": 0 + } }, "run(": { - "kwarg_list": [ - "non_propagating" - ], - "arg_list": [ - 2, - 3 - ] + "arg_dict": { + "non_propagating": 2, + "something_else": 3 + } } } } diff --git a/tests/vulnerabilities/vulnerabilities_test.py b/tests/vulnerabilities/vulnerabilities_test.py index 8f5e70f1..2846ab55 100644 --- a/tests/vulnerabilities/vulnerabilities_test.py +++ b/tests/vulnerabilities/vulnerabilities_test.py @@ -577,6 +577,7 @@ def check(fixture, vulnerable): 'execute(x, name=TAINT)', 'execute(x, *TAINT)', 'execute(text=x, **TAINT)', + 'execute(x, **TAINT)', 'dont_run(TAINT)', ) vuln_fixtures = (