Skip to content

Commit

Permalink
Implemented cross-terminal memos sharing feature (#17)
Browse files Browse the repository at this point in the history
* add signup login refresh_token password_update push pull supoort

* set https path and add small test
  • Loading branch information
Asugawara authored Feb 18, 2024
1 parent e850eb9 commit 7fe66c8
Show file tree
Hide file tree
Showing 11 changed files with 474 additions and 7 deletions.
Empty file added pmemo/api/__init__.py
Empty file.
74 changes: 74 additions & 0 deletions pmemo/api/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import json
from typing import Optional

import requests
from logzero import logger
from prompt_toolkit import prompt

from pmemo.api.config import APIConfig, Tokens


def input_email() -> str:
return prompt("Email:")


def input_password() -> str:
return prompt("Password:", is_password=True)


class APIAuthenticator:
def __init__(self) -> None:
self._config = APIConfig()

def _input_user_info(self) -> tuple[str, str]:
email = input_email()
password = input_password()
return email, password

def signup(self) -> Tokens:
email, password = self._input_user_info()
res = requests.post(
self._config.signup, data=json.dumps({"email": email, "password": password})
)
tokens = res.json()
if res.status_code == requests.codes.ok:
logger.info("Signup Success")
else:
logger.error("Signup Failed")
return Tokens(
token=tokens.get("idToken", ""),
refresh_token=tokens.get("refreshToken", ""),
)

def _refresh_token(self, refresh_token: str) -> Tokens:
res = requests.post(
self._config.refresh_token,
data=json.dumps({"refresh_token": refresh_token.strip('"')}),
)
tokens = res.json()
if res.status_code == requests.codes.ok:
logger.info("Refresh Token Success")
else:
logger.error("Refresh Token Failed")
return Tokens(
token=tokens.get("id_token", ""),
refresh_token=tokens.get("refresh_token", refresh_token),
)

def login(self, refresh_token: Optional[str]) -> Tokens:
if refresh_token is not None:
return self._refresh_token(refresh_token)
email, password = self._input_user_info()
res = requests.post(
self._config.login,
data=json.dumps({"email": email, "password": password}),
)
tokens = res.json()
if res.status_code == requests.codes.ok:
logger.info("Login Success")
else:
logger.error("Login Failed")
return Tokens(
token=tokens.get("idToken", ""),
refresh_token=tokens.get("refreshToken", ""),
)
52 changes: 52 additions & 0 deletions pmemo/api/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import json

import requests
from cryptography.fernet import Fernet
from logzero import logger

from pmemo.api.config import APIConfig, Tokens


class APIClient:
def __init__(self, tokens: Tokens, encryption_key: bytes) -> None:
self._config = APIConfig()
self._tokens = tokens
self._fernet = Fernet(encryption_key)

def store_memo(self, file_name: bytes, content: bytes) -> None:
# Note: Identical file names should yield same encryption
IV = b"\xb3\x03\xbdVn\xeejKH\xc4\x0c\x83\xa7\xba_\x8e"
encrypted_file_name = self._fernet._encrypt_from_parts(file_name, 0, IV).decode(
"utf-8"
)
encrypted_content = self._fernet.encrypt(content).decode("utf-8")
res = requests.post(
self._config.memos,
headers={"Authorization": f"Bearer {self._tokens.token}"},
data=json.dumps(
dict(file_name=encrypted_file_name, content=encrypted_content)
),
)
if res.status_code == requests.codes.ok:
logger.info("Memo stored successfully: %s", file_name.decode("utf-8"))
else:
logger.error("Failed to store memo")
if self._tokens.token:
logger.error("Maybe login again to refresh the token")

def get_memos(self) -> list[str]:
res = requests.get(
self._config.memos,
headers={"Authorization": f"Bearer {self._tokens.token}"},
)
if res.status_code != requests.codes.ok:
logger.error("Failed to get memos")
if self._tokens.token:
logger.error("Maybe login again to refresh the token")
return []
decrypted_contents = []
for memo in res.json():
decrypted_contents.append(
self._fernet.decrypt(memo["content"].encode()).decode("utf-8")
)
return decrypted_contents
33 changes: 33 additions & 0 deletions pmemo/api/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os

from pydantic import BaseModel


class APIConfig(BaseModel, frozen=True):
domain: str = "https://pmemo.asugawara.com"
version: str = "v1"

@property
def signup(self) -> str:
return os.path.join(self.domain, self.version, "signup")

@property
def login(self) -> str:
return os.path.join(self.domain, self.version, "login")

@property
def refresh_token(self) -> str:
return os.path.join(self.domain, self.version, "refresh_token")

@property
def password_update(self) -> str:
return os.path.join(self.domain, self.version, "password_update")

@property
def memos(self) -> str:
return os.path.join(self.domain, self.version, "memos")


class Tokens(BaseModel, frozen=True):
token: str = ""
refresh_token: str = ""
59 changes: 58 additions & 1 deletion pmemo/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
import copy
import subprocess

from logzero import logger
from rich.console import Console
from rich.markdown import Markdown

from pmemo.api.auth import APIAuthenticator
from pmemo.api.client import APIClient
from pmemo.api.config import Tokens
from pmemo.custom_select import custom_select, select_file
from pmemo.extensions.openai_completion import OpenAiCompletion
from pmemo.extensions.prompt_template_manager import (
Expand Down Expand Up @@ -33,6 +37,13 @@ def update_pref(editor: PmemoEditor, pref: dict) -> dict:
return pref


def update_tokens(pref: PmemoPref, tokens: Tokens) -> None:
new_pref_dict = pref.model_dump()
new_pref_dict["api_pref"]["user_token"] = tokens.token
new_pref_dict["api_pref"]["user_refresh_token"] = tokens.refresh_token
PmemoPref.model_validate(new_pref_dict).write()


def main():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="cmd")
Expand Down Expand Up @@ -60,6 +71,12 @@ def main():
)
parser_templates.add_argument("-e", "--edit", action="store_true")
parser_run = subparsers.add_parser("run", help="run codeblock")
parser_signup = subparsers.add_parser("signup", help="signup to pmemo")
parser_login = subparsers.add_parser("login", help="login to pmemo")
parser_push = subparsers.add_parser("push", help="push memo to db with encryption")
parser_pull = subparsers.add_parser(
"pull", help="pull memo from db with decryption"
)
parser.set_defaults(cmd="new")
args = parser.parse_args()

