Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Modified parquet decompression from buffered to streaming operation #5712

Merged
merged 10 commits into from
Jul 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,36 @@
import org.jetbrains.annotations.Nullable;

import javax.annotation.OverridingMethodsMustInvokeSuper;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class BaseSeekableChannelContext implements SeekableChannelContext {
/**
* An opaque resource object hosted by this context.
* A cache of opaque resource objects hosted by this context.
*/
private SafeCloseable resource;
private final Map<String, SafeCloseable> resourceCache = new HashMap<>();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

@Override
@Nullable
public final SafeCloseable apply(@NotNull final Supplier<SafeCloseable> resourceFactory) {
public final SafeCloseable apply(final String key, @NotNull final Supplier<SafeCloseable> resourceFactory) {
SafeCloseable resource = resourceCache.get(key);
if (resource == null) {
resource = resourceFactory.get();
if (resource != null) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
resourceCache.put(key, resource);
}
}
return resource;
}

@Override
@OverridingMethodsMustInvokeSuper
public void close() {
if (resource != null) {
resource.close();
resource = null;
for (final SafeCloseable resource : resourceCache.values()) {
if (resource != null) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
resource.close();
}
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.util.channel;

import java.util.function.Function;
import java.util.function.BiFunction;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.Nullable;

Expand All @@ -12,10 +12,12 @@
/**
* Context object for reading and writing to channels created by {@link SeekableChannelsProvider}.
* <p>
* The context object can hold a {@link SafeCloseable} resource, which can be plugged into the context by calling the
* {@link #apply(Supplier)} method. The resource will be closed when the context is closed.
* The context object can hold {@link SafeCloseable} resources corresponding to {@link String} keys, which can be
* plugged into the context by calling the {@link #apply(String, Supplier)} method. These resources will be closed when
* the context is closed.
*/
public interface SeekableChannelContext extends Function<Supplier<SafeCloseable>, SafeCloseable>, SafeCloseable {
public interface SeekableChannelContext
extends BiFunction<String, Supplier<SafeCloseable>, SafeCloseable>, SafeCloseable {

SeekableChannelContext NULL = SeekableChannelContextNull.NULL_CONTEXT_INSTANCE;

Expand All @@ -39,11 +41,14 @@ static ContextHolder ensureContext(SeekableChannelsProvider provider, SeekableCh
}

/**
* If this instance holds a resource, return it. Otherwise, use the resource factory to create a new resource, store
* it, and return it. This method can return a {@code null} if the factory returns a {@code null}.
* If this instance holds a resource corresponding to the given key, return it. Otherwise, use the resource factory
* to create a new resource, store it, and return it. This method can return a {@code null} if the factory returns a
* {@code null}.
*/
// TODO Does this model look okay to you where we keep a string key, and strings are controlled by the classes
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
// instead of a central place?
@Nullable
SafeCloseable apply(Supplier<SafeCloseable> resourceFactory);
SafeCloseable apply(final String key, Supplier<SafeCloseable> resourceFactory);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

/**
* Release any resources associated with this context. The context should not be used afterward.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ enum SeekableChannelContextNull implements SeekableChannelContext {

@Override
@Nullable
public SafeCloseable apply(final Supplier<SafeCloseable> resourceFactory) {
public SafeCloseable apply(final String key, final Supplier<SafeCloseable> resourceFactory) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private class TestChannelProvider implements SeekableChannelsProvider {

private final class TestChannelContext implements SeekableChannelContext {
@Override
public @Nullable SafeCloseable apply(Supplier<SafeCloseable> resourceFactory) {
public @Nullable SafeCloseable apply(String key, Supplier<SafeCloseable> resourceFactory) {
throw new UnsupportedOperationException("apply");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,10 @@ private Dictionary readDictionary(long dictionaryPageOffset, SeekableChannelCont
// Sometimes the size is explicitly empty, just use an empty payload
payload = BytesInput.empty();
} else {
payload = decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
holder.get());
payload = BytesInput.from(
decompressor.decompress(in, compressedPageSize, pageHeader.getUncompressed_page_size(),
holder.get()),
pageHeader.getUncompressed_page_size());
}
final Encoding encoding = Encoding.valueOf(dictHeader.getEncoding().name());
final DictionaryPage dictionaryPage = new DictionaryPage(payload, dictHeader.getNum_values(), encoding);
Expand Down
Loading
Loading