Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added support to read parquet metadata files from S3 #5777

Merged
merged 6 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,14 @@ public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelCon
return wrappedProvider.isCompatibleWith(channelContext);
}

/**
 * Checks whether the given URI exists by asking the wrapped provider.
 *
 * @param uri the URI to check
 * @return the answer reported by the wrapped provider for {@code uri}
 */
@Override
public boolean exists(@NotNull final URI uri) {
    final boolean found = wrappedProvider.exists(uri);
    return found;
}

@Override
public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext,
@NotNull final URI uri)
throws IOException {
@NotNull final URI uri) throws IOException {
final String uriString = uri.toString();
final KeyedObjectHashMap<String, PerPathPool> channelPool = channelPools.get(ChannelType.Read);
final CachedChannel result = tryGetPooledChannel(uriString, channelPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelCo
return true;
}

/**
 * Checks whether the file-system path addressed by the given URI exists.
 *
 * @param uri the URI to check; it is converted to a {@link Path} with {@link Path#of(URI)}
 * @return the result of {@link Files#exists} for the converted path
 */
@Override
public boolean exists(@NotNull final URI uri) {
    final Path target = Path.of(uri);
    return Files.exists(target);
}

@Override
public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext,
@NotNull final URI uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ default SeekableChannelContext makeSingleUseContext() {
*/
boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext);

/**
 * Returns true if the given URI exists in the underlying storage.
 * <p>
 * NOTE(review): how "exists" is decided is left to each implementation; confirm against the concrete provider
 * whether an implementation may decline to answer (for example by throwing) instead of returning a value.
 *
 * @param uri the URI to check; must not be null
 * @return true if the URI exists
 */
boolean exists(@NotNull URI uri);

default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr)
throws IOException {
return getReadChannel(channelContext, convertToURI(uriStr, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext)
return channelContext == SeekableChannelContext.NULL;
}

/**
 * Existence checks are not implemented here; every call fails.
 *
 * @param uri the URI to check (not inspected)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public boolean exists(@NotNull URI uri) {
    final String operation = "exists";
    throw new UnsupportedOperationException(operation);
}

@Override
public SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext,
@NotNull String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3976,7 +3976,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");

final Table loaded = ParquetTools.readTable(
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
ParquetKeyValuePartitionedLayout.create(testRootFile.toURI(), 2, ParquetInstructions.EMPTY, null),
ParquetInstructions.EMPTY);

// verify the sources are identical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import io.deephaven.util.channel.SeekableChannelContext.ContextHolder;
import io.deephaven.util.datastructures.SoftCachingFunction;
import org.apache.commons.io.FilenameUtils;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
Expand All @@ -28,14 +27,12 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

import static io.deephaven.base.FileUtils.convertToURI;
import static io.deephaven.parquet.base.ParquetUtils.resolve;
import static io.deephaven.parquet.base.ColumnPageReaderImpl.getDecompressorHolder;
import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.format.Encoding.RLE_DICTIONARY;

Expand Down Expand Up @@ -83,12 +80,10 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary);
this.numRows = numRows;
this.version = version;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
final String relativePath = FilenameUtils.separatorsToSystem(columnChunk.getFile_path());
this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(relativePath), false);
if (columnChunk.isSetFile_path()) {
columnChunkURI = resolve(rootURI, columnChunk.getFile_path());
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
this.columnChunkURI = rootURI;
columnChunkURI = rootURI;
}
// Construct the reader object but don't read the offset index yet
this.offsetIndexReader = (columnChunk.isSetOffset_index_offset())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,34 @@
//
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

import static io.deephaven.base.FileUtils.URI_SEPARATOR;
import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR;

public final class ParquetUtils {

public static final String PARQUET_FILE_EXTENSION = ".parquet";

public static final String METADATA_FILE_NAME = "_metadata";
public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
public static final String PARQUET_FILE_EXTENSION = ".parquet";
public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + COMMON_METADATA_FILE_NAME;
public static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;
private static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
private static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;

private static final String MAGIC_STR = "PAR1";
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);

private static final String WINDOWS_FILE_SEPARATOR = "\\";

/**
* The number of bytes to buffer before flushing while writing parquet files and metadata files.
*/
Expand All @@ -42,17 +50,23 @@ public static String getPerFileMetadataKey(final String filePath) {
}

/**
* This method verifies if the source points to a parquet file or a metadata file. Provided source can be a local
* file path or a URI. Also, it can point to a parquet file, metadata file or a directory.
* This method verifies if the source points to a parquet file. Provided source can be a local file path or a URI.
*/
public static boolean isParquetFile(@NotNull final String source) {
boolean ret = source.endsWith(PARQUET_FILE_EXTENSION)
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
|| source.endsWith(METADATA_FILE_URI_SUFFIX)
|| source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
return source.endsWith(PARQUET_FILE_EXTENSION);
}

/**
 * This method verifies if the source points to a metadata file. Provided source can be a local file path or a URI.
 * <p>
 * The URI-style suffixes are always tested. The platform-style suffixes (built with {@link File#separatorChar}) are
 * tested only when the platform separator differs from the URI separator, since otherwise they are the same
 * strings.
 *
 * @param source the local file path or URI string to test
 * @return true if {@code source} ends with one of the metadata or common-metadata file suffixes
 */
public static boolean isMetadataFile(@NotNull final String source) {
    if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) {
        return true;
    }
    // A second look is only needed when local paths use a separator other than the URI one
    return File.separatorChar != URI_SEPARATOR_CHAR
            && (source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX));
}

