Skip to content

Commit

Permalink
Fix GCS multiple writing functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenkov authored Mar 16, 2020
2 parents 9ff0215 + 8f6752d commit aab385c
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 73 deletions.
164 changes: 113 additions & 51 deletions smart_open/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@

_WHENCE_CHOICES = (START, CURRENT, END)

_SUCCESSFUL_STATUS_CODES = (200, 201)
_UPLOAD_INCOMPLETE_STATUS_CODE = 308
_UPLOAD_COMPLETE_STATUS_CODES = (200, 201)


def _make_range_string(start, stop=None, end=_UNKNOWN_FILE_SIZE):
Expand Down Expand Up @@ -90,6 +91,18 @@ def __init__(self, message, status_code, text):
self.status_code = status_code
self.text = text

@classmethod
def from_response(cls, response, part_num, content_length, total_size, headers):
status_code = response.status_code
response_text = response.text
total_size_gb = total_size / 1024.0 ** 3

msg = (
"upload failed (status code: %(status_code)d, response text: %(response_text)s), "
"part #%(part_num)d, %(total_size)d bytes (total %(total_size_gb).3fGB), headers: %(headers)r"
) % locals()
return cls(msg, response.status_code, response.text)


def open(
bucket_id,
Expand Down Expand Up @@ -362,16 +375,8 @@ def __str__(self):
return "(%s, %r, %r)" % (self.__class__.__name__, self._bucket.name, self._blob.name)

def __repr__(self):
return (
"%s("
"bucket=%r, "
"blob=%r, "
"buffer_size=%r)"
) % (
self.__class__.__name__,
self._bucket.name,
self._blob.name,
self._current_part_size,
return "%s(bucket=%r, blob=%r, buffer_size=%r)" % (
self.__class__.__name__, self._bucket.name, self._blob.name, self._current_part_size,
)


Expand Down Expand Up @@ -399,6 +404,7 @@ def __init__(

self._total_size = 0
self._total_parts = 0
self._bytes_uploaded = 0
self._current_part = io.BytesIO()

self._session = google_requests.AuthorizedSession(self._credentials)
Expand All @@ -421,12 +427,18 @@ def flush(self):
#
def close(self):
logger.debug("closing")
if self._total_size == 0: # empty files
self._upload_empty_part()
if self._current_part.tell():
self._upload_next_part()
if not self.closed:
if self._total_size == 0: # empty files
self._upload_empty_part()
else:
self._upload_final_part()
self._client = None
logger.debug("successfully closed")

@property
def closed(self):
return self._client is None

def writable(self):
"""Return True if the stream supports writing."""
return True
Expand All @@ -453,7 +465,13 @@ def write(self, b):
self._current_part.write(b)
self._total_size += len(b)

if self._current_part.tell() >= self._min_part_size:
#
# If the size of this part is precisely equal to the minimum part size,
# we don't perform the actual write now, and wait until we see more data.
# We do this because the very last part of the upload must be handled slightly
# differently (see comments in the _upload_next_part method).
#
if self._current_part.tell() > self._min_part_size:
self._upload_next_part()

return len(b)
Expand All @@ -470,49 +488,101 @@ def terminate(self):
#
def _upload_next_part(self):
part_num = self._total_parts + 1

# upload the largest amount possible given GCS's restriction
# of parts being multiples of 256kB, except for the last one
size_of_leftovers = self._current_part.tell() % self._min_part_size
content_length = self._current_part.tell() - size_of_leftovers

# a final upload of 0 bytes does not work, so we need to guard against this edge case
# this results in occasionally keeping an additional 256kB in the buffer after uploading a part,
# but until this is fixed on Google's end there is no other option
# https://stackoverflow.com/questions/60230631/upload-zero-size-final-part-to-google-cloud-storage-resumable-upload
if size_of_leftovers == 0:
content_length -= _REQUIRED_CHUNK_MULTIPLE

total_size = self._bytes_uploaded + content_length

start = self._bytes_uploaded
stop = total_size - 1

self._current_part.seek(0)

headers = {
'Content-Length': str(content_length),
'Content-Range': _make_range_string(start, stop, _UNKNOWN_FILE_SIZE),
}

logger.info(
"uploading part #%i, %i bytes (total %.3fGB)",
part_num,
self._current_part.tell(),
self._total_size / 1024.0 ** 3
"uploading part #%i, %i bytes (total %.3fGB) headers %r",
part_num, content_length, total_size / 1024.0 ** 3, headers,
)
content_length = end = self._current_part.tell()
start = self._total_size - content_length
stop = self._total_size - 1

self._current_part.seek(0)
response = self._session.put(
self._resumable_upload_url,
data=self._current_part.read(content_length),
headers=headers,
)

if response.status_code != _UPLOAD_INCOMPLETE_STATUS_CODE:
raise UploadFailedError.from_response(
response,
part_num,
content_length,
self._total_size,
headers,
)
logger.debug("upload of part #%i finished" % part_num)

self._total_parts += 1
self._bytes_uploaded += content_length
# handle the leftovers
self._current_part = io.BytesIO(self._current_part.read())
self._current_part.seek(0, io.SEEK_END)

def _upload_final_part(self):
part_num = self._total_parts + 1
content_length = self._current_part.tell()
stop = self._total_size - 1
start = self._bytes_uploaded

headers = {
'Content-Length': str(content_length),
'Content-Range': _make_range_string(start, stop, end)
'Content-Range': _make_range_string(start, stop, self._total_size),
}
response = self._session.put(self._resumable_upload_url, data=self._current_part, headers=headers)

if response.status_code not in _SUCCESSFUL_STATUS_CODES:
msg = (
"upload failed ("
"status code: %i"
"response text=%s, "
"part #%i, "
"%i bytes (total %.3fGB)"
) % (
response.status_code,
response.text,

logger.info(
"uploading part #%i, %i bytes (total %.3fGB) headers %r",
part_num, content_length, self._total_size / 1024.0 ** 3, headers,
)

self._current_part.seek(0)

response = self._session.put(
self._resumable_upload_url,
data=self._current_part,
headers=headers,
)

if response.status_code not in _UPLOAD_COMPLETE_STATUS_CODES:
raise UploadFailedError.from_response(
response,
part_num,
self._current_part.tell(),
self._total_size / 1024.0 ** 3,
content_length,
self._total_size,
headers,
)
raise UploadFailedError(msg, response.status_code, response.text)
logger.debug("upload of part #%i finished" % part_num)

self._total_parts += 1
self._bytes_uploaded += content_length
self._current_part = io.BytesIO()

def _upload_empty_part(self):
logger.debug("creating empty file")
headers = {'Content-Length': '0'}
response = self._session.put(self._resumable_upload_url, headers=headers)
assert response.status_code in _SUCCESSFUL_STATUS_CODES
assert response.status_code in _UPLOAD_COMPLETE_STATUS_CODES

self._total_parts += 1

Expand All @@ -529,14 +599,6 @@ def __str__(self):
return "(%s, %r, %r)" % (self.__class__.__name__, self._bucket.name, self._blob.name)

def __repr__(self):
return (
"%s("
"bucket=%r, "
"blob=%r, "
"min_part_size=%r)"
) % (
self.__class__.__name__,
self._bucket.name,
self._blob.name,
self._min_part_size,
return "%s(bucket=%r, blob=%r, min_part_size=%r)" % (
self.__class__.__name__, self._bucket.name, self._blob.name, self._min_part_size,
)
Loading

0 comments on commit aab385c

Please sign in to comment.