Skip to content

Commit

Permalink
Add ownership check to E2EFilterTest (#1491)
Browse files Browse the repository at this point in the history
Summary:
Makes a shallow and a deep copy of every even numbered batch in the
different tests. Checks that these remain =equal after receiving the
odd-numbered batch. Does this for the first 12 batches of each test
run so as not to increase test time.

This effectively checks that when the reader hands out a second
reference to a Buffer, this does not get overwritten if the Buffer is
multiply referenced when getting the next batch. The same applies to
reusing parts of the 'result' VectorPtr in getValues of the different
reader classes.

Fixes a bad reuse of a multiply referenced RowVector child of the
top-level RowVector. Note that RowVector children of RowVectors must
be initialized to at least an empty RowVector, while other children
can be set to nullptr.

Pull Request resolved: #1491

Reviewed By: mbasmanova

Differential Revision: D35987608

Pulled By: oerling

fbshipit-source-id: a68f488b24ee924d93edc92de204d3d0770c29c8
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Apr 28, 2022
1 parent 84c0e8d commit 798d421
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 45 deletions.
42 changes: 40 additions & 2 deletions velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,51 @@ void SelectiveStructColumnReader::read(
readOffset_ = offset + rows.back() + 1;
}

namespace {
// Recursively makes empty RowVectors for positions in 'children'
// where the corresponding child type in 'rowType' is a row. The
// reader expects RowVector outputs to be initialized so that the
// content corresponds to the query schema regardless of the file
// schema. An empty RowVector can have nullptr for all its non-row
// children.
void fillRowVectorChildren(
memory::MemoryPool& pool,
const RowType& rowType,
std::vector<VectorPtr>& children) {
for (auto i = 0; i < children.size(); ++i) {
const auto& type = rowType.childAt(i);
if (type->isRow()) {
std::vector<VectorPtr> innerChildren(type->size());
fillRowVectorChildren(pool, type->asRow(), innerChildren);
children[i] = std::make_shared<RowVector>(
&pool, type, nullptr, 0, std::move(innerChildren));
}
}
}
} // namespace

void SelectiveStructColumnReader::getValues(RowSet rows, VectorPtr* result) {
assert(!children_.empty());
VELOX_CHECK(
*result != nullptr,
"SelectiveStructColumnReader expects a non-null result");
RowVector* resultRow = dynamic_cast<RowVector*>(result->get());
VELOX_CHECK(resultRow, "Struct reader expects a result of type ROW.");
VELOX_CHECK(
result->get()->type()->isRow(),
"Struct reader expects a result of type ROW.");

auto resultRow = static_cast<RowVector*>(result->get());
if (!result->unique()) {
std::vector<VectorPtr> children(resultRow->children().size());
fillRowVectorChildren(
*resultRow->pool(), resultRow->type()->asRow(), children);
*result = std::make_unique<RowVector>(
resultRow->pool(),
resultRow->type(),
BufferPtr(nullptr),
0,
std::move(children));
resultRow = static_cast<RowVector*>(result->get());
}
resultRow->resize(rows.size());
if (!rows.size()) {
return;
Expand Down
96 changes: 53 additions & 43 deletions velox/dwio/dwrf/test/E2EFilterTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,40 +143,27 @@ void E2EFilterTestBase::makeStringUnique(const Subfield& field) {
}

void E2EFilterTestBase::makeNotNull(int32_t firstRow) {
for (RowVectorPtr batch : batches_) {
makeNotNullRecursive(firstRow, batch);
}
}

void E2EFilterTestBase::makeNotNullRecursive(
int32_t firstRow,
RowVectorPtr batch) {
// make all children in nested structs non-null.
for (auto& data : batch->children()) {
if (data->typeKind() == TypeKind::ROW) {
auto rowVector = std::dynamic_pointer_cast<RowVector>(data);
makeNotNullRecursive(firstRow, rowVector);
}
}
for (auto& data : batch->children()) {
std::vector<vector_size_t> nonNulls;
vector_size_t probe = 0;
for (auto counter = 0; counter < 23; ++counter) {
// Sample with a prime stride for a handful of non-null values.
probe = (probe + 47) % data->size();
if (!data->isNullAt(probe)) {
nonNulls.push_back(probe);
for (const auto& batch : batches_) {
for (auto& data : batch->children()) {
std::vector<vector_size_t> nonNulls;
vector_size_t probe = 0;
for (auto counter = 0; counter < 23; ++counter) {
// Sample with a prime stride for a handful of non-null values.
probe = (probe + 47) % data->size();
if (!data->isNullAt(probe)) {
nonNulls.push_back(probe);
}
}
}
if (nonNulls.empty()) {
continue;
}
int32_t nonNullCounter = 0;
for (auto row = firstRow; row < data->size(); ++row) {
if (data->isNullAt(row)) {
data->copy(
data.get(), row, nonNulls[nonNullCounter % nonNulls.size()], 1);
++nonNullCounter;
if (nonNulls.empty()) {
continue;
}
int32_t nonNullCounter = 0;
for (auto row = firstRow; row < data->size(); ++row) {
if (data->isNullAt(row)) {
data->copy(
data.get(), row, nonNulls[nonNullCounter % nonNulls.size()], 1);
++nonNullCounter;
}
}
}
}
Expand All @@ -195,6 +182,7 @@ void E2EFilterTestBase::readWithoutFilter(
;
// The spec must stay live over the lifetime of the reader.
rowReaderOpts.setScanSpec(spec);
OwnershipChecker ownershipChecker;
auto rowReader = reader->createRowReader(rowReaderOpts);

auto batchIndex = 0;
Expand All @@ -210,6 +198,7 @@ void E2EFilterTestBase::readWithoutFilter(
break;
}

ownershipChecker.check(batch);
for (int32_t i = 0; i < batch->size(); ++i) {
ASSERT_TRUE(batch->equalValueAt(batches[batchIndex].get(), i, rowIndex))
<< "Content mismatch at batch " << batchIndex << " at index "
Expand Down Expand Up @@ -243,6 +232,7 @@ void E2EFilterTestBase::readWithFilter(
auto factory = std::make_unique<SelectiveColumnReaderFactory>(spec);
// The spec must stay live over the lifetime of the reader.
rowReaderOpts.setScanSpec(spec);
OwnershipChecker ownershipChecker;
auto rowReader = reader->createRowReader(rowReaderOpts);
runtimeStats_ = dwio::common::RuntimeStatistics();
auto rowIndex = 0;
Expand Down Expand Up @@ -294,6 +284,8 @@ void E2EFilterTestBase::readWithFilter(
<< batches[batchNumber(hit)]->toString(batchRow(hit))
<< " actual: " << batch->toString(i);
}
// Check no overwrites after all LazyVectors are loaded.
ownershipChecker.check(batch);
}
if (!skipCheck) {
ASSERT_EQ(rowIndex, hitRows.size());
Expand All @@ -315,15 +307,6 @@ bool E2EFilterTestBase::loadWithHook(
return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
checkLoadWithHook, kind, batch, columnIndex, child, hitRows, rowIndex);
}
namespace {
// Recursively clear extractValues so that eager read is not forced.
void setLazy(ScanSpec& spec) {
spec.setExtractValues(false);
for (auto& child : spec.children()) {
setLazy(*child);
}
}
} // namespace

void E2EFilterTestBase::testFilterSpecs(
const std::vector<FilterSpec>& filterSpecs) {
Expand All @@ -343,7 +326,7 @@ void E2EFilterTestBase::testFilterSpecs(
// Redo the test with LazyVectors for non-filtered columns.
timeWithFilter = 0;
for (auto& childSpec : spec->children()) {
setLazy(*childSpec);
childSpec->setExtractValues(false);
}
readWithFilter(spec.get(), batches_, hitRows, timeWithFilter, false);
timeWithFilter = 0;
Expand Down Expand Up @@ -411,4 +394,31 @@ void E2EFilterTestBase::testWithTypes(
}
}

void OwnershipChecker::check(const VectorPtr& batch) {
// Check the 6 first pairs of previous, next batch to see that
// fetching the next does not overwrite parts reachable from a
// retained reference to the previous one.
if (batchCounter_ > 11) {
return;
}
if (batchCounter_ % 2 == 0) {
previousBatch_ = std::make_shared<RowVector>(
batch->pool(),
batch->type(),
BufferPtr(nullptr),
batch->size(),
batch->as<RowVector>()->children());
previousBatchCopy_ = BaseVector::copy(*batch);
}
if (batchCounter_ % 2 == 1) {
for (auto i = 0; i < previousBatch_->size(); ++i) {
ASSERT_TRUE(previousBatch_->equalValueAt(previousBatchCopy_.get(), i, i))
<< "Retained reference of a batch has been overwritten by the next "
<< "index " << i << " batch " << previousBatch_->toString(i)
<< " original " << previousBatchCopy_->toString(i);
}
}
++batchCounter_;
}

} // namespace facebook::velox::dwio::dwrf
19 changes: 19 additions & 0 deletions velox/dwio/dwrf/test/E2EFilterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,25 @@ inline void TestingHook<StringView>::addValue(
row, StringView(*reinterpret_cast<const folly::StringPiece*>(value)));
}

// Utility for checking that a subsequent batch of output does not
// overwrite internals of a possibly retained previous batch.
class OwnershipChecker {
public:
// Receives consecutive batches during a test and checks that
// subsequent ones do not overwrite retained content from previous
// ones.
void check(const VectorPtr& batch);

private:
// Sequence number of batch for check().
int32_t batchCounter_{0};
// References the previous batch of results for check().
VectorPtr previousBatch_;
// Copy of 'previousBatch_' used for validating no overwrite from fetching the
// next batch.
VectorPtr previousBatchCopy_;
};

class E2EFilterTestBase : public testing::Test {
protected:
static constexpr int32_t kRowsInGroup = 10'000;
Expand Down
8 changes: 8 additions & 0 deletions velox/vector/BaseVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,14 @@ class BaseVector {
});
}

// Utility for making a deep copy of a whole vector.
static std::shared_ptr<BaseVector> copy(const BaseVector& vector) {
auto result =
BaseVector::create(vector.type(), vector.size(), vector.pool());
result->copy(&vector, 0, 0, vector.size());
return result;
}

// Move or copy an element at 'source' row into 'target' row.
// This can be more efficient than copy for complex types.
virtual void move(vector_size_t source, vector_size_t target) {
Expand Down

0 comments on commit 798d421

Please sign in to comment.