Skip to content

Commit

Permalink
Synchronize ByteSourcePayload in LocalBlobStore
Browse files Browse the repository at this point in the history
ByteSourcePayload's.openStream is not thread safe and lack of
synchronization can throw ArrayIndexOutOfBoundsExceptions.
Fixes gaul/s3proxy#303.
  • Loading branch information
gaul committed Jul 30, 2022
1 parent aea2603 commit dea756a
Showing 1 changed file with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,9 @@ public String copyBlob(String fromContainer, String fromName, String toContainer

InputStream is = null;
try {
is = blob.getPayload().openStream();
synchronized (blob.getPayload()) {
is = blob.getPayload().openStream();
}
ContentMetadata metadata = blob.getMetadata().getContentMetadata();
BlobBuilder.PayloadBlobBuilder builder = blobBuilder(toName)
.payload(is);
Expand Down Expand Up @@ -690,7 +692,9 @@ public Blob getBlob(String containerName, String key, GetOptions options) {
} else {
// This should not happen.
try {
byteSource = ByteSource.wrap(ByteStreams2.toByteArrayAndClose(blob.getPayload().openStream()));
synchronized (blob.getPayload()) {
byteSource = ByteSource.wrap(ByteStreams2.toByteArrayAndClose(blob.getPayload().openStream()));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -738,7 +742,10 @@ public Blob getBlob(String containerName, String key, GetOptions options) {
// return InputStream to more closely follow real blobstore
Payload payload;
try {
InputStream is = blob.getPayload().openStream();
InputStream is;
synchronized (blob.getPayload()) {
is = blob.getPayload().openStream();
}
if (is instanceof FileInputStream) {
// except for FileInputStream since large MPU can open too many fds
is.close();
Expand Down Expand Up @@ -1017,7 +1024,10 @@ public int read() throws IOException {
if (!blobs.hasNext()) {
return -1;
}
current = blobs.next().getPayload().openStream();
Payload payload = blobs.next().getPayload();
synchronized (payload) {
current = payload.openStream();
}
}
int result = current.read();
if (result == -1) {
Expand All @@ -1036,7 +1046,10 @@ public int read(byte[] b, int off, int len) throws IOException {
if (!blobs.hasNext()) {
return -1;
}
current = blobs.next().getPayload().openStream();
Payload payload = blobs.next().getPayload();
synchronized (payload) {
current = payload.openStream();
}
}
int result = current.read(b, off, len);
if (result == -1) {
Expand Down

0 comments on commit dea756a

Please sign in to comment.