Expand Down Expand Up @@ -144,8 +161,48 @@ def main():
target_file = custom_select(codeblock_candidates)
subprocess.run(["python3", pref.out_dir / memo_title / target_file])

elif args.cmd == "signup":
tokens = APIAuthenticator().signup()
update_tokens(pref, tokens)

elif args.cmd == "login":
tokens = APIAuthenticator().login(pref.api_pref.user_refresh_token)
update_tokens(pref, tokens)

elif args.cmd == "push":
if pref.api_pref.user_token is None:
logger.error("You need to signup/login first")
return

memos = sort_by_mtime(pref.out_dir, "*/*.md")
tokens = Tokens(
token=pref.api_pref.user_token,
refresh_token=pref.api_pref.user_refresh_token,
)
client = APIClient(tokens, pref.api_pref.encryption_key)
for memo in memos:
client.store_memo(memo.name.encode(), memo.read_bytes())

elif args.cmd == "pull":
if pref.api_pref.user_token is None:
logger.error("You need to signup/login first")
return

tokens = Tokens(
token=pref.api_pref.user_token,
refresh_token=pref.api_pref.user_refresh_token,
)
client = APIClient(tokens, pref.api_pref.encryption_key)
try:
for memo_content in client.get_memos():
pulled_memo = Memo(pref.out_dir, memo_content)
pulled_memo.save(show_diff=True)
except KeyboardInterrupt:
logger.error("Pulling canceled")
pass

else:
raise NotImplementedError
raise NotImplementedError(f"Unknown command: {args.cmd}")


if __name__ == "__main__":
Expand Down
19 changes: 18 additions & 1 deletion pmemo/memo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import hashlib
import re
import sys
from difflib import context_diff
from pathlib import Path

from pmemo.utils import confirm_overwrite, confirm_remove
Expand Down Expand Up @@ -142,10 +144,25 @@ def remove(self) -> None:
file.unlink()
self.file_path.parent.rmdir()

def save(self) -> None:
def save(self, show_diff: bool = False) -> None:
"""
Saves the memo to the file system, including associated code blocks.
"""
if show_diff and self.file_path.exists():
existing_content = self.file_path.read_text().splitlines()
new_content = self._content.splitlines()
sys.stderr.writelines(
"\n".join(
context_diff(
existing_content,
new_content,
fromfile=self.file_path.name,
tofile="pulled content",
)
)
)
sys.stderr.flush()

if confirm_overwrite(self.file_path):
self.file_path.write_text(self._content)

Expand Down
10 changes: 9 additions & 1 deletion pmemo/preferences.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from pathlib import Path
from typing import Optional

from cryptography.fernet import Fernet
from prompt_toolkit.keys import Keys
from pydantic import BaseModel, PositiveInt
from pydantic import BaseModel, Field, PositiveInt

PREF_FILE_PATH = Path(__file__).parent / ".preference"
DEFAULT_PMEMO_DIR = Path.home() / ".pmemo"
Expand Down Expand Up @@ -92,11 +93,18 @@ class ExtensionsPref(BaseModel, frozen=True):
template_pref: TemplateManagerPref = TemplateManagerPref()


class ApiPref(BaseModel, frozen=True):
encryption_key: bytes = Field(default_factory=Fernet.generate_key)
user_token: Optional[str] = None
user_refresh_token: Optional[str] = None


class PmemoPref(BaseModel, frozen=True):
out_dir: Path = DEFAULT_PMEMO_DIR
memo_pref: MemoPref = MemoPref()
editor_pref: EditorPref = EditorPref()
extensions_pref: ExtensionsPref = ExtensionsPref()
api_pref: ApiPref = ApiPref()

def write(self) -> None:
PREF_FILE_PATH.write_text(self.model_dump_json())
Expand Down
Loading

0 comments on commit 7fe66c8

Please sign in to comment.