Skip to content

Commit

Permalink
Fixes to prevent segment fault errors arising due to unexpected SDK b…
Browse files Browse the repository at this point in the history
…ehaviour

Signed-off-by: vikasvb90 <vikasvb@amazon.com>
  • Loading branch information
vikasvb90 committed Nov 28, 2023
1 parent 5bb6cae commit c671791
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
Expand All @@ -27,6 +28,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;

import com.jcraft.jzlib.JZlib;
Expand All @@ -51,6 +53,7 @@ public class RemoteTransferContainer implements Closeable {
private final long expectedChecksum;
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
private final boolean isRemoteDataIntegritySupported;
private final AtomicBoolean closed = new AtomicBoolean();

private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);

Expand Down Expand Up @@ -160,6 +163,10 @@ private LocalStreamSupplier<InputStreamContainer> getMultipartStreamSupplier(
return () -> {
try {
OffsetRangeInputStream offsetRangeInputStream = offsetRangeInputStreamSupplier.get(size, position);
if (offsetRangeInputStream instanceof RateLimitingOffsetRangeInputStream) {
RateLimitingOffsetRangeInputStream rangeIndexInputStream = (RateLimitingOffsetRangeInputStream) offsetRangeInputStream;
rangeIndexInputStream.setClose(closed);
}
InputStream inputStream = !isRemoteDataIntegrityCheckPossible()
? new ResettableCheckedInputStream(offsetRangeInputStream, fileName)
: offsetRangeInputStream;
Expand Down Expand Up @@ -226,27 +233,7 @@ private long getActualChecksum() {

@Override
public void close() throws IOException {
if (inputStreams.get() == null) {
log.warn("Input streams cannot be closed since they are not yet set for multi stream upload");
return;
}

boolean closeStreamException = false;
for (InputStream is : Objects.requireNonNull(inputStreams.get())) {
try {
if (is != null) {
is.close();
}
} catch (IOException ex) {
closeStreamException = true;
// Attempting to close all streams first before throwing exception.
log.error("Multipart stream failed to close ", ex);
}
}

if (closeStreamException) {
throw new IOException("Closure of some of the multi-part streams failed.");
}
closed.set(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,29 @@

package org.opensearch.common.blobstore.transfer.stream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.concurrent.RefCountedReleasable;
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.opensearch.common.util.concurrent.RunOnce;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* OffsetRangeIndexInputStream extends InputStream to read from a specified offset using IndexInput
*
* @opensearch.internal
*/
public class OffsetRangeIndexInputStream extends OffsetRangeInputStream {

private static final Logger logger = LogManager.getLogger(OffsetRangeIndexInputStream.class);
private final InputStreamIndexInput inputStreamIndexInput;
private final IndexInput indexInput;
private AtomicBoolean closed;
private final OffsetRangeRefCount offsetRangeRefCount;
private final RunOnce closeOnce;

/**
* Construct a new OffsetRangeIndexInputStream object
Expand All @@ -35,16 +44,68 @@ public OffsetRangeIndexInputStream(IndexInput indexInput, long size, long positi
indexInput.seek(position);
this.indexInput = indexInput;
this.inputStreamIndexInput = new InputStreamIndexInput(indexInput, size);
ClosingStreams closingStreams = new ClosingStreams(inputStreamIndexInput, indexInput);
offsetRangeRefCount = new OffsetRangeRefCount(closingStreams);
closeOnce = new RunOnce(offsetRangeRefCount::decRef);
}

@Override
public void setClose(AtomicBoolean close) {
this.closed = close;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return inputStreamIndexInput.read(b, off, len);
// There are two levels of check to ensure that we don't read an already closed stream and
// to not close the stream if it is already being read.
// 1. First check is a coarse-grained check outside reference check which allows us to fail fast if read
// was invoked after the stream was closed. We need a separate atomic boolean closed because we don't want a
// future read to succeed when #close has been invoked even if there are on-going reads. On-going reads would
// hold reference and since ref count will not be 0 even after close was invoked, future reads will go through
// without a check on closed. Also, we do need to set closed externally. It is shared across all streams of the
// file. Check on closed in this class makes sure that no other stream allows subsequent reads. closed is
// being set to true in RemoteTransferContainer#close which is invoked when we are done processing all
// parts/file. Processing completes when either all parts are completed successfully or if either of the parts
// failed. In successful case, subsequent read will anyway not go through since all streams would have been
// consumed fully but in case of failure, SDK can continue to invoke read and this would be a wasted compute
// and IO.
// 2. In second check, a tryIncRef is invoked which tries to increment reference under lock and fails if ref
// is already closed. If reference is successfully obtained by the stream then stream will not be closed.
// Ref counting ensures that stream isn't closed in between reads.
//
// All these protection mechanisms are required in order to prevent invalid access to streams happening
// from the new S3 async SDK.
ensureOpen();
try (OffsetRangeRefCount ignored = getStreamReference()) {
return inputStreamIndexInput.read(b, off, len);
}
}

private OffsetRangeRefCount getStreamReference() {
boolean successIncrement = offsetRangeRefCount.tryIncRef();
if (successIncrement == false) {
throw alreadyClosed("OffsetRangeIndexInputStream is already unreferenced.");
}
return offsetRangeRefCount;
}

private void ensureOpen() {
if (closed != null && closed.get() == true) {
logger.debug("Read on stream was attempted after during the close of overall file stream!");
throw alreadyClosed("Already closed: ");
}
}

AlreadyClosedException alreadyClosed(String msg) {
return new AlreadyClosedException(msg + this);
}

@Override
public int read() throws IOException {
return inputStreamIndexInput.read();
ensureOpen();
try (OffsetRangeRefCount ignored = getStreamReference()) {
return inputStreamIndexInput.read();
}
}

@Override
Expand All @@ -67,9 +128,42 @@ public long getFilePointer() throws IOException {
return indexInput.getFilePointer();
}

@Override
public String toString() {
return "OffsetRangeIndexInputStream{" + "indexInput=" + indexInput + ", closed=" + closed + '}';
}

private static class ClosingStreams {
private final InputStreamIndexInput inputStreamIndexInput;
private final IndexInput indexInput;

public ClosingStreams(InputStreamIndexInput inputStreamIndexInput, IndexInput indexInput) {
this.inputStreamIndexInput = inputStreamIndexInput;
this.indexInput = indexInput;
}
}

private static class OffsetRangeRefCount extends RefCountedReleasable<ClosingStreams> {
private static final Logger logger = LogManager.getLogger(OffsetRangeRefCount.class);

public OffsetRangeRefCount(ClosingStreams ref) {
super("OffsetRangeRefCount", ref, () -> {
try {
ref.inputStreamIndexInput.close();
} catch (IOException ex) {
logger.error("Failed to close indexStreamIndexInput", ex);
}
try {
ref.indexInput.close();
} catch (IOException ex) {
logger.error("Failed to close indexInput", ex);
}
});
}
}

@Override
public void close() throws IOException {
inputStreamIndexInput.close();
indexInput.close();
closeOnce.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* OffsetRangeInputStream is an abstract class that extends from {@link InputStream}
Expand All @@ -19,4 +20,8 @@
*/
public abstract class OffsetRangeInputStream extends InputStream {
public abstract long getFilePointer() throws IOException;

public void setClose(AtomicBoolean close) {
// Nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.StreamLimiter;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -40,6 +41,10 @@ public RateLimitingOffsetRangeInputStream(
this.delegate = delegate;
}

public void setClose(AtomicBoolean close) {
delegate.setClose(close);
}

@Override
public int read() throws IOException {
int b = delegate.read();
Expand Down
Loading

0 comments on commit c671791

Please sign in to comment.