-
Notifications
You must be signed in to change notification settings - Fork 70
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor delete_empty_folder using treelib and Tree approach
- Loading branch information
Showing
10 changed files
with
231 additions
and
114 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
from artifactory_cleanup.artifactorycleanup import ArtifactoryCleanup # noqa | ||
from artifactory_cleanup.rules.base import CleanupPolicy # noqa |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,102 +1,164 @@ | ||
from collections import defaultdict, deque | ||
from typing import Dict, List | ||
from collections import defaultdict | ||
from typing import Dict, List, Tuple, Optional | ||
|
||
from treelib import Node, Tree | ||
|
||
def artifacts_list_to_tree(list_of_artifacts: List): | ||
|
||
def is_repository(data: Dict) -> bool:
    """
    Tell whether a raw Artifactory record describes the repository root.

    The root is the record whose ``path`` and ``name`` are both ``"."``.

    >>> is_repository({"repo": "repo", "path": ".", "name": "."})
    True
    >>> is_repository({"repo": "repo", "path": ".", "name": "folder"})
    False
    """
    return data["path"] == "." and data["name"] == "."
|
||
|
||
def get_fullpath(repo: str, path: str, name: str, **kwargs) -> str:
    """
    Get path from raw Artifactory's data

    Any other keys of the record are accepted through ``**kwargs`` and
    ignored, so the function can be called as ``get_fullpath(**data)``.

    >>> get_fullpath(repo="repo", path=".", name=".")
    'repo'
    >>> get_fullpath(repo="repo", path=".", name="folder")
    'repo/folder'
    >>> get_fullpath(repo="repo", path="folder", name="filename.py")
    'repo/folder/filename.py'
    """
    # root - repo
    if name == ".":
        return repo
    # folder under root
    if path == ".":
        return f"{repo}/{name}"
    return f"{repo}/{path}/{name}"
|
||
|
||
def split_fullpath(fullpath: str) -> Tuple[str, Optional[str]]:
    """
    Split path into (name, parent)

    The split happens at the last ``/``; a path without any ``/`` has no
    parent, which is reported as ``None``.

    >>> split_fullpath("repo/folder/filename.py")
    ('filename.py', 'repo/folder')
    >>> split_fullpath("repo")
    ('repo', None)
    """
    parent, separator, name = fullpath.rpartition("/")
    if not separator:
        # No "/" at all: rpartition leaves the whole string in the last slot.
        return name, None
    return name, parent
|
||
|
||
def parse_fullpath(fullpath: str) -> Tuple[str, str, str]:
    """
    Parse full path to (repo, path, name)

    The first segment is the repository, the last segment is the name and
    everything in between is the path. Missing parts are reported as ``"."``.

    >>> parse_fullpath("repo/path/name.py")
    ('repo', 'path', 'name.py')
    >>> parse_fullpath("repo/path")
    ('repo', '.', 'path')
    >>> parse_fullpath("repo")
    ('repo', '.', '.')
    """
    repo, separator, inside_repo = fullpath.partition("/")
    if not separator:
        # Only the repository itself
        return fullpath, ".", "."

    path, separator, name = inside_repo.rpartition("/")
    if not separator:
        # Entry directly under the repository root
        return repo, ".", name
    return repo, path, name
|
||
|
||
def nested_dict(): | ||
class ArtifactNode(Node):
    """
    Tree node for one Artifactory entry (file or folder).

    ``data`` holds the raw Artifactory record when one was supplied; it is
    ``None`` for nodes created only to complete the path to another entry.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Number of files at or below this node; stays 0 until
        # RepositoryTree.count_files() fills it in.
        self.files = 0

    def is_file(self) -> bool:
        """Return True only when the node's record has ``type == "file"``."""
        if not self.data:
            return False
        # .get(): a record without a "type" key is treated as "not a file"
        # instead of raising KeyError.
        return self.data.get("type") == "file"

    def get_data(self) -> Dict:
        """
        Return the record describing this node.

        The stored record is returned when present. Otherwise a minimal
        ``repo``/``path``/``name`` record is rebuilt from the node identifier.

        Raises:
            ValueError: if the rebuilt record points at the repository root.
        """
        if self.data:
            return self.data

        repo, path, name = parse_fullpath(self.identifier)
        data = dict(repo=repo, path=path, name=name)
        if is_repository(data):
            raise ValueError("Can not remove repository root")
        return data
