Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Modified parquet decompression from buffered to streaming operation #5712

Merged
merged 10 commits into from
Jul 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
* This is an InputStream implementation which reads from a java.nio.ByteBuffer. If a read operation crosses the end of
* the buffer, the BufferUnderflowException is converted to an EOFException.
*
* The stream contains no state other than that in in the buffer itself, so the buffer can be exchanged at will with the
* The stream contains no state other than the buffer itself, so the buffer can be exchanged at will with the
* setBuffer() method.
*/
public class ByteBufferInputStream extends java.io.InputStream implements DataInput {

/** the buffer from which we read */
protected ByteBuffer buf;

private char[] utfChars;

/**
* The DataOutput interface always writes bytes in big-endian order, while ByteBuffer allows the order to be big- or
* little-endian. Set this flag true to assume that the buffer is big-endian, or false to check the buffer's order
Expand All @@ -40,7 +38,6 @@ public class ByteBufferInputStream extends java.io.InputStream implements DataIn
*/
public ByteBufferInputStream(ByteBuffer buf) {
this.buf = buf;
this.utfChars = new char[0];
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,48 @@
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class BaseSeekableChannelContext implements SeekableChannelContext {

/**
* An opaque resource object hosted by this context.
* A sentinel value to indicate that a resource is {@code null}.
*/
private SafeCloseable resource;
private static final SafeCloseable NULL_SENTINEL = () -> {
};

/**
* An empty cache of resources.
*/
private static final Map<String, SafeCloseable> EMPTY_CACHE = Map.of();

/**
* A cache of opaque resource objects hosted by this context.
*/
private Map<String, SafeCloseable> resourceCache = EMPTY_CACHE;

@Override
@Nullable
public final SafeCloseable apply(@NotNull final Supplier<SafeCloseable> resourceFactory) {
public final SafeCloseable apply(final String key, @NotNull final Supplier<SafeCloseable> resourceFactory) {
final Map<String, SafeCloseable> localResourceCache = resourceCache == EMPTY_CACHE
? resourceCache = new HashMap<>(1)
: resourceCache;
SafeCloseable resource = localResourceCache.get(key);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
if (resource == NULL_SENTINEL) {
return null;
}
if (resource == null) {
resource = resourceFactory.get();
resourceCache.put(key, (resource = resourceFactory.get()) == null ? NULL_SENTINEL : resource);
}
return resource;
}

@Override
@OverridingMethodsMustInvokeSuper
public void close() {
if (resource != null) {
resource.close();
resource = null;
}
SafeCloseable.closeAll(resourceCache.values().iterator());
resourceCache = EMPTY_CACHE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.util.channel;

import java.util.function.Function;
import java.util.function.BiFunction;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

Expand All @@ -12,10 +12,12 @@
/**
* Context object for reading and writing to channels created by {@link SeekableChannelsProvider}.
* <p>
* The context object can hold a {@link SafeCloseable} resource, which can be plugged into the context by calling the
* {@link #apply(Supplier)} method. The resource will be closed when the context is closed.
* The context object can hold {@link SafeCloseable} resources corresponding to {@link String} keys, which can be
* plugged into the context by calling the {@link #apply(String, Supplier)} method. These resources will be closed when
* the context is closed.
*/
public interface SeekableChannelContext extends Function<Supplier<SafeCloseable>, SafeCloseable>, SafeCloseable {
public interface SeekableChannelContext
extends BiFunction<String, Supplier<SafeCloseable>, SafeCloseable>, SafeCloseable {

SeekableChannelContext NULL = SeekableChannelContextNull.NULL_CONTEXT_INSTANCE;

Expand All @@ -39,11 +41,12 @@ static ContextHolder ensureContext(SeekableChannelsProvider provider, SeekableCh
}

/**
* If this instance holds a resource, return it. Otherwise, use the resource factory to create a new resource, store
* it, and return it. This method can return a {@code null} if the factory returns a {@code null}.
* If this instance holds a resource corresponding to the given key, return it. Otherwise, use the resource factory
* to create a new resource, store it, and return it. This method can return a {@code null} if the factory returns a
* {@code null}.
*/
@Nullable
SafeCloseable apply(Supplier<SafeCloseable> resourceFactory);
SafeCloseable apply(String key, Supplier<SafeCloseable> resourceFactory);

/**
* Release any resources associated with this context. The context should not be used afterward.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ enum SeekableChannelContextNull implements SeekableChannelContext {

@Override
@Nullable
public SafeCloseable apply(final Supplier<SafeCloseable> resourceFactory) {
public SafeCloseable apply(final String key, final Supplier<SafeCloseable> resourceFactory) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private class TestChannelProvider implements SeekableChannelsProvider {

private final class TestChannelContext implements SeekableChannelContext {
@Override
public @Nullable SafeCloseable apply(Supplier<SafeCloseable> resourceFactory) {
public @Nullable SafeCloseable apply(String key, Supplier<SafeCloseable> resourceFactory) {
throw new UnsupportedOperationException("apply");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,10 @@ private Dictionary readDictionary(long dictionaryPageOffset, SeekableChannelCont
// Sometimes the size is explicitly empty, just use an empty payload
payload = BytesInput.empty();
} else {
payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
holder.get());
payload = BytesInput.from(
decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
holder.get()),
pageHeader.getUncompressed_page_size());
}
final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name());
final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding);
Expand Down
Loading
Loading