Skip to content

Commit

Permalink
Add basic implementation of Unnest operator (#27)
Browse files Browse the repository at this point in the history
Summary:
This is an initial cut which supports only one unnest column of type ARRAY and doesn't support WITH ORDINALITY clause.

Pull Request resolved: #27

Differential Revision: D30264256

Pulled By: mbasmanova

fbshipit-source-id: e64552af02027b10b773942321193a0b7c7ba332
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Aug 12, 2021
1 parent f148b28 commit c2a6ca0
Show file tree
Hide file tree
Showing 12 changed files with 476 additions and 23 deletions.
49 changes: 49 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,53 @@ const std::vector<std::shared_ptr<const PlanNode>>& ExchangeNode::sources()
return EMPTY_SOURCES;
}

UnnestNode::UnnestNode(
const PlanNodeId& id,
std::vector<std::shared_ptr<const FieldAccessTypedExpr>> replicateVariables,
std::vector<std::shared_ptr<const FieldAccessTypedExpr>> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
const std::shared_ptr<const PlanNode>& source)
: PlanNode(id),
replicateVariables_{std::move(replicateVariables)},
unnestVariables_{std::move(unnestVariables)},
withOrdinality_{ordinalityName.has_value()},
sources_{source} {
// Calculate output type. First come "replicate" columns, followed by
// "unnest" columns, followed by an optional ordinality column.
std::vector<std::string> names;
std::vector<TypePtr> types;

for (const auto& variable : replicateVariables_) {
names.emplace_back(variable->name());
types.emplace_back(variable->type());
}

int unnestIndex = 0;
for (const auto& variable : unnestVariables_) {
if (variable->type()->isArray()) {
names.emplace_back(unnestNames[unnestIndex++]);
types.emplace_back(variable->type()->asArray().elementType());
} else if (variable->type()->isMap()) {
const auto& mapType = variable->type()->asMap();

names.emplace_back(unnestNames[unnestIndex++]);
types.emplace_back(mapType.keyType());

names.emplace_back(unnestNames[unnestIndex++]);
types.emplace_back(mapType.valueType());
} else {
VELOX_FAIL(
"Unexpected type of unnest variable. Expected ARRAY or MAP, but got {}.",
variable->type()->toString());
}
}

if (ordinalityName.has_value()) {
names.emplace_back(ordinalityName.value());
types.emplace_back(BIGINT());
}
outputType_ = ROW(std::move(names), std::move(types));
}

} // namespace facebook::velox::core
64 changes: 64 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -959,4 +959,68 @@ class LimitNode : public PlanNode {
const std::vector<std::shared_ptr<const PlanNode>> sources_;
};

