Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Filesystem and Cache for Performance Enhancement #2

Merged
merged 2 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 69 additions & 28 deletions pgcs/custom_select.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import re
from functools import partial
from typing import Any, List, Tuple
from functools import lru_cache, partial
from typing import Any, Dict, List, Tuple

import gcsfs
from prompt_toolkit.application import Application
Expand All @@ -28,6 +28,8 @@
from prompt_toolkit.styles import Style
from prompt_toolkit.widgets import TextArea

from pgcs.file_system.base import Entry
from pgcs.file_system.entries import Bucket, Directory, File
from pgcs.preferences import PREF_FILE_PATH, GCSPref
from pgcs.utils import error_handler

Expand All @@ -38,12 +40,30 @@
SELECTED_CLASS = "class:selected"


@lru_cache
def get_file_info(file_path: str, preview: bool = False) -> str:
content = ""
if preview:
content = gfs.read_block(file_path, 0, 50, delimiter=b"\n").decode("utf-8")
file_stats = gfs.stat(file_path)
file_createdat = f"created_at: {file_stats['timeCreated']}"
file_updatedat = f"updated_at: {file_stats['updated']}"
return "\n".join((file_createdat, file_updatedat, content))


class CustomFormattedTextControl(FormattedTextControl):
def __init__(self, text: AnyFormattedText, *args: Any, **kwargs: Any) -> None:
def __init__(
self,
text: AnyFormattedText,
choices: Dict[str, Entry],
*args: Any,
**kwargs: Any,
) -> None:
super(CustomFormattedTextControl, self).__init__(
self._convert_callable_text(text), *args, **kwargs
)
self.pointed_at = 0
self._choices = choices

@property
def choice_count(self) -> int:
Expand Down Expand Up @@ -101,19 +121,23 @@ def _(event: KeyPressEvent) -> None:
def _(event: KeyPressEvent) -> None:
entry = to_plain_text(self.get_pointed_at()).strip()
if entry:
event.app.exit(result=os.path.dirname(os.path.dirname(entry)))
event.app.exit(result="left")

@bindings.add(Keys.ControlP)
def _(event: KeyPressEvent) -> None:
entry = to_plain_text(self.get_pointed_at()).strip()
entry_name = to_plain_text(self.get_pointed_at()).strip()
entry = self._choices[entry_name]
if entry:
event.app.clipboard.set_data(ClipboardData(f"gs://{entry}"))
event.app.clipboard.set_data(ClipboardData(entry.path()))

@bindings.add(Keys.ControlD)
def _(event: KeyPressEvent) -> None:
entry = to_plain_text(self.get_pointed_at()).strip()
if entry and gfs.exists(entry):
gfs.download(entry, ".", recursive=gfs.isdir(entry))
entry_name = to_plain_text(self.get_pointed_at()).strip()
entry = self._choices[entry_name]
if entry:
gfs.download(
entry.path(), ".", recursive=isinstance(entry, (Bucket, Directory))
)

@bindings.add(Keys.Enter)
def _(event: KeyPressEvent) -> None:
Expand All @@ -127,9 +151,8 @@ def _(event: KeyPressEvent) -> None:
)


@error_handler
def custom_select(
choices: List[str], max_preview_height: int = 10, **kwargs: Any
choices: Dict[str, Entry], max_preview_height: int = 10, **kwargs: Any
) -> str:
text_area = TextArea(prompt="QUERY> ", multiline=False)

Expand All @@ -142,23 +165,21 @@ def filter_candidates(choices: List[str]) -> List[Tuple[str, str]]:
]

control = CustomFormattedTextControl(
partial(filter_candidates, choices), focusable=True
partial(filter_candidates, choices), choices, focusable=True
)

candidates_display = ConditionalContainer(Window(control), ~IsDone())

def get_entry_info() -> str:
entry = to_plain_text(control.get_pointed_at()).strip()
if not entry:
entry_name = to_plain_text(control.get_pointed_at()).strip()
entry = choices.get(entry_name)
if entry is None:
return ""
if gfs.isfile(entry):
content: str = gfs.read_block(entry, 0, 50, delimiter=b"\n").decode("utf-8")
file_stats = gfs.stat(entry)
file_createdat = f"created_at: {file_stats['timeCreated']}"
file_updatedat = f"updated_at: {file_stats['updated']}"
content = "\n".join((file_createdat, file_updatedat, content))
else:
content = "\n".join(map(os.path.basename, gfs.ls(entry)[:10]))
content = ""
if isinstance(entry, File):
content = get_file_info(entry.path())
elif isinstance(entry, (Directory, Bucket)):
content = "\n".join(map(os.path.basename, entry.ls()[:10]))
return content

preview_control = FormattedTextControl(get_entry_info, focusable=False)
Expand All @@ -185,10 +206,30 @@ def get_entry_info() -> str:
return to_plain_text(app.run()).strip()


def traverse_gcs(choices: List[str]) -> str:
entry: str = custom_select(choices)
if not entry:
return traverse_gcs(gfs.buckets)
if gfs.isfile(entry):
@error_handler
def traverse_gcs(choices: Dict[str, Entry]) -> File:
result = custom_select(choices)
if result == "left":
entry = list(choices.values())[0]
if isinstance(entry, Bucket):
return traverse_gcs(entry.root) # type: ignore
elif isinstance(entry, Directory):
parent = entry.parent
if isinstance(parent, Bucket):
return traverse_gcs(parent.root) # type: ignore
return traverse_gcs(parent.parent.children) # type: ignore
elif isinstance(entry, File):
return traverse_gcs(entry.parent.parent.children) # type: ignore
else:
raise NotImplementedError