/**
Expand All @@ -74,4 +88,18 @@ public static boolean isVisibleParquetFile(@NotNull final Path rootDir, @NotNull
}
return true;
}

/**
 * Resolve a relative path against a base URI. The path can be from Windows or Unix systems.
 * <p>
 * Windows separators are replaced with URI separators, and the path is passed through the multi-argument
 * {@link URI} constructor so that characters which are illegal in a URI path get quoted.
 * <p>
 * A relative path whose first segment contains a colon (for example {@code "ts=10:30/part.parquet"}) is prefixed
 * with a {@code "./"} dot-segment before the URI is built, as RFC 3986 section 4.2 requires. Without the prefix the
 * text before the colon would be parsed as a URI scheme, so the path would either be rejected or returned
 * unresolved. The extra dot-segment does not appear in the result because {@link URI#resolve(URI)} normalizes the
 * merged path.
 *
 * @param base the URI to resolve against
 * @param relativePath the relative path, using either {@code '\'} or {@code '/'} as separator
 * @return the result of {@code base.resolve(...)} applied to the sanitized relative path
 * @throws UncheckedDeephavenException if the sanitized relative path cannot be turned into a URI
 */
public static URI resolve(final URI base, final String relativePath) {
    final String uriPath = relativePath.replace(WINDOWS_FILE_SEPARATOR, URI_SEPARATOR);

    // Guard against the first segment being mistaken for a scheme name ("a:b/c" would parse as scheme "a")
    final int firstColon = uriPath.indexOf(':');
    final int firstSeparator = uriPath.indexOf(URI_SEPARATOR);
    final boolean colonInFirstSegment = firstColon >= 0 && (firstSeparator < 0 || firstColon < firstSeparator);
    final String safePath = colonInFirstSegment ? "." + URI_SEPARATOR + uriPath : uriPath;

    final URI relativeURI;
    try {
        // The multi-argument constructor quotes special characters in the path for us
        relativeURI = new URI(null, null, safePath, null);
    } catch (final URISyntaxException e) {
        throw new UncheckedDeephavenException("Failed to create URI from relative path: " + relativePath, e);
    }
    return base.resolve(relativeURI);
}
}
Loading
Loading