-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prompt before overwriting files, some error handling, and refactoring #34
Open
alichtman
wants to merge
4
commits into
sdushantha:master
Choose a base branch
from
alichtman:master
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,16 +7,30 @@ | |
import json | ||
import sys | ||
from colorama import Fore, Style, init | ||
from pathlib import Path | ||
|
||
init() | ||
|
||
# this ANSI code lets us erase the current line | ||
ERASE_LINE = "\x1b[2K" | ||
|
||
COLOR_NAME_TO_CODE = {"default": "", "red": Fore.RED, "green": Style.BRIGHT + Fore.GREEN} | ||
COLOR_NAME_TO_CODE = { | ||
"default": "", | ||
"red": Fore.RED, | ||
"green": Style.BRIGHT + Fore.GREEN, | ||
"yellow": Style.BRIGHT + Fore.YELLOW, | ||
} | ||
|
||
|
||
def print_text(text, color="default", in_place=False, **kwargs): # type: (str, str, bool, any) -> None | ||
class FileToDownload: | ||
def __init__(self, name, url, path, dest_path): | ||
self.name = name | ||
self.url = url | ||
self.path = path | ||
self.dest_path: Path = dest_path | ||
|
||
|
||
def print_text(text, color="default", in_place=False, **kwargs) -> None: | ||
""" | ||
print text to console, a wrapper to built-in print | ||
|
||
|
@@ -30,31 +44,107 @@ def print_text(text, color="default", in_place=False, **kwargs): # type: (str, | |
print(COLOR_NAME_TO_CODE[color] + text + Style.RESET_ALL, **kwargs) | ||
|
||
|
||
def prompt_yes_no(question: str, default: bool = True) -> bool: | ||
""" | ||
Prompt user for a yes/no question. | ||
|
||
:param question: question to ask | ||
:param default: default answer if user just presses enter | ||
:return: True if user answers yes, False if user answers no | ||
""" | ||
if default: | ||
yes_no = "Y/n" | ||
else: | ||
yes_no = "y/N" | ||
|
||
while True: | ||
print_text("{} [{}] ".format(question, yes_no), end="") | ||
choice = input().lower() | ||
if choice in {"y", "yes"}: | ||
return True | ||
elif choice in {"n", "no"}: | ||
return False | ||
elif choice == "": | ||
return default | ||
else: | ||
print_text("Please respond with 'yes' or 'no' (or 'y' or 'n').") | ||
|
||
|
||
def create_url(url): | ||
""" | ||
From the given url, produce a URL that is compatible with Github's REST API. Can handle blob or tree paths. | ||
""" | ||
repo_only_url = re.compile(r"https:\/\/github\.com\/[a-z\d](?:[a-z\d]|-(?=[a-z\d])){0,38}\/[a-zA-Z0-9]+$") | ||
repo_only_url = re.compile( | ||
r"https:\/\/github\.com\/[a-z\d](?:[a-z\d]|-(?=[a-z\d])){0,38}\/[a-zA-Z0-9]+$" | ||
) | ||
re_branch = re.compile("/(tree|blob)/(.+?)/") | ||
|
||
# Check if the given url is a url to a GitHub repo. If it is, tell the | ||
# user to use 'git clone' to download it | ||
if re.match(repo_only_url,url): | ||
print_text("β The given url is a complete repository. Use 'git clone' to download the repository", | ||
"red", in_place=True) | ||
if re.match(repo_only_url, url): | ||
print_text( | ||
"β The given url is a complete repository. Use 'git clone' to download the repository", | ||
"red", | ||
in_place=True, | ||
) | ||
sys.exit() | ||
|
||
# extract the branch name from the given url (e.g master) | ||
branch = re_branch.search(url) | ||
download_dirs = url[branch.end():] | ||
api_url = (url[:branch.start()].replace("github.com", "api.github.com/repos", 1) + | ||
"/contents/" + download_dirs + "?ref=" + branch.group(2)) | ||
if branch is None: | ||
print_text( | ||
"β Could not find branch name in the given url", "red", in_place=True | ||
) | ||
sys.exit() | ||
download_dirs = url[branch.end() :] | ||
api_url = ( | ||
url[: branch.start()].replace("github.com", "api.github.com/repos", 1) | ||
+ "/contents/" | ||
+ download_dirs | ||
+ "?ref=" | ||
+ branch.group(2) | ||
) | ||
return api_url, download_dirs | ||
|
||
|
||
def download(repo_url, flatten=False, output_dir="./"): | ||
""" Downloads the files and directories in repo_url. If flatten is specified, the contents of any and all | ||
sub-directories will be pulled upwards into the root folder. """ | ||
def download_file(file_to_download: FileToDownload, force: bool) -> None: | ||
if os.path.exists(file_to_download.dest_path) and not force: | ||
if prompt_yes_no( | ||
"β File {} already exists. Overwrite?".format(file_to_download.dest_path), | ||
default=False, | ||
): | ||
urllib.request.urlretrieve(file_to_download.url, file_to_download.dest_path) | ||
# bring the cursor to the beginning, erase the current line, and dont make a new line | ||
print_text( | ||
"Downloading (overwriting): " | ||
+ Fore.WHITE | ||
+ "{} to {}".format( | ||
file_to_download.name, file_to_download.dest_path.resolve() | ||
), | ||
"green", | ||
in_place=True, | ||
) | ||
else: | ||
print_text( | ||
"Skipped: " + Fore.WHITE + "{}".format(file_to_download.name), | ||
"yellow", | ||
in_place=True, | ||
) | ||
else: | ||
urllib.request.urlretrieve(file_to_download.url, file_to_download.dest_path) | ||
# bring the cursor to the beginning, erase the current line, and dont make a new line | ||
print_text( | ||
"Downloaded: " | ||
+ Fore.WHITE | ||
+ "{} to {}".format(file_to_download.name, file_to_download.dest_path), | ||
"green", | ||
in_place=True, | ||
) | ||
|
||
|
||
def download(repo_url, flatten=False, force=False, output_dir="./"): | ||
"""Downloads the files and directories in repo_url. If flatten is specified, the contents of any and all | ||
sub-directories will be pulled upwards into the root folder.""" | ||
|
||
# generate the url which returns the JSON data | ||
api_url, download_dirs = create_url(repo_url) | ||
|
@@ -68,10 +158,9 @@ def download(repo_url, flatten=False, output_dir="./"): | |
else: | ||
dir_out = output_dir | ||
|
||
dir_out = Path(dir_out) | ||
|
||
try: | ||
opener = urllib.request.build_opener() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These only need to be set once, globally. |
||
opener.addheaders = [('User-agent', 'Mozilla/5.0')] | ||
urllib.request.install_opener(opener) | ||
response = urllib.request.urlretrieve(api_url) | ||
except KeyboardInterrupt: | ||
# when CTRL+C is pressed during the execution of this script, | ||
|
@@ -80,8 +169,7 @@ def download(repo_url, flatten=False, output_dir="./"): | |
sys.exit() | ||
|
||
if not flatten: | ||
# make a directory with the name which is taken from | ||
# the actual repo | ||
# make a directory with the name which is taken from the actual repo | ||
os.makedirs(dir_out, exist_ok=True) | ||
|
||
# total files count | ||
|
@@ -95,81 +183,108 @@ def download(repo_url, flatten=False, output_dir="./"): | |
|
||
# If the data is a file, download it as one. | ||
if isinstance(data, dict) and data["type"] == "file": | ||
print("Single file download") | ||
try: | ||
# download the file | ||
opener = urllib.request.build_opener() | ||
opener.addheaders = [('User-agent', 'Mozilla/5.0')] | ||
urllib.request.install_opener(opener) | ||
urllib.request.urlretrieve(data["download_url"], os.path.join(dir_out, data["name"])) | ||
# bring the cursor to the beginning, erase the current line, and dont make a new line | ||
print_text("Downloaded: " + Fore.WHITE + "{}".format(data["name"]), "green", in_place=True) | ||
dest_path = dir_out / Path(data["name"]) | ||
file_to_download = FileToDownload( | ||
name=data["name"], | ||
url=data["download_url"], | ||
dest_path=dest_path, | ||
path=data["path"], | ||
) | ||
download_file(file_to_download, force) | ||
|
||
return total_files | ||
except KeyboardInterrupt: | ||
# when CTRL+C is pressed during the execution of this script, | ||
# bring the cursor to the beginning, erase the current line, and dont make a new line | ||
print_text("β Got interrupted", 'red', in_place=False) | ||
print_text("β Got interrupted", "red", in_place=False) | ||
sys.exit() | ||
|
||
for file in data: | ||
file_url = file["download_url"] | ||
file_name = file["name"] | ||
file_path = file["path"] | ||
|
||
if flatten: | ||
path = os.path.basename(file_path) | ||
path = Path(os.path.basename(file_path)) | ||
else: | ||
path = file_path | ||
dirname = os.path.dirname(path) | ||
path = Path(file_path) | ||
|
||
file_to_download = FileToDownload( | ||
name=file["name"], | ||
url=file["download_url"], | ||
dest_path=path, | ||
path=file["path"], | ||
) | ||
|
||
if dirname != '': | ||
os.makedirs(os.path.dirname(path), exist_ok=True) | ||
if path.parent != "": | ||
os.makedirs(path.parent, exist_ok=True) | ||
else: | ||
pass | ||
|
||
if file_url is not None: | ||
if file_to_download.url is not None: | ||
try: | ||
opener = urllib.request.build_opener() | ||
opener.addheaders = [('User-agent', 'Mozilla/5.0')] | ||
urllib.request.install_opener(opener) | ||
# download the file | ||
urllib.request.urlretrieve(file_url, path) | ||
|
||
# bring the cursor to the beginning, erase the current line, and dont make a new line | ||
print_text("Downloaded: " + Fore.WHITE + "{}".format(file_name), "green", in_place=False, end="\n", | ||
flush=True) | ||
|
||
download_file(file_to_download, force) | ||
except KeyboardInterrupt: | ||
# when CTRL+C is pressed during the execution of this script, | ||
# bring the cursor to the beginning, erase the current line, and dont make a new line | ||
print_text("β Got interrupted", 'red', in_place=False) | ||
print_text("β Got interrupted", "red", in_place=False) | ||
sys.exit() | ||
else: | ||
download(file["html_url"], flatten, download_dirs) | ||
download(file["html_url"], flatten, force, download_dirs) | ||
|
||
return total_files | ||
|
||
|
||
def set_up_url_opener(): | ||
""" | ||
Set up the URL opener to mimic a browser. | ||
""" | ||
opener = urllib.request.build_opener() | ||
opener.addheaders = [("User-agent", "Mozilla/5.0")] | ||
urllib.request.install_opener(opener) | ||
|
||
|
||
def main(): | ||
if sys.platform != 'win32': | ||
if sys.platform != "win32": | ||
# disbale CTRL+Z | ||
signal.signal(signal.SIGTSTP, signal.SIG_IGN) | ||
|
||
parser = argparse.ArgumentParser(description="Download directories/folders from GitHub") | ||
parser.add_argument('urls', nargs="+", | ||
help="List of Github directories to download.") | ||
parser.add_argument('--output_dir', "-d", dest="output_dir", default="./", | ||
help="All directories will be downloaded to the specified directory.") | ||
|
||
parser.add_argument('--flatten', '-f', action="store_true", | ||
help='Flatten directory structures. Do not create extra directory and download found files to' | ||
' output directory. (default to current directory if not specified)') | ||
parser = argparse.ArgumentParser( | ||
description="Download directories/folders from GitHub" | ||
) | ||
parser.add_argument( | ||
"urls", nargs="+", help="List of Github directories to download." | ||
) | ||
parser.add_argument( | ||
"--output_dir", | ||
"-d", | ||
dest="output_dir", | ||
default="./", | ||
help="All directories will be downloaded to the specified directory.", | ||
) | ||
|
||
parser.add_argument( | ||
"--flatten", | ||
"-f", | ||
action="store_true", | ||
help="Flatten directory structures. Do not create extra directory and download found files to" | ||
" output directory. (default to current directory if not specified)", | ||
) | ||
|
||
parser.add_argument( | ||
"--force", | ||
action="store_true", | ||
help="Force overwriting existing files.", | ||
) | ||
|
||
args = parser.parse_args() | ||
|
||
set_up_url_opener() | ||
|
||
flatten = args.flatten | ||
for url in args.urls: | ||
total_files = download(url, flatten, args.output_dir) | ||
download(url, flatten, args.force, args.output_dir) | ||
|
||
print_text("β Download complete", "green", in_place=True) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimally, I think this would try common branch names like
main
andmaster
, but that's out of scope for this PR