entry = choices[result]
if isinstance(entry, File):
return entry
return traverse_gcs(gfs.ls(entry))
if not entry.children: # type: ignore
for _, dirnames, filenames in gfs.walk(entry.path(), maxdepth=1):
for dirname in dirnames:
entry.add(Directory(dirname, entry))
for filename in filenames:
entry.add(File(filename, entry))
return traverse_gcs(entry.children) # type: ignore
Empty file added pgcs/file_system/__init__.py
Empty file.
23 changes: 23 additions & 0 deletions pgcs/file_system/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

from abc import ABCMeta, abstractmethod


class Entry(metaclass=ABCMeta):
def __init__(self, name: str) -> None:
self._name = name

@property
def name(self) -> str:
return self._name

def __repr__(self) -> str:
return self._name

@abstractmethod
def path(self) -> str:
pass

@abstractmethod
def add(self, entry: Entry) -> None:
pass
88 changes: 88 additions & 0 deletions pgcs/file_system/entries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

import os
import pickle
from typing import Dict, List, Optional

from pgcs.file_system.base import Entry


class File(Entry):
def __init__(self, name: str, parent: Entry) -> None:
super().__init__(name)
self._parent = parent

@property
def parent(self) -> Entry:
return self._parent

def path(self) -> str:
return "/".join((self._parent.path(), self._name))

def add(self, entry: Entry) -> None:
raise NotImplementedError


class Directory(Entry):
def __init__(self, name: str, parent: Entry) -> None:
super().__init__(name)
self._parent = parent
self._children: Dict[str, Entry] = {}

@property
def parent(self) -> Entry:
return self._parent

@property
def children(self) -> Dict[str, Entry]:
return self._children

def path(self) -> str:
return "/".join((self._parent.path(), self._name))

def get(self, entry_name: str, default: Optional[Entry] = None) -> Optional[Entry]:
return self._children.get(entry_name, default)

def add(self, entry: Entry) -> None:
if entry.path().startswith(self.path()):
if entry.name not in self._children:
self._children[entry.name] = entry

def ls(self) -> List[str]:
return [entry.path() for entry in self._children.values()]


class Bucket(Entry):
def __init__(self, name: str, root: Dict[str, Entry]) -> None:
super().__init__(name)
self._root = root
self._children: Dict[str, Entry] = {}

@property
def root(self) -> Dict[str, Entry]:
return self._root

@property
def children(self) -> Dict[str, Entry]:
return self._children

def path(self) -> str:
return f"gs://{self._name}"

def get(self, entry_name: str, default: Optional[Entry] = None) -> Optional[Entry]:
return self._children.get(entry_name, default)

def add(self, entry: Entry) -> None:
if entry.path().startswith(self.path()):
if entry.name not in self._children:
self._children[entry.name] = entry

def ls(self) -> List[str]:
return [entry.path() for entry in self._children.values()]

def save(self, save_dir: str, force: bool = False) -> None:
os.makedirs(save_dir, exist_ok=True)
file_path = os.path.join(save_dir, self.name)
if force or not os.path.exists(file_path):
with open(file_path, "wb") as f:
pickle.dump(self, f)
24 changes: 19 additions & 5 deletions pgcs/main.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,44 @@
import argparse
import os
import pickle
from typing import Dict

import gcsfs

from pgcs.custom_select import traverse_gcs
from pgcs.file_system.base import Entry
from pgcs.file_system.entries import Bucket
from pgcs.preferences import PREF_FILE_PATH, GCSPref

gfs = gcsfs.GCSFileSystem()


def main() -> None:
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="cmd")
parser_traverse = subparsers.add_parser(
"traverse", help="default positional argument `pg` == `pg traverse`"
)
parser_traverse.add_argument("root", nargs="?")
parser_pref = subparsers.add_parser("pref", help="set pref")
parser_pref.add_argument("--init", action="store_true")
parser_pref.add_argument("key", nargs="?")
parser_pref.add_argument("value", nargs="?")
parser.set_defaults(cmd="traverse", root=None)
parser.set_defaults(cmd="traverse")
args = parser.parse_args()

pref = GCSPref.read() if PREF_FILE_PATH.exists() else GCSPref()
if args.cmd == "traverse":
gfs = gcsfs.GCSFileSystem()
buckets = gfs.ls(args.root) if args.root is not None else gfs.buckets
traverse_gcs(buckets)
root: Dict[str, Entry] = {}
for bucket in gfs.buckets:
if os.path.exists(pref.cache_dir / bucket.rstrip("/")):
with open(pref.cache_dir / bucket.rstrip("/"), "rb") as f:
root[bucket] = pickle.load(f)
else:
root[bucket] = Bucket(bucket.rstrip("/"), root)
traverse_gcs(root)
for bucket in root.values():
bucket.save(pref.cache_dir, force=True)

elif args.cmd == "pref":
if args.init:
new_pref = GCSPref()
Expand Down
2 changes: 2 additions & 0 deletions pgcs/preferences.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from pydantic import BaseModel

PREF_FILE_PATH = Path(__file__).parent / ".preference"
PREF_CACHE_DIR = Path(__file__).parent / ".cache"


class GCSPref(BaseModel, frozen=True):
ignore_case: bool = True
cache_dir: Path = PREF_CACHE_DIR

def write(self) -> None:
PREF_FILE_PATH.write_text(self.model_dump_json())
Expand Down
2 changes: 1 addition & 1 deletion pgcs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
exit()
pass

return wrapper
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file added tests/file_system/__init__.py
Empty file.
Loading
Loading