Fix uploading in-memory files
aniezurawski committed Jun 28, 2022
1 parent 1f7d9ac commit 10c8abd
Showing 5 changed files with 26 additions and 24 deletions.
16 changes: 9 additions & 7 deletions neptune/internal/storage/datastream.py
@@ -18,7 +18,7 @@
import math
import os
import tarfile
-from typing import Any, BinaryIO, Generator, Union
+from typing import Any, BinaryIO, Generator, Optional, Union

from future.builtins import object

@@ -38,8 +38,10 @@ class FileChunk:


class FileChunker:
-    def __init__(self, filename, fobj, total_size, multipart_config: MultipartConfig):
-        self._filename = filename
+    def __init__(
+        self, filename: Optional[str], fobj, total_size, multipart_config: MultipartConfig
+    ):
+        self._filename: Optional[str] = filename
self._fobj = fobj
self._total_size = total_size
self._min_chunk_size = multipart_config.min_chunk_size
@@ -51,7 +53,7 @@ def _get_chunk_size(self) -> int:
# can't fit it
max_size = self._max_chunk_count * self._max_chunk_size
raise InternalClientError(
f"File {self._filename} is too big to upload:"
f"File {self._filename or 'stream'} is too big to upload:"
f" {self._total_size} bytes exceeds max size {max_size}"
)
if self._total_size <= self._max_chunk_count * self._min_chunk_size:
@@ -64,11 +66,11 @@ def generate(self) -> Generator[FileChunk, Any, None]:
def generate(self) -> Generator[FileChunk, Any, None]:
chunk_size = self._get_chunk_size()
last_offset = 0
-        last_change = os.stat(self._filename).st_mtime
+        last_change: Optional = os.stat(self._filename).st_mtime if self._filename else None
while last_offset < self._total_size:
chunk = self._fobj.read(chunk_size)
if chunk:
-                if last_change < os.stat(self._filename).st_mtime:
+                if last_change and last_change < os.stat(self._filename).st_mtime:
raise UploadedFileChanged(self._filename)
if isinstance(chunk, str):
chunk = chunk.encode("utf-8")
@@ -118,7 +120,7 @@ def compress_to_tar_gz_in_memory(upload_entries) -> bytes:

with tarfile.TarFile.open(fileobj=f, mode="w|gz", dereference=True) as archive:
for entry in upload_entries:
-            archive.add(name=entry.source_path, arcname=entry.target_path, recursive=True)
+            archive.add(name=entry.source, arcname=entry.target_path, recursive=True)

f.seek(0)
data = f.read()
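For orientation (not part of the commit), here is a minimal sketch of how the updated FileChunker might be driven for an in-memory upload. The SimpleNamespace below is a stand-in for the real MultipartConfig, whose constructor is not shown in this diff; only the attributes FileChunker reads are provided. Passing None as the filename is what lets generate() skip the os.stat() modification-time check that an in-memory BytesIO cannot satisfy.

import io
from types import SimpleNamespace

from neptune.internal.storage.datastream import FileChunker

# Stand-in for MultipartConfig (assumed values, illustration only).
config = SimpleNamespace(
    min_chunk_size=5 * 1024 * 1024,     # 5 MB
    max_chunk_size=1024 * 1024 * 1024,  # 1 GB
    max_chunk_count=1000,
)

# In-memory source: no path on disk, so the filename argument is None and
# generate() never calls os.stat() on the source.
buffer = io.BytesIO(b"payload held entirely in memory")
chunker = FileChunker(None, buffer, buffer.getbuffer().nbytes, config)

for chunk in chunker.generate():
    ...  # each yielded FileChunk carries one slice of the buffer
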
26 changes: 13 additions & 13 deletions neptune/internal/storage/storage_utils.py
@@ -35,27 +35,27 @@ class AttributeUploadConfiguration:


class UploadEntry(object):
-    def __init__(self, source_path: Union[str, BytesIO], target_path: str):
-        self.source_path = source_path
+    def __init__(self, source: Union[str, BytesIO], target_path: str):
+        self.source = source
self.target_path = target_path

def length(self) -> int:
if self.is_stream():
-            return self.source_path.getbuffer().nbytes
+            return self.source.getbuffer().nbytes
else:
-            return os.path.getsize(self.source_path)
+            return os.path.getsize(self.source)

def get_stream(self) -> Union[BinaryIO, io.BytesIO]:
if self.is_stream():
-            return self.source_path
+            return self.source
else:
-            return io.open(self.source_path, "rb")
+            return io.open(self.source, "rb")

def get_permissions(self) -> str:
if self.is_stream():
return "----------"
else:
-            return self.permissions_to_unix_string(self.source_path)
+            return self.permissions_to_unix_string(self.source)

@classmethod
def permissions_to_unix_string(cls, path):
@@ -92,7 +92,7 @@ def __hash__(self):
"""
Returns the hash of source and target path
"""
-        return hash((self.source_path, self.target_path))
+        return hash((self.source, self.target_path))

def to_str(self):
"""
@@ -107,7 +107,7 @@ def __repr__(self):
return self.to_str()

def is_stream(self):
-        return hasattr(self.source_path, "read")
+        return hasattr(self.source, "read")


class UploadPackage(object):
@@ -214,11 +214,11 @@ def scan_unique_upload_entries(upload_entries):
"""
walked_entries = set()
for entry in upload_entries:
-        if entry.is_stream() or not os.path.isdir(entry.source_path):
+        if entry.is_stream() or not os.path.isdir(entry.source):
walked_entries.add(entry)
else:
-            for root, _, files in os.walk(entry.source_path):
-                path_relative_to_entry_source = os.path.relpath(root, entry.source_path)
+            for root, _, files in os.walk(entry.source):
+                path_relative_to_entry_source = os.path.relpath(root, entry.source)
target_root = os.path.normpath(
os.path.join(entry.target_path, path_relative_to_entry_source)
)
@@ -249,7 +249,7 @@ def split_upload_files(
yield current_package
current_package.reset()
else:
-            size = os.path.getsize(entry.source_path)
+            size = os.path.getsize(entry.source)
if (
size + current_package.size > upload_configuration.chunk_size
or current_package.len > max_files
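A hypothetical usage sketch (not part of the commit) of the renamed attribute: UploadEntry.source now holds either a filesystem path or an in-memory BytesIO, and the helper methods branch on is_stream(). The paths below are made up for illustration.

import io

from neptune.internal.storage.storage_utils import UploadEntry

disk_entry = UploadEntry("/tmp/report.csv", "artifacts/report.csv")
memory_entry = UploadEntry(io.BytesIO(b"col_a,col_b\n1,2\n"), "artifacts/table.csv")

assert not disk_entry.is_stream()                       # a str source is treated as a path
assert memory_entry.is_stream()                         # BytesIO has read(), so it is a stream
assert memory_entry.length() == 16                      # size read from the buffer, not os.path.getsize
assert memory_entry.get_permissions() == "----------"   # streams report placeholder permissions
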
4 changes: 2 additions & 2 deletions neptune/new/internal/backends/hosted_file_operations.py
@@ -144,7 +144,7 @@ def upload_file_set_attribute(
creating_a_single_empty_dir = (
package.len == 1
and not package.items[0].is_stream()
-        and os.path.isdir(package.items[0].source_path)
+        and os.path.isdir(package.items[0].source)
)

if uploading_multiple_entries or creating_a_single_empty_dir or package.is_empty():
@@ -352,7 +352,7 @@ def _multichunk_upload(

upload_id = result.uploadId
chunker = FileChunker(
-        upload_entry.source_path,
+        None if upload_entry.is_stream() else upload_entry.source,
file_stream,
entry_length,
multipart_config,
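Restating the call-site pattern above as a small sketch (the helper name _chunker_for is hypothetical, introduced only for illustration): disk-backed entries keep their path, while in-memory entries hand FileChunker a None filename so it skips its os.stat() change check, which is what made in-memory uploads fail before this fix.

from neptune.internal.storage.datastream import FileChunker

def _chunker_for(upload_entry, file_stream, entry_length, multipart_config):
    # Streams have no path on disk; passing None disables the mtime check.
    filename = None if upload_entry.is_stream() else upload_entry.source
    return FileChunker(filename, file_stream, entry_length, multipart_config)
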
2 changes: 1 addition & 1 deletion neptune/new/internal/backends/neptune_backend_mock.py
@@ -349,7 +349,7 @@ def download_file_set(

with ZipFile(target_file, "w") as zipObj:
for upload_entry in upload_entries:
-                zipObj.write(upload_entry.source_path, upload_entry.target_path)
+                zipObj.write(upload_entry.source, upload_entry.target_path)

def get_float_attribute(
self, container_id: str, container_type: ContainerType, path: List[str]
2 changes: 1 addition & 1 deletion (test file; filename not shown in this capture)
@@ -52,7 +52,7 @@ def test_split_upload_files_should_not_generate_empty_packages(self, getsize):
# GIVEN
entry = UploadEntry("/tmp/test.gz", "test.gz")
# AND
-        upload_entry = UploadEntry(entry.source_path, entry.target_path)
+        upload_entry = UploadEntry(entry.source, entry.target_path)
size = 10 * self.MAX_PACKAGE_SIZE
config = AttributeUploadConfiguration(size)
getsize.return_value = size
