Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 843 | Files: download files saved during Program execution #896

Merged
merged 12 commits into from
Sep 1, 2023
92 changes: 92 additions & 0 deletions client/quantum_serverless/core/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# This code is a Qiskit project.
#
# (C) Copyright IBM 2023.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.


"""
============================================
Files (:mod:`quantum_serverless.core.files`)
============================================

.. currentmodule:: quantum_serverless.core.files

Quantum serverless files
========================

.. autosummary::
:toctree: ../stubs/

"""
import os.path
import uuid
from typing import List, Optional

import requests
from opentelemetry import trace
from tqdm import tqdm

from quantum_serverless.core.constants import REQUESTS_TIMEOUT
from quantum_serverless.utils.json import safe_json_request


class GatewayFilesClient:
    """Client for the files endpoints of the Gateway service.

    Lists the files produced by programs and downloads them to local disk.
    """

    def __init__(self, host: str, token: str, version: str):
        """Files client for Gateway service.

        Args:
            host: gateway host
            token: authorization token
            version: gateway version
        """
        self.host = host
        self.version = version
        self._token = token

    def download(self, file: str, download_location: str) -> Optional[str]:
        """Downloads a file from the gateway into ``download_location``.

        The response is streamed to disk in chunks while a progress bar is
        shown.

        Args:
            file: name of the file to download
            download_location: directory to write the downloaded file into

        Returns:
            Name of the created file, relative to ``download_location``.

        Raises:
            requests.HTTPError: if the gateway responds with an error status.
        """
        tracer = trace.get_tracer("client.tracer")
        with tracer.start_as_current_span("files.download"):
            with requests.get(
                f"{self.host}/api/{self.version}/files/download/",
                params={"file": file},
                stream=True,
                headers={"Authorization": f"Bearer {self._token}"},
                timeout=REQUESTS_TIMEOUT,
            ) as req:
                req.raise_for_status()

                total_size_in_bytes = int(req.headers.get("content-length", 0))
                chunk_size = 8192

                # A short random prefix keeps repeated downloads from
                # overwriting each other; the generated name is returned so
                # callers can use the file programmatically.
                # Only the final path component of `file` is used, so the
                # target always lives directly inside `download_location`.
                local_name = os.path.basename(file)
                file_name = f"downloaded_{str(uuid.uuid4())[:8]}_{local_name}"
                target_path = os.path.join(download_location, file_name)

                # Both the file handle and the progress bar are context
                # managed so they are released even if the transfer fails
                # midway. The file is opened first so that no bar is drawn
                # when the destination cannot be created.
                with open(target_path, "wb") as f, tqdm(
                    total=total_size_in_bytes, unit="iB", unit_scale=True
                ) as progress_bar:
                    for chunk in req.iter_content(chunk_size=chunk_size):
                        progress_bar.update(len(chunk))
                        f.write(chunk)
                return file_name

    def list(self) -> List[str]:
        """Returns list of available files to download produced by programs.

        Returns:
            The ``results`` entry of the gateway response, or an empty list
            when the response has no such entry.
        """
        tracer = trace.get_tracer("client.tracer")
        with tracer.start_as_current_span("files.list"):
            response_data = safe_json_request(
                request=lambda: requests.get(
                    f"{self.host}/api/{self.version}/files/",
                    headers={"Authorization": f"Bearer {self._token}"},
                    timeout=REQUESTS_TIMEOUT,
                )
            )
        return response_data.get("results", [])
