Skip to content

Commit

Permalink
feat: Modified parquet decompression from buffered to streaming opera…
Browse files Browse the repository at this point in the history
…tion (#5712)

This helps reduce memory consumption when reading parquet files by
almost 30%.
  • Loading branch information
malhotrashivam authored Jul 12, 2024
1 parent ae966a8 commit f8b5e19
Show file tree
Hide file tree
Showing 23 changed files with 356 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* This is an InputStream implementation which reads from a java.nio.ByteBuffer. If a read operation crosses the end of
* the buffer, the BufferUnderflowException is converted to an EOFException.
* <p>
* The stream contains no state other than that in the buffer itself, so the buffer can be exchanged at will with the
* The stream contains no state other than the buffer itself, so the buffer can be exchanged at will with the
* setBuffer() method.
* <p>
* Endianness is determined by the provided buffer itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,54 @@
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class BaseSeekableChannelContext implements SeekableChannelContext {

/**
* A sentinel value to indicate that a resource is {@code null}.
*/
private static final SafeCloseable NULL_SENTINEL = () -> {
};

/**
* An opaque resource object hosted by this context.
* An empty cache of resources.
*/
private SafeCloseable resource;
private static final Map<String, SafeCloseable> EMPTY_CACHE = Map.of();

/**
* A cache of opaque resource objects hosted by this context.
*/
private Map<String, SafeCloseable> resourceCache = EMPTY_CACHE;

@Override
@Nullable
public final SafeCloseable apply(@NotNull final Supplier<SafeCloseable> resourceFactory) {
public final <T extends SafeCloseable> T getCachedResource(
final String key,
@NotNull final Supplier<T> resourceFactory) {
SafeCloseable resource;
if (resourceCache == EMPTY_CACHE) {
resourceCache = new HashMap<>(1);
resource = null;
} else {
resource = resourceCache.get(key);
if (resource == NULL_SENTINEL) {
return null;
}
}
if (resource == null) {
resource = resourceFactory.get();
resourceCache.put(key, (resource = resourceFactory.get()) == null ? NULL_SENTINEL : resource);
}
return resource;
// noinspection unchecked
return (T) resource;
}

@Override
@OverridingMethodsMustInvokeSuper
public void close() {
if (resource != null) {
resource.close();
resource = null;
}
SafeCloseable.closeAll(resourceCache.values().iterator());
resourceCache = EMPTY_CACHE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//
package io.deephaven.util.channel;

import java.util.function.Function;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

Expand All @@ -12,10 +11,11 @@
/**
* Context object for reading and writing to channels created by {@link SeekableChannelsProvider}.
* <p>
* The context object can hold a {@link SafeCloseable} resource, which can be plugged into the context by calling the
* {@link #apply(Supplier)} method. The resource will be closed when the context is closed.
* The context object can hold {@link SafeCloseable} resources corresponding to {@link String} keys, which can be
* plugged into the context by calling the {@link #getCachedResource(String, Supplier)} method. These resources will be
* closed when the context is closed.
*/
public interface SeekableChannelContext extends Function<Supplier<SafeCloseable>, SafeCloseable>, SafeCloseable {
public interface SeekableChannelContext extends SafeCloseable {

SeekableChannelContext NULL = SeekableChannelContextNull.NULL_CONTEXT_INSTANCE;

Expand All @@ -39,11 +39,12 @@ static ContextHolder ensureContext(SeekableChannelsProvider provider, SeekableCh
}

/**
* If this instance holds a resource, return it. Otherwise, use the resource factory to create a new resource, store
* it, and return it. This method can return a {@code null} if the factory returns a {@code null}.
* If this instance holds a resource corresponding to the given key, return it. Otherwise, use the resource factory
* to create a new resource, store it, and return it. This method can return a {@code null} if the factory returns a
* {@code null}.
*/
@Nullable
SafeCloseable apply(Supplier<SafeCloseable> resourceFactory);
<T extends SafeCloseable> T getCachedResource(String key, Supplier<T> resourceFactory);

/**
* Release any resources associated with this context. The context should not be used afterward.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ enum SeekableChannelContextNull implements SeekableChannelContext {

@Override
@Nullable
public SafeCloseable apply(final Supplier<SafeCloseable> resourceFactory) {
public <T extends SafeCloseable> T getCachedResource(final String key, final Supplier<T> resourceFactory) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ private class TestChannelProvider implements SeekableChannelsProvider {

private final class TestChannelContext implements SeekableChannelContext {
@Override
public @Nullable SafeCloseable apply(Supplier<SafeCloseable> resourceFactory) {
throw new UnsupportedOperationException("apply");
@Nullable
public <T extends SafeCloseable> T getCachedResource(final String key, final Supplier<T> resourceFactory) {
throw new UnsupportedOperationException("getCachedResource");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.chunk.WritableBooleanChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a boolean chunk that allows you to resize the chunk to a capacity.
Expand All @@ -28,8 +29,9 @@ public SizedBooleanChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableBooleanChunk<T> get() {
return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a byte chunk that allows you to resize the chunk to a capacity.
Expand All @@ -28,8 +29,9 @@ public SizedByteChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableByteChunk<T> get() {
return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.chunk.WritableCharChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a char chunk that allows you to resize the chunk to a capacity.
Expand All @@ -24,8 +25,9 @@ public SizedCharChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableCharChunk<T> get() {
return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.chunk.WritableDoubleChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a double chunk that allows you to resize the chunk to a capacity.
Expand All @@ -28,8 +29,9 @@ public SizedDoubleChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableDoubleChunk<T> get() {
return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.chunk.WritableFloatChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a float chunk that allows you to resize the chunk to a capacity.
Expand All @@ -28,8 +29,9 @@ public SizedFloatChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableFloatChunk<T> get() {
return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a int chunk that allows you to resize the chunk to a capacity.
Expand All @@ -28,8 +29,9 @@ public SizedIntChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableIntChunk<T> get() {
return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a long chunk that allows you to resize the chunk to a capacity.
Expand All @@ -28,8 +29,9 @@ public SizedLongChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableLongChunk<T> get() {
return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a Object chunk that allows you to resize the chunk to a capacity.
Expand All @@ -28,8 +29,9 @@ public SizedObjectChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableObjectChunk<T, ATTR> get() {
return chunk;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.chunk.WritableShortChunk;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

/**
* A wrapper for a short chunk that allows you to resize the chunk to a capacity.
Expand All @@ -28,8 +29,9 @@ public SizedShortChunk(final int initialSize) {
/**
* Get the underlying chunk.
*
* @return the underlying chunk.
* @return the underlying chunk. May be {@code null} if the chunk has not been initialized.
*/
@Nullable
public WritableShortChunk<T> get() {
return chunk;
}
Expand Down
1 change: 1 addition & 0 deletions extensions/parquet/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation project(':Base')
implementation project(':Util')
implementation project(':engine-time')
implementation project(':engine-chunk')
implementation project(':Configuration')
implementation project(':DataStructures')
implementation libs.commons.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.function.Function;

import static io.deephaven.base.FileUtils.convertToURI;
import static io.deephaven.parquet.base.ColumnPageReaderImpl.getDecompressorHolder;
import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.format.Encoding.RLE_DICTIONARY;
Expand Down Expand Up @@ -223,8 +224,10 @@ private Dictionary readDictionary(long dictionaryPageOffset, SeekableChannelCont
// Sometimes the size is explicitly empty, just use an empty payload
payload = BytesInput.empty();
} else {
payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
holder.get());
payload = BytesInput.from(
decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
getDecompressorHolder(holder.get())),
pageHeader.getUncompressed_page_size());
}
final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name());
final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding);
Expand Down
Loading

0 comments on commit f8b5e19

Please sign in to comment.