Skip to content

Commit

Permalink
Fixes to prevent segment fault errors arising due to unexpected SDK b…
Browse files Browse the repository at this point in the history
…ehaviour
  • Loading branch information
vikasvb90 committed Nov 26, 2023
1 parent 5bb6cae commit 56f801e
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.common.blobstore.transfer;

import com.jcraft.jzlib.JZlib;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
Expand All @@ -19,6 +20,7 @@
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
Expand All @@ -27,10 +29,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;

import com.jcraft.jzlib.JZlib;

/**
* RemoteTransferContainer is an encapsulation for managing file transfers.
*
Expand All @@ -51,6 +52,7 @@ public class RemoteTransferContainer implements Closeable {
private final long expectedChecksum;
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
private final boolean isRemoteDataIntegritySupported;
private final AtomicBoolean closed = new AtomicBoolean();

private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);

Expand Down Expand Up @@ -160,6 +162,10 @@ private LocalStreamSupplier<InputStreamContainer> getMultipartStreamSupplier(
return () -> {
try {
OffsetRangeInputStream offsetRangeInputStream = offsetRangeInputStreamSupplier.get(size, position);
if (offsetRangeInputStream instanceof RateLimitingOffsetRangeInputStream) {
RateLimitingOffsetRangeInputStream rangeIndexInputStream = (RateLimitingOffsetRangeInputStream) offsetRangeInputStream;
rangeIndexInputStream.setClose(closed);
}
InputStream inputStream = !isRemoteDataIntegrityCheckPossible()
? new ResettableCheckedInputStream(offsetRangeInputStream, fileName)
: offsetRangeInputStream;
Expand Down Expand Up @@ -226,27 +232,7 @@ private long getActualChecksum() {

@Override
public void close() throws IOException {
if (inputStreams.get() == null) {
log.warn("Input streams cannot be closed since they are not yet set for multi stream upload");
return;
}

boolean closeStreamException = false;
for (InputStream is : Objects.requireNonNull(inputStreams.get())) {
try {
if (is != null) {
is.close();
}
} catch (IOException ex) {
closeStreamException = true;
// Attempting to close all streams first before throwing exception.
log.error("Multipart stream failed to close ", ex);
}
}

if (closeStreamException) {
throw new IOException("Closure of some of the multi-part streams failed.");
}
closed.set(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,31 @@

package org.opensearch.common.blobstore.transfer.stream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.concurrent.RefCountedReleasable;
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.opensearch.common.util.concurrent.RunOnce;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* OffsetRangeIndexInputStream extends InputStream to read from a specified offset using IndexInput
*
* @opensearch.internal
*/
public class OffsetRangeIndexInputStream extends OffsetRangeInputStream {

private static final Logger logger = LogManager.getLogger(OffsetRangeIndexInputStream.class);
private final InputStreamIndexInput inputStreamIndexInput;
private final IndexInput indexInput;
private AtomicBoolean closed;

private final OffsetRangeRefCount offsetRangeRefCount;

private final RunOnce closeOnce;

/**
* Construct a new OffsetRangeIndexInputStream object
Expand All @@ -35,16 +46,50 @@ public OffsetRangeIndexInputStream(IndexInput indexInput, long size, long positi
indexInput.seek(position);
this.indexInput = indexInput;
this.inputStreamIndexInput = new InputStreamIndexInput(indexInput, size);
ClosingStreams closingStreams = new ClosingStreams();
closingStreams.indexInput = indexInput;
closingStreams.inputStreamIndexInput = inputStreamIndexInput;
offsetRangeRefCount = new OffsetRangeRefCount(closingStreams);
closeOnce = new RunOnce(offsetRangeRefCount::decRef);
}

public void setClose(AtomicBoolean close) {
this.closed = close;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return inputStreamIndexInput.read(b, off, len);
ensureOpen();
try (OffsetRangeRefCount ignored = getStreamReference()) {
return inputStreamIndexInput.read(b, off, len);
}
}

private OffsetRangeRefCount getStreamReference() {
boolean successIncrement = offsetRangeRefCount.tryIncRef();
if (successIncrement == false) {
throw alreadyClosed("OffsetRangeIndexInputStream is already unreferenced.");
}
return offsetRangeRefCount;
}

private void ensureOpen() {
if (closed != null && closed.get() == true) {
logger.debug("Read on stream was attempted after during the close of overall file stream!");
throw alreadyClosed("Already closed: ");
}
}

AlreadyClosedException alreadyClosed(String msg) {
return new AlreadyClosedException(msg + this);
}

@Override
public int read() throws IOException {
return inputStreamIndexInput.read();
ensureOpen();
try (OffsetRangeRefCount ignored = getStreamReference()) {
return inputStreamIndexInput.read();
}
}

@Override
Expand All @@ -67,9 +112,39 @@ public long getFilePointer() throws IOException {
return indexInput.getFilePointer();
}

@Override
public String toString() {
return "OffsetRangeIndexInputStream{" +
"indexInput=" + indexInput +
", closed=" + closed +
'}';
}

private static class ClosingStreams {
private InputStreamIndexInput inputStreamIndexInput;
private IndexInput indexInput;
}

private static class OffsetRangeRefCount extends RefCountedReleasable<ClosingStreams> {
private static final Logger logger = LogManager.getLogger(OffsetRangeRefCount.class);
public OffsetRangeRefCount(ClosingStreams ref) {
super("OffsetRangeRefCount", ref, () -> {
try {
ref.inputStreamIndexInput.close();
} catch (IOException ex) {
logger.error("Failed to close indexStreamIndexInput", ex);
}
try {
ref.indexInput.close();
} catch (IOException ex) {
logger.error("Failed to close indexInput", ex);
}
});
}
}

@Override
public void close() throws IOException {
inputStreamIndexInput.close();
indexInput.close();
closeOnce.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* OffsetRangeInputStream is an abstract class that extends from {@link InputStream}
Expand All @@ -19,4 +20,8 @@
*/
public abstract class OffsetRangeInputStream extends InputStream {
public abstract long getFilePointer() throws IOException;

public void setClose(AtomicBoolean close) {
// Nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.StreamLimiter;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -40,6 +41,10 @@ public RateLimitingOffsetRangeInputStream(
this.delegate = delegate;
}

public void setClose(AtomicBoolean close) {
delegate.setClose(close);
}

@Override
public int read() throws IOException {
int b = delegate.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.junit.Before;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.UUID;

public class RemoteTransferContainerTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -92,25 +94,37 @@ private void testSupplyStreamContext(
int partCount = streamContext.getNumberOfParts();
assertEquals(expectedPartCount, partCount);
Thread[] threads = new Thread[partCount];
InputStream[] streams = new InputStream[partCount];
long totalContentLength = remoteTransferContainer.getContentLength();
assert partSize * (partCount - 1) + lastPartSize == totalContentLength
: "part sizes and last part size don't add up to total content length";
logger.info("partSize: {}, lastPartSize: {}, partCount: {}", partSize, lastPartSize, streamContext.getNumberOfParts());
for (int partIdx = 0; partIdx < partCount; partIdx++) {
int finalPartIdx = partIdx;
long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize;
threads[partIdx] = new Thread(() -> {
try {
for (int partIdx = 0; partIdx < partCount; partIdx++) {
int finalPartIdx = partIdx;
long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize;
threads[partIdx] = new Thread(() -> {
try {
InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
streams[finalPartIdx] = inputStreamContainer.getInputStream();
assertEquals(expectedPartSize, inputStreamContainer.getContentLength());
} catch (IOException e) {
fail("IOException during stream creation");
}
});
threads[partIdx].start();
}
for (int i = 0; i < partCount; i++) {
threads[i].join();
}
} finally {
Arrays.stream(streams).forEach(stream -> {
try {
InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
assertEquals(expectedPartSize, inputStreamContainer.getContentLength());
stream.close();
} catch (IOException e) {
fail("IOException during stream creation");
throw new RuntimeException(e);
}
});
threads[partIdx].start();
}
for (int i = 0; i < partCount; i++) {
threads[i].join();
}
}

Expand Down Expand Up @@ -182,6 +196,7 @@ public OffsetRangeInputStream get(long size, long position) throws IOException {
}

private void testTypeOfProvidedStreams(boolean isRemoteDataIntegritySupported) throws IOException {
InputStream inputStream = null;
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
testFile.getFileName().toString(),
Expand All @@ -201,12 +216,17 @@ public OffsetRangeInputStream get(long size, long position) throws IOException {
) {
StreamContext streamContext = remoteTransferContainer.supplyStreamContext(16);
InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
inputStream = inputStreamContainer.getInputStream();
if (shouldOffsetInputStreamsBeChecked(isRemoteDataIntegritySupported)) {
assertTrue(inputStreamContainer.getInputStream() instanceof ResettableCheckedInputStream);
} else {
assertTrue(inputStreamContainer.getInputStream() instanceof OffsetRangeInputStream);
}
assertThrows(RuntimeException.class, () -> remoteTransferContainer.supplyStreamContext(16));
} finally {
if (inputStream != null) {
inputStream.close();
}
}
}

Expand Down

0 comments on commit 56f801e

Please sign in to comment.