-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
296 additions
and
67 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
import json | ||
import os | ||
import urllib.error | ||
import urllib.parse | ||
import urllib.request | ||
|
||
""" | ||
From: /kb/dev_container/narrative/src/biokbase/narrative/staging/helper.py | ||
KBase staging.helper class Tianhao Gu <tgu@anl.gov> | ||
""" | ||
|
||
STAGING_URL = 'https://kbase.us/services/staging_service' | ||
|
||
CHUNK_SIZE = 16 * 1024 | ||
|
||
|
||
class Staging: | ||
|
||
def __fetch_url(self, end_point, values=None, headers=None, method="GET", save_path=None): | ||
"""Fetching URL | ||
By default, it sends a GET request with {"Authorization": $KB_AUTH_TOKEN} header | ||
""" | ||
if save_path and os.path.exists(save_path): | ||
raise ValueError( | ||
f"A file exists at {save_path} but this method does not overwrite files" | ||
) | ||
|
||
data = None | ||
if values: | ||
data = urllib.parse.urlencode(values) | ||
|
||
if not headers: | ||
headers = {"Authorization": self._token} | ||
|
||
req = urllib.request.Request(end_point, data, headers) | ||
req.get_method = lambda: method | ||
try: | ||
response = urllib.request.urlopen(req) | ||
except urllib.error.URLError as e: | ||
error_msg = "The server could not fulfill the request.\n" | ||
|
||
server_msg = e.read() | ||
if server_msg: | ||
error_msg += f"Server message: {server_msg}\n" | ||
|
||
if hasattr(e, "reason"): | ||
error_msg += f"Reason: {e.reason}\n" | ||
|
||
if hasattr(e, "code"): | ||
error_msg += f"Error code: {e.code}\n" | ||
|
||
raise ValueError(error_msg) from e | ||
|
||
if not save_path: | ||
return response.read() | ||
|
||
with open(save_path, "wb") as f: | ||
while True: | ||
chunk = response.read(CHUNK_SIZE) | ||
if not chunk: | ||
break | ||
f.write(chunk) | ||
|
||
def __init__(self, token): | ||
"""Initializes a new Helper instance.""" | ||
self._token = token | ||
self._staging_url = STAGING_URL | ||
|
||
def list(self, directory=""): | ||
"""Calling LIST endpoint and return a list of file path""" | ||
end_point = self._staging_url + "/list/" + directory | ||
response = self.__fetch_url(end_point) | ||
|
||
resp_json = json.loads(response) | ||
|
||
file_list = [] | ||
for file in resp_json: | ||
if not file.get("isFolder"): | ||
file_list.append(file.get("path")) | ||
|
||
return sorted(file_list) | ||
|
||
def metadata(self, path=""): | ||
"""Calling METADATA endpoint and return metadata in JSON format""" | ||
if not path: | ||
raise ValueError("Must provide path argument") | ||
|
||
end_point = self._staging_url + "/metadata/" + path | ||
response = self.__fetch_url(end_point) | ||
|
||
return json.loads(response) | ||
|
||
def jgi_metadata(self, path=""): | ||
"""Calling JGI-METADATA endpoint and return metadata in JSON format""" | ||
if not path: | ||
raise ValueError("Must provide path argument") | ||
|
||
end_point = self._staging_url + "/jgi-metadata/" + path | ||
response = self.__fetch_url(end_point) | ||
|
||
return json.loads(response) | ||
|
||
def search(self, path=""): | ||
"""Calling SEARCH endpoint and return server response in JSON format""" | ||
if not path: | ||
raise ValueError("Must provide path argument") | ||
|
||
end_point = self._staging_url + "/search/" + path | ||
response = self.__fetch_url(end_point) | ||
|
||
return json.loads(response) | ||
|
||
def delete(self, path=""): | ||
"""Calling DELETE endpoint and return server response in JSON format""" | ||
if not path: | ||
raise ValueError("Must provide path argument") | ||
|
||
end_point = self._staging_url + "/delete/" + path | ||
response = self.__fetch_url(end_point, method="DELETE") | ||
|
||
return {"server_response": response} | ||
|
||
def download(self, path, save_location=None): | ||
"""Calling DOWNLOAD endpoint and saving the resulting file""" | ||
if not save_location: | ||
save_location = "./" + os.path.basename(path) | ||
|
||
end_point = self._staging_url + "/download/" + path | ||
self.__fetch_url(end_point, save_path=save_location) | ||
|
||
return save_location | ||
|
||
def mv(self, path="", new_path=""): | ||
"""Calling MV endpoint and return server response in JSON format""" | ||
if not path: | ||
raise ValueError("Must provide path argument") | ||
|
||
if not new_path: | ||
raise ValueError("Must provide new_path argument") | ||
|
||
end_point = self._staging_url + "/mv/" + path | ||
body_values = {"newPath": new_path} | ||
response = self.__fetch_url(end_point, values=body_values, method="PATCH") | ||
|
||
return {"server_response": response} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.