Reapplying changes for aloneguid#252 after rebasing with current
mirosuav committed Jul 10, 2023
1 parent 5ea5ba5 commit 7eb67a7
Showing 3 changed files with 115 additions and 30 deletions.
53 changes: 53 additions & 0 deletions src/Parquet.Test/ParquetRowGroupReaderTest.cs
@@ -0,0 +1,53 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Parquet.Data;
+using Parquet.File;
+using Parquet.Schema;
+using Xunit;
+
+namespace Parquet.Test {
+    public class ParquetRowGroupReaderTest : TestBase {
+
+        [Theory]
+        [InlineData("multi.page.parquet")]
+        [InlineData("multi.page.v2.parquet")]
+        public async Task GetColumnStatistics_ShouldNotBeEmpty(string parquetFile) {
+            using(ParquetReader reader = await ParquetReader.CreateAsync(OpenTestFile(parquetFile), leaveStreamOpen: false)) {
+                for(int gidx = 0; gidx < reader.RowGroupCount; gidx++) {
+                    using(ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(gidx)) {
+
+                        foreach(DataField df in reader.Schema.DataFields) {
+                            DataColumnReader columnReader = rowGroupReader.GetColumnReader(df);
+                            DataColumnStatistics? stats = columnReader.GetColumnStatistics();
+
+                            Assert.NotNull(stats);
+                        }
+                    }
+                }
+            }
+
+        }
+
+        [Theory]
+        [InlineData("multi.page.parquet")]
+        [InlineData("multi.page.v2.parquet")]
+        public async Task GetColumnReader_MustFailOnInvalidField(string parquetFile) {
+            using(ParquetReader reader = await ParquetReader.CreateAsync(OpenTestFile(parquetFile), leaveStreamOpen: false)) {
+                using(ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(0)) {
+
+                    Assert.Throws<ArgumentNullException>(() => rowGroupReader.GetColumnReader(null!));
+                    DataField nonExistingField = new DataField("non_existing_field7862425", typeof(int));
+                    Assert.Throws<ParquetException>(() => rowGroupReader.GetColumnReader(nonExistingField));
+                }
+            }
+        }
+
+    }
+
+}

66 changes: 43 additions & 23 deletions src/Parquet/File/DataColumnReader.cs
@@ -12,16 +12,20 @@
 using Parquet.Meta.Proto;
 
 namespace Parquet.File {
-    class DataColumnReader {
 
+    /// <summary>
+    /// Reader for Parquet data column
+    /// </summary>
+    public class DataColumnReader {
         private readonly DataField _dataField;
         private readonly Stream _inputStream;
         private readonly ColumnChunk _thriftColumnChunk;
-        private readonly SchemaElement? _chemaElement;
+        private readonly SchemaElement? _schemaElement;
         private readonly ThriftFooter _footer;
         private readonly ParquetOptions _options;
         private readonly DataColumnStatistics? _stats;
 
-        public DataColumnReader(
+        internal DataColumnReader(
             DataField dataField,
             Stream inputStream,
             ColumnChunk thriftColumnChunk,
@@ -35,22 +39,37 @@ public DataColumnReader(
 
             dataField.EnsureAttachedToSchema(nameof(dataField));
 
-            _chemaElement = _footer.GetSchemaElement(_thriftColumnChunk);
+            _schemaElement = _footer.GetSchemaElement(_thriftColumnChunk);
 
+            // read stats as soon as possible
+            _stats = ReadColumnStatistics();
+        }
 
+        private DataColumnStatistics? ReadColumnStatistics() {
 
             Statistics? st = _thriftColumnChunk.MetaData!.Statistics;
             if(st != null) {
 
-                ParquetPlainEncoder.TryDecode(st.MinValue, _chemaElement!, _options, out object? min);
-                ParquetPlainEncoder.TryDecode(st.MaxValue, _chemaElement!, _options, out object? max);
+                ParquetPlainEncoder.TryDecode(st.MinValue, _schemaElement!, _options, out object? min);
+                ParquetPlainEncoder.TryDecode(st.MaxValue, _schemaElement!, _options, out object? max);
 
-                _stats = new DataColumnStatistics(
-                    st.NullCount,
-                    st.DistinctCount,
-                    min, max);
+                return new DataColumnStatistics(st.NullCount, st.DistinctCount, min, max);
             }
+            return null;
+        }
+
+        /// <summary>
+        /// Return data column statistics
+        /// </summary>
+        /// <returns>Data column statistics or null</returns>
+        public DataColumnStatistics? GetColumnStatistics() => _stats;
+
+        /// <summary>
+        /// Read entire column data
+        /// </summary>
+        /// <param name="cancellationToken">Cancellation token</param>
+        /// <returns>DataColumn object filled in with data</returns>
+        /// <exception cref="NotSupportedException">Unsupported page type</exception>
         public async Task<DataColumn> ReadAsync(CancellationToken cancellationToken = default) {
 
             // how many values are in column chunk, as there may be multiple data pages
@@ -76,13 +95,15 @@ public async Task<DataColumn> ReadAsync(CancellationToken cancellationToken = de
                         await ReadDataPageV2Async(ph, pc, totalValuesInChunk);
                         break;
                     default:
-                        throw new NotSupportedException($"can't read page type {ph.Type}"); ;
+                        throw new NotSupportedException($"can't read page type {ph.Type}");
+                        ;
                 }
             }
 
             // all the data is available here!
             DataColumn column = pc.Unpack();
-            if(_stats != null) column.Statistics = _stats;
+            if(_stats != null)
+                column.Statistics = _stats;
             return column;
         }
 
@@ -106,11 +127,11 @@ public async Task<DataColumn> ReadAsync(CancellationToken cancellationToken = de
                 data.AsSpan(0, ph.CompressedPageSize),
                 ph.UncompressedPageSize);
         }
 
         private async Task<IronCompress.IronCompressResult> ReadPageDataV2Async(PageHeader ph) {
 
             int pageSize = ph.CompressedPageSize;
 
             byte[] data = ArrayPool<byte>.Shared.Rent(pageSize);
 
             int totalBytesRead = 0, remainingBytes = pageSize;
@@ -136,12 +157,12 @@ private async ValueTask ReadDictionaryPage(PageHeader ph, PackedColumn pc) {
             Array dictionary = _dataField.CreateArray(ph.DictionaryPageHeader!.NumValues);
 
             ParquetPlainEncoder.Decode(dictionary, 0, ph.DictionaryPageHeader.NumValues,
-                _chemaElement!, bytes.AsSpan(), out int dictionaryOffset);
+                _schemaElement!, bytes.AsSpan(), out int dictionaryOffset);
 
             pc.AssignDictionary(dictionary);
         }
 
-        private long GetFileOffset() =>
+        private long GetFileOffset() =>
             // get the minimum offset, we'll just read pages in sequence as DictionaryPageOffset/Data_page_offset are not reliable
             new[]
             {
@@ -196,8 +217,8 @@ private async Task ReadDataPageV1Async(PageHeader ph, PackedColumn pc) {
         private async Task ReadDataPageV2Async(PageHeader ph, PackedColumn pc, long maxValues) {
             if(ph.DataPageHeaderV2 == null) {
                 throw new ParquetException($"column '{_dataField.Path}' is missing data page header, file is corrupt");
-            }
+            }
 
             using IronCompress.IronCompressResult bytes = await ReadPageDataV2Async(ph);
             int dataUsed = 0;
 
@@ -230,7 +251,7 @@ private async Task ReadDataPageV2Async(PageHeader ph, PackedColumn pc, long maxV
 
             int decompressedSize = ph.UncompressedPageSize - ph.DataPageHeaderV2.RepetitionLevelsByteLength -
                 ph.DataPageHeaderV2.DefinitionLevelsByteLength;
 
             IronCompress.IronCompressResult decompressedDataByes = Compressor.Decompress(
                 (CompressionMethod)(int)_thriftColumnChunk.MetaData.Codec,
                 bytes.AsSpan().Slice(dataUsed),
@@ -264,7 +285,7 @@ private void ReadColumn(Span<byte> src,
                     ParquetPlainEncoder.Decode(plainData,
                         offset,
                         totalValuesInPage,
-                        _chemaElement!, src, out int read);
+                        _schemaElement!, src, out int read);
                     pc.MarkUsefulPlainData(read);
                 }
                 break;
Expand All @@ -281,7 +302,7 @@ private void ReadColumn(Span<byte> src,
case Encoding.RLE: { // 3
Span<int> span = pc.AllocateOrGetDictionaryIndexes(totalValuesInPage);
int indexCount = RleBitpackedHybridEncoder.Decode(src,
_chemaElement!.TypeLength ?? 0,
_schemaElement!.TypeLength ?? 0,
src.Length, out int usedLength, span, totalValuesInPage);
pc.MarkUsefulDictionaryIndexes(indexCount);
pc.Checkpoint();
@@ -329,8 +350,7 @@ private static int ReadRleDictionary(Span<byte> s, int maxReadCount, Span<int> d
                 for(int i = 0; i < maxReadCount; i++) {
                     dest[destOffset++] = 0;
                 }
-            }
-            else {
+            } else {
                 if(length != 0) {
                     destOffset += RleBitpackedHybridEncoder.Decode(s.Slice(1), bitWidth, length, out int usedLength, dest, maxReadCount);
                 }
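
Taken together, these DataColumnReader changes decode statistics eagerly in the constructor, so they can be inspected without reading any data pages. A minimal consumer-side sketch of that (the file name is a placeholder, and the NullCount/MinValue/MaxValue property names on DataColumnStatistics are assumed from the constructor arguments above rather than taken from this diff):

    using System;
    using System.Threading.Tasks;
    using Parquet;
    using Parquet.Data;
    using Parquet.File;
    using Parquet.Schema;

    class StatsDump {
        static async Task Main() {
            // "data.parquet" is a hypothetical multi-row-group file
            using ParquetReader reader = await ParquetReader.CreateAsync("data.parquet");
            for(int g = 0; g < reader.RowGroupCount; g++) {
                using ParquetRowGroupReader rg = reader.OpenRowGroupReader(g);
                foreach(DataField df in reader.Schema.DataFields) {
                    // statistics were decoded in the DataColumnReader constructor,
                    // so this call touches no data pages
                    DataColumnStatistics? stats = rg.GetColumnReader(df).GetColumnStatistics();
                    if(stats == null)
                        continue; // the writer emitted no statistics for this chunk
                    Console.WriteLine($"{df.Path}: nulls={stats.NullCount} min={stats.MinValue} max={stats.MaxValue}");
                }
            }
        }
    }
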
26 changes: 19 additions & 7 deletions src/Parquet/ParquetRowGroupReader.cs
@@ -51,13 +51,7 @@ internal ParquetRowGroupReader(
         /// Reads a column from this row group. Unlike writing, columns can be read in any order.
         /// </summary>
         public Task<DataColumn> ReadColumnAsync(DataField field, CancellationToken cancellationToken = default) {
-
-            ColumnChunk? columnChunk = GetMetadata(field);
-            if(columnChunk == null) {
-                throw new ParquetException($"'{field.Path}' does not exist in this file");
-            }
-            var columnReader = new DataColumnReader(field, _stream, columnChunk, _footer, _parquetOptions);
-
+            DataColumnReader columnReader = GetColumnReader(field);
             return columnReader.ReadAsync(cancellationToken);
         }
 
@@ -86,6 +80,24 @@ public Dictionary<string, string> GetCustomMetadata(DataField field) {
             return cc.MetaData.KeyValueMetadata.ToDictionary(kv => kv.Key, kv => kv.Value!);
         }
 
+        /// <summary>
+        /// Returns a DataColumnReader for the given field
+        /// </summary>
+        /// <param name="field">DataField to look up in this row group</param>
+        /// <returns>DataColumnReader for the field</returns>
+        /// <exception cref="ArgumentNullException">Field is null</exception>
+        /// <exception cref="ParquetException">Field does not exist in this file</exception>
+        public DataColumnReader GetColumnReader(DataField field) {
+
+            if(field == null)
+                throw new ArgumentNullException(nameof(field));
+
+            ColumnChunk columnChunk = GetMetadata(field)
+                ?? throw new ParquetException($"'{field.Path}' does not exist in this file");
+
+            return new DataColumnReader(field, _stream, columnChunk, _footer, _parquetOptions);
+        }
+
         /// <summary>
         ///
         /// </summary>
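
Since ReadColumnAsync above now just wraps GetColumnReader, callers can look at statistics before deciding whether to decode a column at all. A sketch of that pattern, assuming ParquetRowGroupReader exposes a RowCount property and DataColumnStatistics exposes NullCount (the all-null pruning rule is purely illustrative):

    using System.Threading.Tasks;
    using Parquet;
    using Parquet.Data;
    using Parquet.File;
    using Parquet.Schema;

    static class ColumnPruning {
        // Reads the same data as rowGroupReader.ReadColumnAsync(field),
        // but skips decoding when column statistics say the chunk is all nulls.
        public static async Task<DataColumn?> ReadUnlessAllNullAsync(
            ParquetRowGroupReader rowGroupReader, DataField field) {
            DataColumnReader columnReader = rowGroupReader.GetColumnReader(field); // ParquetException if the field is unknown
            DataColumnStatistics? stats = columnReader.GetColumnStatistics();      // null when the writer wrote none
            if(stats?.NullCount == rowGroupReader.RowCount)
                return null;                       // every value is null, nothing to decode
            return await columnReader.ReadAsync(); // column.Statistics is attached here, per ReadAsync above
        }
    }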
