Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions c++/src/ByteRLE.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ namespace orc {

virtual void recordPosition(PositionRecorder* recorder) const override;

virtual void suppress() override;

private:
int bitsRemained;
char current;
Expand Down Expand Up @@ -291,6 +293,12 @@ namespace orc {
recorder->add(static_cast<uint64_t>(8 - bitsRemained));
}

// Suppresses this encoder's stream: first lets the underlying byte-RLE
// encoder perform its own suppress(), then clears the boolean-specific
// state kept by this class (the byte being assembled and its bit counter).
void BooleanRleEncoderImpl::suppress() {
  ByteRleEncoderImpl::suppress();

  // Drop any partially assembled byte so no stale bits carry over into
  // whatever is encoded next.
  current = '\0';
  bitsRemained = 8;
}

std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder(
std::unique_ptr<BufferedOutputStream> output) {
BooleanRleEncoderImpl* encoder = new BooleanRleEncoderImpl(std::move(output));
Expand Down
78 changes: 78 additions & 0 deletions c++/test/TestWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1994,6 +1994,84 @@ namespace orc {
EXPECT_FALSE(rowReader->next(*batch));
}

// The first stripe has no null values and the second stripe has null values.
// Make sure the stripes do not carry dirty data in their present streams:
// a file is written with a block of nulls near the end, read back, and every
// row's null flag and value are verified.
TEST_P(WriterTest, testSuppressPresentStreamInPreStripe) {
  MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
  MemoryPool* pool = getDefaultPool();

  // Row indices are 1-based:
  //   [1, 998000):        not null, value is equal to the row index
  //   [998000, 999000):   null
  //   [999000, 1000000]:  not null, value is equal to the row index
  size_t rowCount = 1000000;
  size_t nullBeginCount = 998000;
  size_t nullEndCount = 999000;
  size_t batchSize = 5;

  // Write the file.
  {
    auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<col1:int>"));
    WriterOptions options;
    options.setStripeSize(16 * 1024)
        .setCompressionBlockSize(1024)
        .setCompression(CompressionKind_NONE)
        .setMemoryPool(pool)
        .setRowIndexStride(1000);

    auto writer = createWriter(*type, &memStream, options);

    uint64_t batchCount = rowCount / batchSize;
    size_t rowsWrite = 0;
    for (uint64_t batchIdx = 0; batchIdx < batchCount; batchIdx++) {
      // A new batch is created for every iteration; only the null rows
      // explicitly clear their notNull entry below.
      auto batch = writer->createRowBatch(batchSize);
      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
      auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
      structBatch.numElements = batchSize;
      longBatch.numElements = batchSize;
      longBatch.hasNulls = false;
      for (uint64_t row = 0; row < batchSize; ++row) {
        size_t rowIndex = rowsWrite + row + 1;
        if (rowIndex < nullBeginCount || rowIndex >= nullEndCount) {
          longBatch.data[row] = static_cast<int64_t>(rowIndex);
        } else {
          longBatch.notNull[row] = 0;
          longBatch.hasNulls = true;
        }
      }

      writer->add(*batch);
      rowsWrite += batch->numElements;
    }
    writer->close();
  }

  // Read the file back and check that every column value is correct.
  {
    std::unique_ptr<MemoryInputStream> inStream(
        new MemoryInputStream(memStream.getData(), memStream.getLength()));
    ReaderOptions readerOptions;
    readerOptions.setMemoryPool(*pool);
    std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
    EXPECT_EQ(reader->getNumberOfStripes(), 2);
    EXPECT_EQ(rowCount, reader->getNumberOfRows());
    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
    size_t rowsRead = 0;
    while (rowsRead < rowCount) {
      auto batch = rowReader->createRowBatch(1000);
      // Fatal on purpose: if the reader runs out of rows early, rowsRead would
      // stop advancing and this loop would never terminate with a non-fatal
      // EXPECT_TRUE.
      ASSERT_TRUE(rowReader->next(*batch));
      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
      auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
      for (size_t i = 0; i < batch->numElements; ++i) {
        size_t rowIndex = rowsRead + i + 1;
        if (rowIndex < nullBeginCount || rowIndex >= nullEndCount) {
          EXPECT_TRUE(longBatch.notNull[i]);
          EXPECT_EQ(longBatch.data[i], static_cast<int64_t>(rowIndex));
        } else {
          EXPECT_FALSE(longBatch.notNull[i]);
        }
      }
      rowsRead += batch->numElements;
    }
  }
}

// Instantiates the value-parameterized WriterTest suite under the "OrcTest"
// prefix, so that each TEST_P(WriterTest, ...) case is run once for every
// file version listed here.
INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest,
Values(FileVersion::v_0_11(), FileVersion::v_0_12(),
FileVersion::UNSTABLE_PRE_2_0()));
Expand Down