Skip to content

Commit

Permalink
Removed stream references from RemoteTransferContainer to completely …
Browse files Browse the repository at this point in the history
…remove stream ownership from here

Signed-off-by: vikasvb90 <vikasvb@amazon.com>
  • Loading branch information
vikasvb90 committed Dec 6, 2023
1 parent 9af223f commit d2713b4
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 36 deletions.
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/common/StreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ protected StreamContext(StreamContext streamContext) {
/**
* Vendor plugins can use this method to create new streams only when they are required for processing
* New streams won't be created till this method is called with the specific <code>partNumber</code>
* It is the responsibility of the caller to ensure that the stream is properly closed after consumption;
* otherwise it can leak resources.
*
* @param partNumber The index of the part
* @return A stream reference to the part requested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.zip.CRC32;

import com.jcraft.jzlib.JZlib;
Expand All @@ -45,15 +46,15 @@ public class RemoteTransferContainer implements Closeable {
private long lastPartSize;

private final long contentLength;
private final SetOnce<InputStream[]> inputStreams = new SetOnce<>();
private final SetOnce<Supplier<Long>[]> checksumSuppliers = new SetOnce<>();
private final String fileName;
private final String remoteFileName;
private final boolean failTransferIfFileExists;
private final WritePriority writePriority;
private final long expectedChecksum;
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
private final boolean isRemoteDataIntegritySupported;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean readBlock = new AtomicBoolean();

private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);

Expand Down Expand Up @@ -123,23 +124,24 @@ StreamContext supplyStreamContext(long partSize) {
}
}

/**
 * Computes the part layout for a multi-part transfer and returns a {@link StreamContext} describing it.
 * <p>
 * NOTE: this diff hunk is rendered without +/- markers, so pre-change lines (using {@code inputStreams})
 * and post-change lines (using {@code checksumSuppliers}) appear directly after one another.
 *
 * @param partSize size of every part except possibly the last one
 * @return a stream context built from the part stream supplier, part size, last part size and part count
 * @throws IOException if the per-part array has already been initialised
 */
// The unchecked suppression covers the generic array creation of Supplier<Long>[] below (post-change line).
@SuppressWarnings({ "unchecked" })
private StreamContext openMultipartStreams(long partSize) throws IOException {
// Pre-change guard (removed by this commit).
if (inputStreams.get() != null) {
// Post-change guard: the supplier array doubles as the "already initialised" marker.
if (checksumSuppliers.get() != null) {
throw new IOException("Multi-part streams are already created.");
}

this.partSize = partSize;
// The last part carries the remainder, or a full partSize when contentLength divides evenly.
this.lastPartSize = (contentLength % partSize) != 0 ? contentLength % partSize : partSize;
// Ceiling division of contentLength by partSize.
this.numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
// Pre-change: one slot per part holding the opened stream itself (removed by this commit).
InputStream[] streams = new InputStream[numberOfParts];
inputStreams.set(streams);
// Post-change: one slot per part holding a checksum supplier instead of a stream reference.
Supplier<Long>[] suppliers = new Supplier[numberOfParts];
checksumSuppliers.set(suppliers);

return new StreamContext(getTransferPartStreamSupplier(), partSize, lastPartSize, numberOfParts);
}

/**
 * Returns the function passed to {@link StreamContext} that creates the stream container for a single part
 * from its part number, size and position by delegating to {@code getMultipartStreamSupplier}.
 *
 * @return a function mapping (partNo, size, position) to an {@code InputStreamContainer}; it may throw IOException
 */
private CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException> getTransferPartStreamSupplier() {
return ((partNo, size, position) -> {
// Pre-change initialisation check (removed by this commit).
assert inputStreams.get() != null : "expected inputStreams to be initialised";
// Post-change initialisation check: the supplier array must have been set before any part is requested.
assert checksumSuppliers.get() != null : "expected container to be initialised";
return getMultipartStreamSupplier(partNo, size, position).get();
});
}
Expand All @@ -165,12 +167,19 @@ private LocalStreamSupplier<InputStreamContainer> getMultipartStreamSupplier(
OffsetRangeInputStream offsetRangeInputStream = offsetRangeInputStreamSupplier.get(size, position);
if (offsetRangeInputStream instanceof RateLimitingOffsetRangeInputStream) {
RateLimitingOffsetRangeInputStream rangeIndexInputStream = (RateLimitingOffsetRangeInputStream) offsetRangeInputStream;
rangeIndexInputStream.setClose(closed);
rangeIndexInputStream.setReadBlock(readBlock);
}
InputStream inputStream;
if (isRemoteDataIntegrityCheckPossible() == false) {
ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream(
offsetRangeInputStream,
fileName
);
Objects.requireNonNull(checksumSuppliers.get())[streamIdx] = resettableCheckedInputStream::getChecksum;
inputStream = resettableCheckedInputStream;
} else {
inputStream = offsetRangeInputStream;
}
InputStream inputStream = !isRemoteDataIntegrityCheckPossible()
? new ResettableCheckedInputStream(offsetRangeInputStream, fileName)
: offsetRangeInputStream;
Objects.requireNonNull(inputStreams.get())[streamIdx] = inputStream;

return new InputStreamContainer(inputStream, size, position);
} catch (IOException e) {
Expand Down Expand Up @@ -212,28 +221,23 @@ public long getContentLength() {
return contentLength;
}

/**
 * Pre-change helper (deleted by this commit): reads the checksum from a stream that is expected to be a
 * {@code ResettableCheckedInputStream}. The expectation is enforced only by an assertion, so with
 * assertions disabled a different stream type would fail at the cast instead.
 *
 * @param inputStream stream expected to be a {@code ResettableCheckedInputStream}
 * @return the value of {@code getChecksum()} on that stream
 */
private long getInputStreamChecksum(InputStream inputStream) {
assert inputStream instanceof ResettableCheckedInputStream
: "expected passed inputStream to be instance of ResettableCheckedInputStream";
return ((ResettableCheckedInputStream) inputStream).getChecksum();
}

/**
 * Folds the per-part checksums into a single checksum using {@code JZlib.crc32_combine}.
 * <p>
 * Part 0 seeds the result, parts 1 .. length-2 are combined with {@code partSize} as the length argument,
 * and the final part (only when there is more than one part) is combined with {@code lastPartSize}.
 * <p>
 * NOTE: this diff hunk is rendered without +/- markers; the lines reading from {@code inputStreams} are the
 * pre-change version and the lines reading from {@code checksumSuppliers} are the post-change version.
 * <p>
 * NOTE(review): supplier slots appear to be filled only when a part stream is created with local checksum
 * computation; an unfilled slot would fail here with a NullPointerException - confirm callers invoke this
 * only after every part has been created.
 *
 * @return the combined checksum over all parts
 */
private long getActualChecksum() {
// Pre-change: checksums were read from the retained stream references (removed by this commit).
InputStream[] currentInputStreams = Objects.requireNonNull(inputStreams.get());
long checksum = getInputStreamChecksum(currentInputStreams[0]);
for (int checkSumIdx = 1; checkSumIdx < Objects.requireNonNull(inputStreams.get()).length - 1; checkSumIdx++) {
checksum = JZlib.crc32_combine(checksum, getInputStreamChecksum(currentInputStreams[checkSumIdx]), partSize);
// Post-change: checksums are obtained through suppliers, so no stream reference is held here.
Supplier<Long>[] ckSumSuppliers = Objects.requireNonNull(checksumSuppliers.get());
long checksum = ckSumSuppliers[0].get();
// The loop stops before the last index because the last part may have a different size.
for (int checkSumIdx = 1; checkSumIdx < ckSumSuppliers.length - 1; checkSumIdx++) {
checksum = JZlib.crc32_combine(checksum, ckSumSuppliers[checkSumIdx].get(), partSize);
}
if (numberOfParts > 1) {
// Pre-change form of the last-part combine (removed by this commit).
checksum = JZlib.crc32_combine(checksum, getInputStreamChecksum(currentInputStreams[numberOfParts - 1]), lastPartSize);
// Post-change form of the last-part combine.
checksum = JZlib.crc32_combine(checksum, ckSumSuppliers[numberOfParts - 1].get(), lastPartSize);
}

return checksum;
}

/**
 * Flips the shared flag that was handed to rate-limited part streams at creation time.
 * No stream is closed here; in the post-change version the flag is named {@code readBlock}
 * and the pre-change version used {@code closed}.
 */
@Override
public void close() throws IOException {
// Pre-change line (removed by this commit).
closed.set(true);
// Setting a read block on all streams ever created by the container.
readBlock.set(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class OffsetRangeIndexInputStream extends OffsetRangeInputStream {
private static final Logger logger = LogManager.getLogger(OffsetRangeIndexInputStream.class);
private final InputStreamIndexInput inputStreamIndexInput;
private final IndexInput indexInput;
private AtomicBoolean closed;
private AtomicBoolean readBlock;
private final OffsetRangeRefCount offsetRangeRefCount;
private final RunOnce closeOnce;

Expand All @@ -50,8 +50,8 @@ public OffsetRangeIndexInputStream(IndexInput indexInput, long size, long positi
}

/**
 * Stores the externally owned flag that the read methods consult before reading.
 * This hunk shows the rename from {@code setClose}/{@code closed} (pre-change) to
 * {@code setReadBlock}/{@code readBlock} (post-change).
 *
 * @param readBlock shared flag; when it is set to true, subsequent reads are rejected
 */
@Override
// Pre-change header and body (removed by this commit).
public void setClose(AtomicBoolean close) {
this.closed = close;
// Post-change header and body.
public void setReadBlock(AtomicBoolean readBlock) {
this.readBlock = readBlock;
}

@Override
Expand All @@ -75,7 +75,7 @@ public int read(byte[] b, int off, int len) throws IOException {
//
// All these protection mechanisms are required in order to prevent invalid access to streams happening
// from the new S3 async SDK.
ensureOpen();
ensureReadable();
try (OffsetRangeRefCount ignored = getStreamReference()) {
return inputStreamIndexInput.read(b, off, len);
}
Expand All @@ -89,10 +89,10 @@ private OffsetRangeRefCount getStreamReference() {
return offsetRangeRefCount;
}

/**
 * Rejects a read when the shared flag has been set: logs at debug level and throws the exception built by
 * {@code alreadyClosed(...)}. A flag that was never assigned (null) is treated as "reads allowed".
 * This hunk shows the rename from {@code ensureOpen} (pre-change) to {@code ensureReadable} (post-change).
 */
// Pre-change version (removed by this commit).
private void ensureOpen() {
if (closed != null && closed.get() == true) {
logger.debug("Read on stream was attempted after during the close of overall file stream!");
throw alreadyClosed("Already closed: ");
// Post-change version.
private void ensureReadable() {
if (readBlock != null && readBlock.get() == true) {
logger.debug("Read attempted on a stream which was read blocked!");
throw alreadyClosed("Read blocked stream.");
}
}

Expand All @@ -102,7 +102,7 @@ AlreadyClosedException alreadyClosed(String msg) {

@Override
public int read() throws IOException {
ensureOpen();
ensureReadable();
try (OffsetRangeRefCount ignored = getStreamReference()) {
return inputStreamIndexInput.read();
}
Expand Down Expand Up @@ -130,7 +130,7 @@ public long getFilePointer() throws IOException {

/**
 * Returns a debug representation containing the wrapped {@code indexInput} and the shared flag.
 */
@Override
public String toString() {
// Pre-change line (removed by this commit): the flag was printed under the label "closed".
return "OffsetRangeIndexInputStream{" + "indexInput=" + indexInput + ", closed=" + closed + '}';
// Post-change line: the same flag is printed under the label "readBlock".
return "OffsetRangeIndexInputStream{" + "indexInput=" + indexInput + ", readBlock=" + readBlock + '}';
}

private static class ClosingStreams {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
 * Base class for input streams that additionally expose their current file pointer.
 */
public abstract class OffsetRangeInputStream extends InputStream {
/**
 * @return the current file pointer of this stream
 * @throws IOException declared for implementations that need it
 */
public abstract long getFilePointer() throws IOException;

/**
 * Hook for handing a shared flag to the stream; the base implementation does nothing and subclasses
 * may override it. The first header below is the pre-change name (removed by this commit), the second
 * is the post-change name.
 */
public void setClose(AtomicBoolean close) {
public void setReadBlock(AtomicBoolean readBlock) {
// Nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public RateLimitingOffsetRangeInputStream(
this.delegate = delegate;
}

/**
 * Forwards the shared flag to the wrapped {@code delegate} stream.
 * The first header/body pair is the pre-change version (removed by this commit); the second is post-change.
 *
 * @param readBlock shared flag passed through unchanged to the delegate
 */
public void setClose(AtomicBoolean close) {
delegate.setClose(close);
public void setReadBlock(AtomicBoolean readBlock) {
delegate.setReadBlock(readBlock);
}

@Override
Expand Down

0 comments on commit d2713b4

Please sign in to comment.