2 changes: 1 addition & 1 deletion packages/code-analyzer-flow-engine/.gitignore
@@ -1,3 +1,3 @@
 FlowScanner/build/**/**
 FlowScanner/**/__pycache__/**
-FlowScanner/flowtest.egg-info/**
+FlowScanner/*.egg-info/**
205 changes: 160 additions & 45 deletions packages/code-analyzer-flow-engine/FlowScanner/flow_parser/parse.py

Large diffs are not rendered by default.

@@ -11,8 +11,8 @@
 import flow_scanner.query_manager
 import flow_scanner.util as util
 import flow_scanner.version as version
-import queries.default_query as default_query
-from flow_scanner.query_manager import validate_qry_list, get_all_optional_queries
+from flow_scanner.query_manager import validate_qry_list, get_all_queries
 from flow_scanner.util import make_id
 from public.data_obj import PresetEncoder
 from public.parse_utils import quick_validate
@@ -134,18 +134,28 @@ def get_tokens_from_csv_file(file_path: str) -> list[str]:

     return get_validated_queries(unsplit(data))
 
+def print_preset_list():
+    map_ = flow_scanner.query_manager.PRESETS
+    preset_info = {}
+    for p in map_:
+        preset_info[p] = [x[1] for x in map_[p]]
+
+    res_json = json.dumps(preset_info, indent=4)
+    # print to stdout so user can redirect or examine
+    print(res_json)
+
 def get_validated_queries(data: list[str]) -> list[str]:
     cleaned_data = de_kebab_list(clean_str_list(data))
-    validation = validate_qry_list(cleaned_data)
-    if validation is True:
-        return cleaned_data
+    valid, found, missed, duplicates = validate_qry_list(cleaned_data)
+    if valid:
+        return found
     else:
-        if len(validation) == 1:
-            raise argparse.ArgumentTypeError("Unrecognized query requested: %s" % validation[0])
-        else:
-            raise argparse.ArgumentTypeError("Unrecognized queries requested: %s" %
-                                             ",".join(validation))
+        for issue, items in [('Duplicate', duplicates), ('Unrecognized', missed)]:
+            if not items:
+                continue
+            if len(items) == 1:
+                raise argparse.ArgumentTypeError(f"{issue} query requested: {items[0]}")
+            raise argparse.ArgumentTypeError(f"{issue} queries requested: {','.join(items)}")
 
 
 def unsplit(msg: str) -> list[str]:
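The rewritten get_validated_queries unpacks a four-tuple from validate_qry_list in place of the old bool-or-list return. A minimal sketch of the contract that unpacking implies, assuming get_all_queries (imported above) yields the known query names; the body is illustrative, not the PR's implementation:

    # Hypothetical sketch: only the return order (valid, found, missed, duplicates)
    # is visible in this diff; everything else here is assumed.
    def validate_qry_list(requested: list[str]) -> tuple[bool, list[str], list[str], list[str]]:
        known = set(get_all_queries())
        seen: set[str] = set()
        found, missed, duplicates = [], [], []
        for name in requested:
            if name in seen:
                duplicates.append(name)   # asked for twice
            elif name in known:
                found.append(name)
                seen.add(name)
            else:
                missed.append(name)       # no such query
        return (not missed and not duplicates, found, missed, duplicates)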
@@ -232,8 +242,8 @@ def parse_args(my_args: list[str], default: str = None) -> argparse.Namespace:
                         version='%(prog)s ' + version.__version__)
     parser.add_argument("-p", "--preset_info", action='store_true',
                         help="return information on default preset and exit")
-    parser.add_argument("--optional_query_info", action='store_true',
-                        help="display which optional queries are supported and exit")
+    parser.add_argument("--query_info", action='store_true',
+                        help="display which queries are supported and exit")
 
     """
     Options for which flows to scan
@@ -255,10 +265,11 @@
     Option for specifying the workspace path list
     """
 
-    parser.add_argument("--workspace", help=("path of file containing csv separated lists of "
-                                             "flows in workspace that may be resolved as subflow targets. "
-                                             "If empty this defaults to flows target csv file, the specified directory, "
-                                             "or contents of flow directory or flows listed in commandline."),
+    parser.add_argument("--workspace",
+                        help=("path of file containing csv separated lists of "
+                              "flows in workspace that may be resolved as subflow targets. "
+                              "If empty this defaults to flows target csv file, the specified directory, "
+                              "or contents of flow directory or flows listed in commandline."),
                         type=check_file_exists)
 
     """
@@ -310,12 +321,12 @@ def parse_args(my_args: list[str], default: str = None) -> argparse.Namespace:
parser.add_argument("--query_path", required=False, help="path of custom query python file")
parser.add_argument("--query_class", required=False, help="name of class to instantiate in query_path")
parser.add_argument("--preset", required=False, help="name of preset to use (consumed by query code)")
parser.add_argument("--optional_queries", required=False,
help="comma separated list of optional queries to execute in addition to the preset.")
parser.add_argument("--optional_queries_path", required=False,
help="path of file containing a comma separated list of optional queries to "
parser.add_argument("--queries", required=False,
help="comma separated list of queries to execute in addition to the preset.")
parser.add_argument("--queries_path", required=False,
help="path of file containing a comma separated list of queries to "
"execute in addition to the preset.", type=check_file_exists)
parser.add_argument("--all_optional", required=False, action='store_true', help=("run all optional queries. "
parser.add_argument("--all_queries", required=False, action='store_true', help=("run all queries. "
"WARNING: this is noisy."))
parser.add_argument("--debug_flow", required=False, help=("For expert use only. Run a debug flow with"
"the supplied parameter."))
@@ -341,24 +352,33 @@ def main(argv: list[str] = None) -> str | None:

     args = parse_args(argv, default=default)
 
+    if args.preset is not None:
+        # check preset
+        if args.preset not in flow_scanner.query_manager.PRESETS:
+            raise argparse.ArgumentTypeError(f"Invalid preset requested: {args.preset}")
+
     # check if the user wants only a description of the default queries
     if args.preset_info is True:
-        # if user has specified a preset, use that or None
-        preset_name = args.preset
-        preset = default_query.build_preset(preset_name)
-        queries = preset.queries
-        query_info = [x.to_dict() for x in list(queries)]
-        sorted_query_info = sorted(query_info, key=lambda x: x['query_id'])
-        desc = json.dumps(sorted_query_info, indent=4, cls=PresetEncoder)
-        # print to stdout so user can redirect or examine
-        print(desc)
+        preset_name = args.preset or "default"
+        if preset_name:
+            preset = flow_scanner.query_manager.build_preset_for_name(preset_name)
+            if preset is None:
+                print(f"No preset found with name: {preset_name}")
+                print_preset_list()
+                return
+            queries = preset.queries
+            query_info = [x.to_dict() for x in list(queries)]
+            sorted_query_info = sorted(query_info, key=lambda x: x['query_id'])
+            desc = json.dumps(sorted_query_info, indent=4, cls=PresetEncoder)
+            # print to stdout so user can redirect or examine
+            print(desc)
+        return
 
-    # Check if user wants list of optional queries
-    if args.optional_query_info is True:
-        desc = flow_scanner.query_manager.get_all_optional_descriptions()
-        print(desc)
+    # Check if user wants list of queries
+    if args.query_info is True:
+        desc = flow_scanner.query_manager.get_query_descriptions()
+        print(json.dumps(desc, indent=4, cls=PresetEncoder))
         return
 
     # logging
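main() now resolves presets through flow_scanner.query_manager.build_preset_for_name and falls back to print_preset_list when the lookup misses. A rough sketch of what that lookup could look like, given only that PRESETS maps a preset name to entries whose second element is a query id (which is how print_preset_list reads them); make_preset is a placeholder, not a real helper:

    # Hypothetical: the diff only implies PRESETS' shape and the None-on-miss behavior.
    def build_preset_for_name(name: str):
        entries = flow_scanner.query_manager.PRESETS.get(name)
        if entries is None:
            return None                        # caller prints the preset list and exits
        query_ids = [entry[1] for entry in entries]
        return make_preset(query_ids)          # placeholder constructor for illustration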
@@ -381,18 +401,18 @@ def main(argv: list[str] = None) -> str | None:


"""
Handle Optional Queries
Handle Queries
"""
if args.all_optional is True:
optional_qry_l = get_all_optional_queries()
if args.all_queries is True:
qry_l = get_all_queries()

elif args.optional_queries_path is not None:
optional_qry_l = get_tokens_from_csv_file(args.optional_queries_path)
elif args.queries_path is not None:
qry_l = get_tokens_from_csv_file(args.queries_path)

elif args.optional_queries is not None:
optional_qry_l = get_validated_queries(unsplit(args.optional_queries))
elif args.queries is not None:
qry_l = get_validated_queries(unsplit(args.queries))
else:
optional_qry_l = None
qry_l = None

"""
Handle chunking
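The three branches above feed one list with a clear precedence: --all_queries wins over --queries_path, which wins over --queries, and with none of them qry_l stays None. For the file form, a plausible csv per the help text and get_tokens_from_csv_file (query names invented):

    query_a,query_b,query_c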
@@ -459,7 +479,7 @@ def main(argv: list[str] = None) -> str | None:
                                query_module_path=args.query_path,
                                query_class_name=args.query_class,
                                query_preset=args.preset,
-                               optional_queries=optional_qry_l,
+                               queries=qry_l,
                                crawl_dir=args.crawl_dir,
                                resolver=resolver)

@@ -320,19 +320,20 @@ def load_crawl_step(self, crawler: Crawler, crawl_step: CrawlStep = None) -> Cra
         old_history = self.current_crawl_step.visitor.history
         new_history = next_cs.visitor.history
 
-        if old_history == ():
+        if old_history == () or old_history == ('*',):
             # we are on the first branch, so no backtracking
             old_map = self.__influence_map[self.current_crawl_step]
 
-        elif len(new_history) >= len(old_history) and new_history[0:len(old_history)] == old_history:
+        elif len(new_history) == len(old_history) + 1 and new_history[0:len(old_history)] == old_history:
             # the new branch is a continuation of old branch so no backtracking
             old_map = self.__influence_map[self.current_crawl_step]
 
         else:
-            # the new history is a different branch, and we need to backtrack
+            # the new history is a different branch, and we need to either backtrack or jump ahead
             # get_last_ancestor returns the last time we visited the element right before the current element
             old_cs = crawler.get_last_ancestor(next_cs)
             if old_cs is None:
-                # no predecessor, so we use default
+                # use default map
                 old_map = self.__default_map
             else:
                 old_map = self.__influence_map[old_cs]
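The tightened elif is the substance of this hunk: the old >= prefix test also matched histories that jumped more than one step ahead, which now fall through to the ancestor lookup. A small self-contained illustration of the new predicate, with histories modeled as tuples of invented branch labels:

    # Illustrative only; real histories come from visitor.history.
    def is_direct_continuation(old_h: tuple, new_h: tuple) -> bool:
        return len(new_h) == len(old_h) + 1 and new_h[:len(old_h)] == old_h

    assert is_direct_continuation(('A', 'B'), ('A', 'B', 'C'))           # one step deeper: keep current map
    assert not is_direct_continuation(('A', 'B'), ('A', 'B', 'C', 'D'))  # jumped ahead: ancestor lookup
    assert not is_direct_continuation(('A', 'B'), ('A', 'X'))            # different branch: backtrack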