|
||
|
||
class RepositoryTree(Tree):
    """
    Tree of the entries of one repository.

    Node identifiers are full paths (``repo/path/name``); the node tag is the
    last path segment.
    """

    def parse_artifact(self, data):
        """
        Parse Artifactory's raw data and add artifact to the tree
        """
        fullpath = get_fullpath(**data)
        self.add_artifact(fullpath, data)

    def add_artifact(self, fullpath, data):
        """
        Add an entry under ``fullpath``, creating missing parent folders.

        If the path is already in the tree, the existing node is returned.
        Its data is filled in only when it had none, so a repeated record
        no longer reaches ``add_node`` with an identifier that is taken.
        """
        existed = self.get_node(fullpath)
        if existed is not None:
            if existed.data is None:
                # We met it before, but with no data
                existed.data = data
            return existed

        name, parent = split_fullpath(fullpath)
        self.upsert_path(parent)
        artifact = ArtifactNode(tag=name, identifier=fullpath, data=data)
        self.add_node(node=artifact, parent=parent)
        return artifact

    def upsert_path(self, fullpath):
        """
        Create path to the folder if not exist
        """
        # Empty/None means we are above the repository root: nothing to add.
        if not fullpath:
            return

        if self.contains(fullpath):
            return

        self.add_artifact(fullpath, data=None)

    def count_files(self, nid=None) -> int:
        """Count files inside the directory. DFS traversing"""
        nid = nid or self.root
        node: ArtifactNode = self.get_node(nid)
        if node is None:
            # Empty tree (no root yet) or unknown identifier
            return 0

        if node.is_file():
            node.files = 1
            return node.files

        # Every child is visited so that each node gets its own counter.
        node.files = sum(
            self.count_files(child.identifier) for child in self.children(nid)
        )
        return node.files

    def get_highest_empty_folders(self, nid=None) -> List[ArtifactNode]:
        """
        Get the highest empty folders for the repository. DFS traversing

        Relies on the ``files`` counters, so ``count_files()`` has to be
        called first.
        """
        nid = nid or self.root
        node: ArtifactNode = self.get_node(nid)
        if node is None:
            # Empty tree (no root yet) or unknown identifier
            return []

        children: List[ArtifactNode] = self.children(nid)
        if not node.is_root() and node.files == 0:
            # Empty folder that contains only empty folders
            if all(child.files == 0 for child in children):
                return [node]

        folders = []
        for child in children:
            folders.extend(self.get_highest_empty_folders(nid=child.identifier))
        return folders
|
||
|
||
def build_repositories(artifacts: List[Dict]) -> List[RepositoryTree]:
    """
    Build tree-like repository objects from raw Artifactory data

    Records are grouped by their ``repo`` key; one tree is created per
    distinct repository and every record is parsed into its tree.
    """
    repositories: Dict[str, RepositoryTree] = defaultdict(RepositoryTree)
    for data in artifacts:
        repositories[data["repo"]].parse_artifact(data)
    return list(repositories.values())
|
||
|
||
def get_empty_folders(repositories: "List[RepositoryTree]") -> List[Dict]:
    """
    Collect the records of the highest empty folders of every repository.

    For each tree the file counters are computed first, then the highest
    empty folders are gathered; the result is each folder's ``get_data()``.
    """
    folders = []
    for repo in repositories:
        # Counters must be up to date before empty folders can be detected.
        repo.count_files()
        folders.extend(repo.get_highest_empty_folders())
    return [folder.get_data() for folder in folders]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
[pytest] | ||
addopts = --doctest-modules |
Oops, something went wrong.