16 changes: 16 additions & 0 deletions client/quantum_serverless/core/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
ENV_GATEWAY_PROVIDER_TOKEN,
GATEWAY_PROVIDER_VERSION_DEFAULT,
)
from quantum_serverless.core.files import GatewayFilesClient
from quantum_serverless.core.job import (
Job,
RayJobClient,
Expand Down Expand Up @@ -286,6 +287,14 @@ def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> J

return job_client.run(program, arguments)

def files(self) -> List[str]:
    """List the files produced by programs that are available to download.

    Raises:
        NotImplementedError: always, in this implementation.
    """
    raise NotImplementedError()

def download(self, file: str, download_location: str = "./") -> Optional[str]:
    """Download file.

    Args:
        file: name of file to download
        download_location: destination directory. Default: current directory

    Raises:
        NotImplementedError: always, in this implementation.
    """
    raise NotImplementedError

def widget(self):
    """Build the widget with information about this provider and its jobs, and show it."""
    provider_widget = Widget(self)
    return provider_widget.show()
Expand Down Expand Up @@ -343,6 +352,7 @@ def __init__(
self._fetch_token(username, password)

self._job_client = GatewayJobClient(self.host, self._token, self.version)
self._files_client = GatewayFilesClient(self.host, self._token, self.version)

def get_compute_resources(self) -> List[ComputeResource]:
    """Compute resources are not available here; always raises ``NotImplementedError``."""
    message = "GatewayProvider does not support resources api yet."
    raise NotImplementedError(message)
Expand All @@ -365,6 +375,12 @@ def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> J
def get_jobs(self, **kwargs) -> List[Job]:
    """Return the jobs reported by the job client.

    Args:
        **kwargs: forwarded unchanged to the job client's ``list`` call.
    """
    jobs = self._job_client.list(**kwargs)
    return jobs

def files(self) -> List[str]:
    """Return the file names reported by the files client."""
    available = self._files_client.list()
    return available

def download(self, file: str, download_location: str = "./"):
    """Download ``file`` through the files client.

    Args:
        file: name of file to download
        download_location: destination directory. Default: current directory

    Returns:
        Whatever the files client's ``download`` call returns.
    """
    downloaded = self._files_client.download(file, download_location)
    return downloaded

def _fetch_token(self, username: str, password: str):
response_data = safe_json_request(
request=lambda: requests.post(
Expand Down
25 changes: 25 additions & 0 deletions client/quantum_serverless/quantum_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,31 @@ def get_jobs(self, **kwargs):
"""
return self._selected_provider.get_jobs(**kwargs)

def files(self):
    """List the files that can be downloaded.

    Delegates to the currently selected provider.

    Example:
        >>> serverless = QuantumServerless()
        >>> serverless.files()

    Returns:
        list of available files
    """
    provider = self._selected_provider
    return provider.files()

def download(self, file: str, download_location: str = "./"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is adding a delete-after-download option a bad idea?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if user downloads the file, he probably need it 😄
Probably we do not need this flag

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How he can delete it if he is done with it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, you mean delete from server. For now bucket policy will delete it or user can remove it in a job

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to delete it, it's probably worth creating a follow-up issue to add a /delete endpoint for files

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and probably /upload would be great to add too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but so far we only had download request :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""Downloads file.
IceKhan13 marked this conversation as resolved.
Show resolved Hide resolved

Example:
>>> serverless = QuantumServerless()
>>> serverless.download('artifact.tar', download_location="./")

Args:
file: name of file to download
download_location: destination directory. Default: current directory
"""
return self._selected_provider.download(file, download_location)

def context(
self,
provider: Optional[Union[str, BaseProvider]] = None,
Expand Down
1 change: 1 addition & 0 deletions client/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ qiskit>=0.44.0
qiskit-ibm-runtime>=0.11.3
redis>=4.6.0, <5.0
cloudpickle>=2.2.1
tqdm>=4.65.0
# opentelemetry
opentelemetry-api>=1.18.0
opentelemetry-sdk>=1.18.0
Expand Down
196 changes: 196 additions & 0 deletions docs/development/guides/08_file_download.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "aa077791-1bcf-43f6-ab2b-e04db48b9cb1",
"metadata": {},
"source": [
"# File download (Experimental)\n",
"\n",
"In this tutorial we will describe a way to retrieve files produced by programs.\n",
"\n",
"This function provides a way to download files produced by programs during execution. All you need is to call `QuantumServerless.download` function and pass `tar` file name to start downloading the file. Or you can list all available file to you by calling `QuantumServerless.files`.\n",
"\n",
"> &#x26A0; This interface is experimental, therefore it is subjected to breaking changes.\n",
"> \n",
"> Limitations:\n",
"> - only `tar` files are supported\n",
"> - `tar` file should be saved in `/data` directory during your program execution to be visible by `.files()` method call\n",
"> - only `/data` directory is supported, `/data/other_folder` will not be visible"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "93717e14-d06e-4e11-bd5b-6cdc3f1b1abd",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Attempting to instrument while already instrumented\n"
]
},
{
"data": {
"text/plain": [
"<QuantumServerless | providers [gateway-provider]>"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import os\n",
"from quantum_serverless import QuantumServerless, Provider, Program\n",
"\n",
"provider = Provider(\n",
" username=\"user\",\n",
" password=\"password123\",\n",
" host=os.environ.get(\"GATEWAY_HOST\", \"http://localhost:8000\"),\n",
")\n",
"\n",
"serverless = QuantumServerless(provider)\n",
"serverless"
]
},
{
"cell_type": "markdown",
"id": "fc30a74a-2100-40b8-a283-30bd51875b45",
"metadata": {},
"source": [
"Let's create a program to write `tar` file into `/data` folder"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "8d93f33b-f7f1-475d-b46e-1106cbe45cae",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<Job | 99e8c764-cc5f-4305-9238-471e8024f188>"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"program = Program(\n",
" title=\"File producer\",\n",
" entrypoint=\"produce_files.py\",\n",
" working_dir=\"./src/\"\n",
")\n",
"\n",
"job = serverless.run(program)\n",
"job"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "ecd0bb68-4d3c-450e-b363-a58fd91880b3",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'{\"Message\": \"my_file.txt archived into my_file.tar\"}'"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"job.result()"
]
},
{
"cell_type": "markdown",
"id": "f0e57aa6-5573-4f07-9ac8-753cb7998091",
"metadata": {},
"source": [
"Now we can look at files available using `files` method"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "08205fd4-b3d6-44d1-a33c-fb3918c26b12",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['my_file.tar', 'awesome.tar']"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"available_files = serverless.files()\n",
"available_files"
]
},
{
"cell_type": "markdown",
"id": "16c3d7a6-4cce-4ef0-a1f2-e8a6d7f2c531",
"metadata": {},
"source": [
"And download them if needed using `download` method"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "39ca652d-77d7-49d2-97e9-42b60963a671",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 196/196 [00:00<00:00, 141kiB/s]\n"
]
}
],
"source": [
"if len(available_files) > 0:\n",
" serverless.download(available_files[0])"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.16"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
11 changes: 11 additions & 0 deletions docs/development/guides/src/produce_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import os
import tarfile
from quantum_serverless import save_result

with open("./my_file.txt", "w") as f:
f.write("Hello!")

with tarfile.open("/data/my_file.tar", "w:gz") as tar:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file in the data directory is not deleted after this execution. The next execution of the same program (or of any program generating a file with the same name) overwrites the file. If it fails to generate the file, the old file may be downloaded. Is it the user's responsibility to generate a unique file name so that it is not overwritten or confused with another file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it is up to user to handle their files and names of them inside the program.

tar.add("./my_file.txt")

save_result({"Message": "my_file.txt archived into my_file.tar"})
4 changes: 3 additions & 1 deletion gateway/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def decrypt_env_vars(env_vars: Dict[str, str]) -> Dict[str, str]:
if "token" in key.lower():
try:
env_vars[key] = decrypt_string(value)
except Exception as decryption_error: # pylint: disable=broad-exception-caught
except (
Exception # pylint: disable=broad-exception-caught
) as decryption_error:
logger.error("Cannot decrypt %s. %s", key, decryption_error)
return env_vars
Loading