Skip to content

Commit

Permalink
Rewrite the pathmap.Tree
Browse files Browse the repository at this point in the history
This mostly rewrites the `Tree`, making the following changes and optimizations:
- Uses a real `Node` struct with children and terminals, instead of abusing special keys for it.
- Avoids constructing needless non-terminal strings for all intermediate nodes.
- Constructs the tree directly iteratively, instead of creating a parallel tree and merging recursively.
- Switches from recursion to iteration for `_drill`. It should be possible to also avoid recursion in lookup, but with a bit more effort.

This should primarily improve construction performance and improve memory usage, which was the primary pain points with the previous implementation.
  • Loading branch information
Swatinem committed Aug 23, 2024
1 parent 94280e1 commit 98f6356
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 260 deletions.
209 changes: 74 additions & 135 deletions helpers/pathmap.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import collections
import operator
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from os.path import relpath
from typing import Sequence


def _clean_path(path):
Expand All @@ -28,130 +28,118 @@ def _check_ancestors(path, match, ancestors):
return ml.endswith("/".join(pl.split("/")[(ancestors + 1) * -1 :]))


def _get_best_match(path: str, possibilities: list[str]) -> str:
"""
Given a `path`, return the most similar one out of `possibilities`.
"""

best_match = (-1, "")
for possibility in possibilities:
match = SequenceMatcher(None, path, possibility).ratio()
if match > best_match[0]:
best_match = (match, possibility)

return best_match[1]


@dataclass
class Node:
terminals: list[str] = field(default_factory=list)
"""
A list of paths terminating in this node.
"""

children: dict[str, "Node"] = field(default_factory=dict)
"""
Child nodes, keyed by path component.
"""


class Tree:
def __init__(self, *args, **kwargs):
self.instance = {}
def __init__(self, paths: Sequence[str]):
self.root = Node()
for path in paths:
self.insert(path)

def insert(self, path: str):
# the path components, in reverse order
components = reversed(path.split("/"))

# Sequence end indicator
self._END = "\\*__ends__*//"
node = self.root
for component in components:
component = component.lower()
node = node.children.setdefault(component, Node())

# Original value indicator
self._ORIG = "\\*__orig__*//"
node.terminals.append(path)

def resolve_path(self, path: str, ancestors=None):
path = _clean_path(path)

new_path = self.lookup(path, ancestors)

if new_path:
if ancestors and not _check_ancestors(path, new_path, ancestors):
# path ancestor count is not valud
# path ancestor count is not valid
return None

return new_path

# path was not resolved
return None

def _list_to_nested_dict(self, lis):
"""
Turns a list into a nested dict
E.g.:
['a','b','c'] => { 'c' : { 'b' : { 'a' : {} } } }
"""
d = {}
for i in range(0, len(lis)):
d[self._END] = True if i == 0 else False
d[self._ORIG] = ["/".join(lis[i:])]
d = {lis[i].lower(): d}
return d

def _get_best_match(self, path, possibilities):
"""
Given a path find how similar it is to all paths in possibilities
:str: path - A path part E.g.: a/b.py => a
:list: possibilities - Collected possibilities
"""

# Map out similarity of possible paths with the path being looked up
similarity = list(
map(lambda x: SequenceMatcher(None, path, x).ratio(), possibilities)
)

# Get the index, value of the most similar path
index, value = max(enumerate(similarity), key=operator.itemgetter(1))

return possibilities[index]

def _drill(self, d, results):
def _drill(self, node: Node) -> list[str] | None:
"""
Drill down a branch of a tree.
Collects results until a ._END is reached.
:returns - A list containing a possible path or None
"Drill down" a straight branch of a tree, returning the first terminal.
"""
root_keys = [x for x in d.keys() if x != self._ORIG and x != self._END]
while len(node.children) == 1:
node = next(iter(node.children.values()))
if len(node.terminals):
return node.terminals

if len(root_keys) > 1 or not root_keys:
return None

root_key = root_keys[0]
root = d.get(root_key)

if root.get(self._END):
return root.get(self._ORIG)
else:
return self._drill(root, results)
return None

def _recursive_lookup(self, d, lis, results, i=0, end=False, match=False):
def _recursive_lookup(
self,
node: Node,
components: list[str],
results: list[str],
i=0,
end=False,
match=False,
):
"""
Performs a lookup in tree recursively
:dict: d - tree branch
:list: lis - list of strings to search for
:list: results - Collected hit results
:int: i - Index of lis
:bool: end - Indicates if last lookup was the end of a sequence
:bool: match - Indicates if filename has any match in tree
:returns a list of hit results if path is found in the tree
"""
key = None

if i < len(lis):
key = lis[i].lower()

root = d.get(key)
if root:
if root.get(self._END):
results = root.get(self._ORIG)
child_node = (
node.children.get(components[i].lower()) if i < len(components) else None
)
if child_node:
is_end = len(child_node.terminals) > 0
if is_end:
results = child_node.terminals
return self._recursive_lookup(
root, lis, results, i + 1, root.get(self._END), True
child_node, components, results, i + 1, is_end, True
)
else:
if not end and match:
next_path = self._drill(d, results)
next_path = self._drill(node)
if next_path:
results.extend(next_path)
return results

def lookup(self, path, ancestors=None):
def lookup(self, path: str, ancestors=None) -> str | None:
"""
Lookup a path in the tree
:str: path - The path to search for
:returns The closest matching path in the tree if present else None
Lookup a path in the tree, returning the closest matching path
in the tree if found.
"""
path_hit = None
path_split = list(reversed(path.split("/")))
results = self._recursive_lookup(self.instance, path_split, [])

components = list(reversed(path.split("/")))
results = self._recursive_lookup(self.root, components, [])
if not results:
return None

if len(results) == 1:
path_hit = results[0]
else:
Expand All @@ -160,54 +148,5 @@ def lookup(self, path, ancestors=None):
closest_length = min(path_lengths, key=lambda x: abs(x - ancestors))
path_hit = next(x for x in results if len(x) == closest_length)
else:
path_hit = self._get_best_match(path, list(reversed(results)))

path_hit = _get_best_match(path, list(reversed(results)))
return path_hit

def update(self, d, u):
"""
Update a dictionary
:dict: d - Dictionary being updated
:dict: u - Dictionary being merged
"""
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
r = self.update(d.get(k, {}), v)
d[k] = r
else:
if k == self._END and d.get(k) is True:
pass
elif k == self._ORIG and d.get(k) and u.get(k):
if d[k] != u[k]:
d[k] = d[k] + u[k]
else:
d[k] = u[k]
return d

def insert(self, path):
"""
Insert a path into the tree
:str: path - The path to insert
"""

path_split = path.split("/")
root_key = path_split[-1].lower()
root = self.instance.get(root_key)

if not root:
u = self._list_to_nested_dict(path_split)
self.instance.update(u)
else:
u = self._list_to_nested_dict(path_split)
self.instance = self.update(self.instance, u)

def construct_tree(self, toc):
"""
Constructs a tree
:list: toc - The table of contents
"""

for path in toc:
self.insert(path)
Loading

0 comments on commit 98f6356

Please sign in to comment.