Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make it possible to disable the buffer of ReadChannels returned from Storage.reader #1974

Merged
merged 1 commit into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import org.checkerframework.checker.nullness.qual.Nullable;

abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
Expand All @@ -40,7 +40,7 @@ abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
private ByteRangeSpec byteRangeSpec;
private int chunkSize = _2MiB;
private BufferHandle bufferHandle;
private LazyReadChannel<T> lazyReadChannel;
private LazyReadChannel<?, T> lazyReadChannel;

protected BaseStorageReadChannel(Decoder<T, BlobInfo> objectDecoder) {
this.objectDecoder = objectDecoder;
Expand All @@ -64,15 +64,18 @@ public final synchronized boolean isOpen() {
public final synchronized void close() {
open = false;
if (internalGetLazyChannel().isOpen()) {
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
ReadableByteChannel channel = internalGetLazyChannel().getChannel();
StorageException.wrapIOException(channel::close);
}
}

@Override
public final synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) {
requireNonNull(byteRangeSpec, "byteRangeSpec must be non null");
StorageException.wrapIOException(() -> maybeResetChannel(false));
this.byteRangeSpec = byteRangeSpec;
if (!this.byteRangeSpec.equals(byteRangeSpec)) {
StorageException.wrapIOException(() -> maybeResetChannel(false));
this.byteRangeSpec = byteRangeSpec;
}
return this;
}

Expand All @@ -95,7 +98,7 @@ public final synchronized int read(ByteBuffer dst) throws IOException {
}
try {
// trap if the fact that tmp is already closed, and instead return -1
BufferedReadableByteChannel tmp = internalGetLazyChannel().getChannel();
ReadableByteChannel tmp = internalGetLazyChannel().getChannel();
if (!tmp.isOpen()) {
return -1;
}
Expand Down Expand Up @@ -146,7 +149,7 @@ protected final T getResolvedObject() {
}
}

protected abstract LazyReadChannel<T> newLazyReadChannel();
protected abstract LazyReadChannel<?, T> newLazyReadChannel();

private void maybeResetChannel(boolean freeBuffer) throws IOException {
if (lazyReadChannel != null) {
Expand All @@ -162,9 +165,9 @@ private void maybeResetChannel(boolean freeBuffer) throws IOException {
}
}

private LazyReadChannel<T> internalGetLazyChannel() {
private LazyReadChannel<?, T> internalGetLazyChannel() {
if (lazyReadChannel == null) {
LazyReadChannel<T> tmp = newLazyReadChannel();
LazyReadChannel<?, T> tmp = newLazyReadChannel();
ApiFuture<T> future = tmp.getSession().getResult();
ApiFutures.addCallback(
future,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
import com.google.cloud.storage.HttpDownloadSessionBuilder.ReadableByteChannelSessionBuilder;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
Expand Down Expand Up @@ -56,16 +57,25 @@ public synchronized RestorableState<ReadChannel> capture() {
apiaryReadRequest, blobReadChannelContext.getStorageOptions(), getChunkSize());
}

protected LazyReadChannel<StorageObject> newLazyReadChannel() {
protected LazyReadChannel<?, StorageObject> newLazyReadChannel() {
return new LazyReadChannel<>(
() ->
ResumableMedia.http()
.read()
.byteChannel(blobReadChannelContext)
.setAutoGzipDecompression(autoGzipDecompression)
.buffered(getBufferHandle())
.setApiaryReadRequest(getApiaryReadRequest())
.build());
() -> {
ReadableByteChannelSessionBuilder b =
ResumableMedia.http()
.read()
.byteChannel(blobReadChannelContext)
.setAutoGzipDecompression(autoGzipDecompression);
BufferHandle bufferHandle = getBufferHandle();
// because we're erasing the specific type of channel, we need to declare it here.
// If we don't, the compiler complains we're not returning a compliant type.
ReadableByteChannelSession<?, StorageObject> session;
if (bufferHandle.capacity() > 0) {
session = b.buffered(bufferHandle).setApiaryReadRequest(getApiaryReadRequest()).build();
} else {
session = b.unbuffered().setApiaryReadRequest(getApiaryReadRequest()).build();
}
return session;
});
}

private ApiaryReadRequest getApiaryReadRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.storage.GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder;
import com.google.storage.v2.Object;
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.ReadObjectResponse;
Expand Down Expand Up @@ -46,17 +47,27 @@ public RestorableState<ReadChannel> capture() {
}

@Override
protected LazyReadChannel<Object> newLazyReadChannel() {
protected LazyReadChannel<?, Object> newLazyReadChannel() {
return new LazyReadChannel<>(
() ->
ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression)
.buffered(getBufferHandle())
.setReadObjectRequest(getReadObjectRequest())
.build());
() -> {
ReadableByteChannelSessionBuilder b =
ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression);
BufferHandle bufferHandle = getBufferHandle();
// because we're erasing the specific type of channel, we need to declare it here.
// If we don't, the compiler complains we're not returning a compliant type.
ReadableByteChannelSession<?, Object> session;
if (bufferHandle.capacity() > 0) {
session =
b.buffered(getBufferHandle()).setReadObjectRequest(getReadObjectRequest()).build();
} else {
session = b.unbuffered().setReadObjectRequest(getReadObjectRequest()).build();
}
return session;
});
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@

package com.google.cloud.storage;

import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;

final class LazyReadChannel<T> {
final class LazyReadChannel<RBC extends ReadableByteChannel, T> {

private final Supplier<BufferedReadableByteChannelSession<T>> sessionSupplier;
private final Supplier<ReadableByteChannelSession<RBC, T>> sessionSupplier;

@MonotonicNonNull private volatile BufferedReadableByteChannelSession<T> session;
@MonotonicNonNull private volatile BufferedReadableByteChannel channel;
@MonotonicNonNull private volatile ReadableByteChannelSession<RBC, T> session;
@MonotonicNonNull private volatile RBC channel;

private boolean open = false;

LazyReadChannel(Supplier<BufferedReadableByteChannelSession<T>> sessionSupplier) {
LazyReadChannel(Supplier<ReadableByteChannelSession<RBC, T>> sessionSupplier) {
this.sessionSupplier = sessionSupplier;
}

@NonNull
BufferedReadableByteChannel getChannel() {
RBC getChannel() {
if (channel != null) {
return channel;
} else {
Expand All @@ -50,7 +50,7 @@ BufferedReadableByteChannel getChannel() {
}

@NonNull
BufferedReadableByteChannelSession<T> getSession() {
ReadableByteChannelSession<RBC, T> getSession() {
if (session != null) {
return session;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ public final class LazyReadChannelTest {

@Test
public void repeatedCallsOfGetSessionMustReturnTheSameInstance() {
LazyReadChannel<String> lrc = new LazyReadChannel<>(this::newTestSession);
LazyReadChannel<BufferedReadableByteChannel, String> lrc =
new LazyReadChannel<>(this::newTestSession);

BufferedReadableByteChannelSession<String> session1 = lrc.getSession();
BufferedReadableByteChannelSession<String> session2 = lrc.getSession();
ReadableByteChannelSession<BufferedReadableByteChannel, String> session1 = lrc.getSession();
ReadableByteChannelSession<BufferedReadableByteChannel, String> session2 = lrc.getSession();
assertThat(session1).isSameInstanceAs(session2);
}

@Test
public void repeatedCallsOfGetChannelMustReturnTheSameInstance() {
LazyReadChannel<String> lrc = new LazyReadChannel<>(this::newTestSession);
LazyReadChannel<BufferedReadableByteChannel, String> lrc =
new LazyReadChannel<>(this::newTestSession);

BufferedReadableByteChannel channel1 = lrc.getChannel();
BufferedReadableByteChannel channel2 = lrc.getChannel();
Expand All @@ -52,7 +54,8 @@ public void repeatedCallsOfGetChannelMustReturnTheSameInstance() {

@Test
public void isNotOpenUntilGetChannelIsCalled() {
LazyReadChannel<String> lrc = new LazyReadChannel<>(this::newTestSession);
LazyReadChannel<BufferedReadableByteChannel, String> lrc =
new LazyReadChannel<>(this::newTestSession);

assertThat(lrc.isOpen()).isFalse();
BufferedReadableByteChannel channel = lrc.getChannel();
Expand All @@ -63,7 +66,8 @@ public void isNotOpenUntilGetChannelIsCalled() {

@Test
public void closingUnderlyingChannelClosesTheLazyReadChannel() throws IOException {
LazyReadChannel<String> lrc = new LazyReadChannel<>(this::newTestSession);
LazyReadChannel<BufferedReadableByteChannel, String> lrc =
new LazyReadChannel<>(this::newTestSession);

BufferedReadableByteChannel channel = lrc.getChannel();
assertThat(channel.isOpen()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.storage.it.runner.registry.Generator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -107,6 +108,35 @@ public void storageReadChannel_getObject_returns() throws Exception {
}
}

@Test
// @CrossRun.Exclude(transports = Transport.GRPC)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we remove this line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'll add it's removal in my next PR (don't want to waste all the cpu time to do a full rebuild and test to remove this comment in a testclass)

public void storageReadChannel_shouldAllowDisablingBufferingBySettingChunkSize_lteq0()
throws IOException {
int _512KiB = 512 * 1024;
int _1MiB = 1024 * 1024;

final BlobInfo info;
byte[] uncompressedBytes = DataGenerator.base64Characters().genBytes(_512KiB);
{
BlobInfo tmp = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
Blob gen1 = storage.create(tmp, uncompressedBytes, BlobTargetOption.doesNotExist());
info = gen1.asBlobInfo();
}

try (ReadChannel c = storage.reader(info.getBlobId())) {
c.setChunkSize(0);

ByteBuffer buf = ByteBuffer.allocate(_1MiB);
// Because this is unbuffered, the underlying channel will not necessarily fill up the buf
// in a single read call. Repeatedly read until full or EOF.
int read = fillFrom(buf, c);
assertThat(read).isEqualTo(_512KiB);
String actual = xxd(buf);
String expected = xxd(uncompressedBytes);
assertThat(actual).isEqualTo(expected);
}
}

@Test
public void storageReadChannel_getObject_404() {
BlobId id = BlobId.of(bucket.getName(), generator.randomObjectName());
Expand All @@ -129,4 +159,17 @@ private static <T, F> void equalForField(T actual, T expected, Function<T, F> f)
F eF = f.apply(expected);
assertThat(aF).isEqualTo(eF);
}

static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
int total = 0;
while (buf.hasRemaining()) {
int read = c.read(buf);
if (read != -1) {
total += read;
} else {
break;
}
}
return total;
}
}