/// Expands arrays and maps into separate columns. Arrays are expanded into a
/// single column, and maps are expanded into two columns (key, value). Can be
/// used to expand multiple columns. In this case will produce as many rows as
/// the highest cardinality array or map (the other columns are padded with
/// nulls). Optionally can produce an ordinality column that specifies the row
/// number starting with 1.
class UnnestNode : public PlanNode {
public:
/// @param replicateVariables Inputs that are projected as is
/// @param unnestVariables Inputs that are unnested. Must be of type ARRAY or
/// MAP.
/// @param unnestNames Names to use for unnested outputs: one name for each
/// array (element); two names for each map (key and value). The output names
/// must appear in the same order as unnestVariables.
/// @param ordinalityName Optional name for the ordinality columns. If not
/// present, ordinality column is not produced.
UnnestNode(
const PlanNodeId& id,
std::vector<std::shared_ptr<const FieldAccessTypedExpr>>
replicateVariables,
std::vector<std::shared_ptr<const FieldAccessTypedExpr>> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
const std::shared_ptr<const PlanNode>& source);

/// The order of columns in the output is: replicated columns (in the order
/// specified), unnested columns (in the order specified, for maps: key comes
/// before value), optional ordinality column.
const RowTypePtr& outputType() const override {
return outputType_;
}

const std::vector<std::shared_ptr<const PlanNode>>& sources() const override {
return sources_;
}

const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>&
replicateVariables() const {
return replicateVariables_;
}

const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>&
unnestVariables() const {
return unnestVariables_;
}

bool withOrdinality() const {
return withOrdinality_;
}

std::string_view name() const override {
return "unnest";
}

private:
const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>
replicateVariables_;
const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>
unnestVariables_;
const bool withOrdinality_;
const std::vector<std::shared_ptr<const PlanNode>> sources_;
RowTypePtr outputType_;
};

} // namespace facebook::velox::core
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ add_library(
Operator.cpp
OperatorUtils.cpp
FilterProject.cpp
Unnest.cpp
Values.cpp
VectorHasher.cpp
PartitionedOutputBufferManager.cpp
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/exec/TableScan.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/TopN.h"
#include "velox/exec/Unnest.h"
#include "velox/exec/Values.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -290,8 +291,12 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
localPartitionNode->outputType(),
localPartitionNode->id(),
ctx->driverId));
} else if (
auto unnest =
std::dynamic_pointer_cast<const core::UnnestNode>(planNode)) {
operators.push_back(std::make_unique<Unnest>(id, ctx.get(), unnest));
} else {
VELOX_FAIL("Unsupported plan node");
VELOX_FAIL("Unsupported plan node: {}", planNode->toString());
}
}
if (consumerSupplier) {
Expand Down
145 changes: 145 additions & 0 deletions velox/exec/Unnest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/Unnest.h"
#include "velox/exec/OperatorUtils.h"

namespace facebook::velox::exec {
Unnest::Unnest(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::UnnestNode>& unnestNode)
: Operator(
driverCtx,
unnestNode->outputType(),
operatorId,
unnestNode->id(),
"Unnest") {
if (unnestNode->unnestVariables().size() > 1) {
VELOX_UNSUPPORTED(
"Unnest operator doesn't support multiple unnest columns yet");
}

const auto& unnestVariable = unnestNode->unnestVariables()[0];
if (!unnestVariable->type()->isArray()) {
VELOX_UNSUPPORTED(
"Unnest operator doesn't support non-ARRAY unnest variables yet: {}",
unnestVariable->type()->toString())
}

if (unnestNode->withOrdinality()) {
VELOX_UNSUPPORTED("Unnest operator doesn't support ordinality column yet");
}

const auto& inputType = unnestNode->sources()[0]->outputType();
ChannelIndex outputChannel = 0;
for (const auto& variable : unnestNode->replicateVariables()) {
identityProjections_.emplace_back(
inputType->getChildIdx(variable->name()), outputChannel++);
}

unnestChannel_ = inputType->getChildIdx(unnestVariable->name());
}

void Unnest::addInput(RowVectorPtr input) {
input_ = std::move(input);
}

RowVectorPtr Unnest::getOutput() {
if (!input_) {
return nullptr;
}

auto size = input_->size();

// Build repeated indices to apply to "replicated" columns.
const auto& unnestVector = input_->childAt(unnestChannel_);

inputRows_.resize(size);
unnestDecoded_.decode(*unnestVector, inputRows_);

auto unnestIndices = unnestDecoded_.indices();

auto unnestBase = unnestDecoded_.base()->as<ArrayVector>();
auto rawSizes = unnestBase->rawSizes();
auto rawOffsets = unnestBase->rawOffsets();

// Count number of elements.
vector_size_t numElements = 0;
for (auto row = 0; row < size; ++row) {
if (!unnestDecoded_.isNullAt(row)) {
numElements += rawSizes[unnestIndices[row]];
}
}

if (numElements == 0) {
// All arrays are null or empty.
input_ = nullptr;
return nullptr;
}

// Create "indices" buffer to repeat rows as many times as there are elements
// in the array.
BufferPtr repeatedIndices =
AlignedBuffer::allocate<vector_size_t>(numElements, pool());
auto* rawIndices = repeatedIndices->asMutable<vector_size_t>();
vector_size_t index = 0;
for (auto row = 0; row < size; ++row) {
if (!unnestDecoded_.isNullAt(row)) {
auto unnestSize = rawSizes[unnestIndices[row]];
for (auto i = 0; i < unnestSize; i++) {
rawIndices[index++] = row;
}
}
}

// Wrap "replicated" columns in a dictionary using 'repeatedIndices'.
std::vector<VectorPtr> outputs(outputType_->size());
for (const auto& projection : identityProjections_) {
outputs[projection.outputChannel] = wrapChild(
numElements, repeatedIndices, input_->childAt(projection.inputChannel));
}

// Make "elements" column. Elements may be out of order. Use a
// dictionary to ensure the right order.
BufferPtr elementIndices =
AlignedBuffer::allocate<vector_size_t>(numElements, pool());
auto* rawElementIndices = elementIndices->asMutable<vector_size_t>();
index = 0;
bool identityMapping = true;
for (auto row = 0; row < size; ++row) {
if (!unnestDecoded_.isNullAt(row)) {
auto offset = rawOffsets[unnestIndices[row]];
auto unnestSize = rawSizes[unnestIndices[row]];

if (index != offset) {
identityMapping = false;
}

for (auto i = 0; i < unnestSize; i++) {
rawElementIndices[index++] = offset + i;
}
}
}

outputs[identityProjections_.size()] = identityMapping
? unnestBase->elements()
: wrapChild(numElements, elementIndices, unnestBase->elements());

input_ = nullptr;

return std::make_shared<RowVector>(
pool(), outputType_, BufferPtr(nullptr), numElements, std::move(outputs));
}
} // namespace facebook::velox::exec
43 changes: 43 additions & 0 deletions velox/exec/Unnest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "velox/exec/Operator.h"

namespace facebook::velox::exec {
class Unnest : public Operator {
public:
Unnest(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::UnnestNode>& unnestNode);

BlockingReason isBlocked(ContinueFuture* /*future*/) override {
return BlockingReason::kNotBlocked;
}

bool needsInput() const override {
return true;
}

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override;

private:
ChannelIndex unnestChannel_;

SelectivityVector inputRows_;
DecodedVector unnestDecoded_;
};
} // namespace facebook::velox::exec
3 changes: 2 additions & 1 deletion velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ add_executable(
HashJoinTest.cpp
CastExprTest.cpp
PlanNodeToStringTest.cpp
FunctionSignatureBuilderTest.cpp)
FunctionSignatureBuilderTest.cpp
UnnestTest.cpp)

add_test(
NAME velox_exec_test
Expand Down
10 changes: 10 additions & 0 deletions velox/exec/tests/OperatorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ class OperatorTestBase : public testing::Test {
return vectorMaker_.flatVectorNullable(data);
}

template <typename T>
ArrayVectorPtr makeArrayVector(
vector_size_t size,
std::function<vector_size_t(vector_size_t /* row */)> sizeAt,
std::function<T(vector_size_t /* row */, vector_size_t /* idx */)>
valueAt,
std::function<bool(vector_size_t /*row */)> isNullAt = nullptr) {
return vectorMaker_.arrayVector<T>(size, sizeAt, valueAt, isNullAt);
}

template <typename TKey, typename TValue>
MapVectorPtr makeMapVector(
vector_size_t size,
Expand Down
Loading

0 comments on commit c2a6ca0

Please sign in to comment.