Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions c++/src/ByteRLE.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ namespace orc {

virtual void recordPosition(PositionRecorder* recorder) const override;

virtual void suppress() override;

/**
* Reset to initial state
*/
void reset();

protected:
std::unique_ptr<BufferedOutputStream> outputStream;
char* literals;
Expand All @@ -80,12 +87,7 @@ namespace orc {
std::unique_ptr<BufferedOutputStream> output)
: outputStream(std::move(output)) {
literals = new char[MAX_LITERAL_SIZE];
numLiterals = 0;
tailRunLength = 0;
repeat = false;
bufferPosition = 0;
bufferLength = 0;
buffer = nullptr;
reset();
}

ByteRleEncoderImpl::~ByteRleEncoderImpl() {
Expand Down Expand Up @@ -203,6 +205,21 @@ namespace orc {
recorder->add(static_cast<uint64_t>(numLiterals));
}

void ByteRleEncoderImpl::reset() {
numLiterals = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not nullify buffer? I'd add a reset() function to clear the state and use it here and in the ByteRleEncoderImpl constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find an existing reset function, so I added one as part of this change.

tailRunLength = 0;
repeat = false;
bufferPosition = 0;
bufferLength = 0;
buffer = nullptr;
}

void ByteRleEncoderImpl::suppress() {
// written data can be just ignored because they are only flushed in memory
outputStream->suppress();
reset();
}

std::unique_ptr<ByteRleEncoder> createByteRleEncoder
(std::unique_ptr<BufferedOutputStream> output) {
return std::unique_ptr<ByteRleEncoder>(new ByteRleEncoderImpl
Expand Down
5 changes: 5 additions & 0 deletions c++/src/ByteRLE.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ namespace orc {
* @param recorder use the recorder to record current positions
*/
virtual void recordPosition(PositionRecorder* recorder) const = 0;

/**
* suppress the data and reset to initial state
*/
virtual void suppress() = 0;
};

class ByteRleDecoder {
Expand Down
32 changes: 30 additions & 2 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ namespace orc {
enableBloomFilter(false),
memPool(*options.getMemoryPool()),
indexStream(),
bloomFilterStream() {
bloomFilterStream(),
hasNullValue(false) {

std::unique_ptr<BufferedOutputStream> presentStream =
factory.createStream(proto::Stream_Kind_PRESENT);
Expand Down Expand Up @@ -139,10 +140,22 @@ namespace orc {
uint64_t offset,
uint64_t numValues,
const char* incomingMask) {
notNullEncoder->add(batch.notNull.data() + offset, numValues, incomingMask);
const char* notNull = batch.notNull.data() + offset;
notNullEncoder->add(notNull, numValues, incomingMask);
hasNullValue |= batch.hasNulls;
for (uint64_t i = 0; !hasNullValue && i < numValues; ++i) {
if (!notNull[i]) {
hasNullValue = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest checking hasNullValue in line 145 or 146 because hasNullValue may already be set by previous nulls.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition, we should check batch.hasNulls as a shortcut.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic has been changed: hasNullValue is now OR-ed with batch.hasNulls first, and the loop is skipped once hasNullValue is already set.

}
}
}

void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
if (!hasNullValue) {
// suppress the present stream
notNullEncoder->suppress();
return;
}
proto::Stream stream;
stream.set_kind(proto::Stream_Kind_PRESENT);
stream.set_column(static_cast<uint32_t>(columnId));
Expand Down Expand Up @@ -199,6 +212,21 @@ namespace orc {
}

void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
if (!hasNullValue) {
// remove positions of present stream
int presentCount = indexStream->isCompressed() ? 4 : 3;
for (int i = 0; i != rowIndex->entry_size(); ++i) {
proto::RowIndexEntry* entry = rowIndex->mutable_entry(i);
std::vector<uint64_t> positions;
for (int j = presentCount; j < entry->positions_size(); ++j) {
positions.push_back(entry->positions(j));
}
entry->clear_positions();
for (size_t j = 0; j != positions.size(); ++j) {
entry->add_positions(positions[j]);
}
}
}
// write row index to output stream
rowIndex->SerializeToZeroCopyStream(indexStream.get());

Expand Down
1 change: 1 addition & 0 deletions c++/src/ColumnWriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ namespace orc {
MemoryPool& memPool;
std::unique_ptr<BufferedOutputStream> indexStream;
std::unique_ptr<BufferedOutputStream> bloomFilterStream;
bool hasNullValue;
};

/**
Expand Down
4 changes: 4 additions & 0 deletions c++/src/io/OutputStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ namespace orc {
return dataSize;
}

void BufferedOutputStream::suppress() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add an empty line before this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

dataBuffer->resize(0);
}

void AppendOnlyBufferedStream::write(const char * data, size_t size) {
size_t dataOffset = 0;
while (size > 0) {
Expand Down
1 change: 1 addition & 0 deletions c++/src/io/OutputStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace orc {
virtual std::string getName() const;
virtual uint64_t getSize() const;
virtual uint64_t flush();
virtual void suppress();

virtual bool isCompressed() const { return false; }
};
Expand Down
95 changes: 95 additions & 0 deletions c++/test/TestWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "MemoryInputStream.hh"
#include "MemoryOutputStream.hh"
#include "Reader.hh"

#include "wrap/gmock.h"
#include "wrap/gtest-wrapper.h"
Expand Down Expand Up @@ -1901,5 +1902,99 @@ namespace orc {
}
}

// Writes a struct<col1:int,col2:int> file in which only col1 contains nulls
// (every even row), reads it back (sequentially and after a seek), and then
// inspects the stripe footer to confirm that the only PRESENT stream kept in
// the file belongs to column 1.
TEST(WriterTest, testSuppressPresentStream) {
  MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
  MemoryPool* pool = getDefaultPool();
  size_t rowCount = 2000;
  {
    auto type = std::unique_ptr<Type>(
      Type::buildTypeFromString("struct<col1:int,col2:int>"));
    WriterOptions options;
    options.setStripeSize(1024 * 1024)
      .setCompressionBlockSize(1024)
      .setCompression(CompressionKind_NONE)
      .setMemoryPool(pool)
      .setRowIndexStride(1000);

    auto writer = createWriter(*type, &memStream, options);
    auto batch = writer->createRowBatch(rowCount);
    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
    auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
    auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
    structBatch.numElements = rowCount;
    longBatch1.numElements = rowCount;
    longBatch2.numElements = rowCount;
    // col1: even rows are null, odd rows hold i*100; col2: never null, i*300
    longBatch1.hasNulls = true;
    for (size_t i = 0; i < rowCount; ++i) {
      if (i % 2 == 0) {
        longBatch1.notNull[i] = 0;
      } else {
        longBatch1.notNull[i] = 1;
        longBatch1.data[i] = static_cast<int64_t>(i*100);
      }
      longBatch2.data[i] = static_cast<int64_t>(i*300);
    }
    writer->add(*batch);
    writer->close();
  }
  // read file & check the present stream
  {
    std::unique_ptr<InputStream> inStream(
      new MemoryInputStream(memStream.getData(), memStream.getLength()));
    ReaderOptions readerOptions;
    readerOptions.setMemoryPool(*pool);
    std::unique_ptr<Reader> reader =
      createReader(std::move(inStream), readerOptions);
    EXPECT_EQ(rowCount, reader->getNumberOfRows());
    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
    auto batch = rowReader->createRowBatch(1000);
    // Read rows 0 - 999
    EXPECT_TRUE(rowReader->next(*batch));
    EXPECT_EQ(1000, batch->numElements);
    auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
    auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
    auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
    for (size_t i = 0; i < 1000; ++i) {
      if (i % 2 == 0) {
        EXPECT_FALSE(longBatch1.notNull[i]);
      } else {
        EXPECT_TRUE(longBatch1.notNull[i]);
        EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i*100));
      }
      EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>(i*300));
    }
    // Read rows 1500 - 1999; the seek relies on the row index positions
    // written for this stripe.
    rowReader->seekToRow(1500);
    EXPECT_TRUE(rowReader->next(*batch));
    EXPECT_EQ(500, batch->numElements);
    for (size_t i = 0; i < 500; ++i) {
      if (i % 2 == 0) {
        EXPECT_FALSE(longBatch1.notNull[i]);
      } else {
        EXPECT_TRUE(longBatch1.notNull[i]);
        EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>((i + 1500)*100));
      }
      EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>((i + 1500)*300));
    }
    // fetch StripeFooter from pb stream
    std::unique_ptr<StripeInformation> stripeInfo = reader->getStripe(0);
    ReaderImpl* readerImpl = dynamic_cast<ReaderImpl*>(reader.get());
    // Stop here instead of dereferencing a null pointer if the reader is not
    // the expected implementation type.
    ASSERT_TRUE(readerImpl != nullptr);
    std::unique_ptr<SeekableInputStream> pbStream(
      new SeekableFileInputStream(readerImpl->getStream(),
                                  stripeInfo->getOffset() + stripeInfo->getIndexLength() + stripeInfo->getDataLength(),
                                  stripeInfo->getFooterLength(),
                                  *pool));
    proto::StripeFooter stripeFooter;
    if (!stripeFooter.ParseFromZeroCopyStream(pbStream.get())) {
      throw ParseError("Parse stripe footer from pb stream failed");
    }
    // Every PRESENT stream that survived must belong to column 1, and that
    // stream must actually exist; otherwise the loop would pass vacuously.
    bool foundPresentStream = false;
    for (int i = 0; i < stripeFooter.streams_size(); ++i) {
      const proto::Stream& stream = stripeFooter.streams(i);
      if (stream.has_kind() && stream.kind() == proto::Stream_Kind_PRESENT) {
        foundPresentStream = true;
        EXPECT_EQ(stream.column(), 1UL);
      }
    }
    EXPECT_TRUE(foundPresentStream);
  }
}

INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12()));
}