Skip to content

Commit

Permalink
Add basic implementation of Unnest operator
Browse files Browse the repository at this point in the history
This is an initial cut which supports only one unnest column of type ARRAY
and doesn't support WITH ORDINALITY clause.
  • Loading branch information
mbasmanova committed Aug 11, 2021
1 parent f6b80b7 commit 95ded0a
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 23 deletions.
49 changes: 49 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,53 @@ const std::vector<std::shared_ptr<const PlanNode>>& ExchangeNode::sources()
return EMPTY_SOURCES;
}

// Builds an Unnest plan node and derives its output row type.
//
// The output columns are, in order: the "replicate" columns (names and types
// taken from 'replicateVariables'), the unnested columns (one per ARRAY
// variable, two per MAP variable: key then value; names taken from
// 'unnestNames' in order), and finally an optional BIGINT ordinality column
// named 'ordinalityName'.
//
// Fails if an unnest variable is neither ARRAY nor MAP, or if 'unnestNames'
// has fewer entries than the unnest variables require. Surplus entries in
// 'unnestNames' are ignored.
UnnestNode::UnnestNode(
    const PlanNodeId& id,
    std::vector<std::shared_ptr<const FieldAccessTypedExpr>> replicateVariables,
    std::vector<std::shared_ptr<const FieldAccessTypedExpr>> unnestVariables,
    const std::vector<std::string>& unnestNames,
    const std::optional<std::string>& ordinalityName,
    const std::shared_ptr<const PlanNode>& source)
    : PlanNode(id),
      replicateVariables_{std::move(replicateVariables)},
      unnestVariables_{std::move(unnestVariables)},
      withOrdinality_{ordinalityName.has_value()},
      sources_{source} {
  std::vector<std::string> names;
  std::vector<TypePtr> types;

  // Replicated columns are passed through unchanged.
  for (const auto& variable : replicateVariables_) {
    names.emplace_back(variable->name());
    types.emplace_back(variable->type());
  }

  // Hands out the next output name, failing cleanly instead of reading past
  // the end of 'unnestNames' when the caller supplied too few names.
  size_t unnestIndex = 0;
  const auto nextUnnestName = [&]() -> const std::string& {
    if (unnestIndex >= unnestNames.size()) {
      VELOX_FAIL(
          "Not enough unnest names: got {}, but unnest variables require more.",
          unnestNames.size());
    }
    return unnestNames[unnestIndex++];
  };

  for (const auto& variable : unnestVariables_) {
    const auto& type = variable->type();
    if (type->isArray()) {
      // An array contributes a single "element" column.
      names.emplace_back(nextUnnestName());
      types.emplace_back(type->asArray().elementType());
    } else if (type->isMap()) {
      // A map contributes two columns: key first, then value.
      const auto& mapType = type->asMap();

      names.emplace_back(nextUnnestName());
      types.emplace_back(mapType.keyType());

      names.emplace_back(nextUnnestName());
      types.emplace_back(mapType.valueType());
    } else {
      VELOX_FAIL(
          "Unexpected type of unnest variable. Expected ARRAY or MAP, but got {}.",
          type->toString());
    }
  }

  if (ordinalityName.has_value()) {
    names.emplace_back(ordinalityName.value());
    types.emplace_back(BIGINT());
  }
  outputType_ = ROW(std::move(names), std::move(types));
}

} // namespace facebook::velox::core
64 changes: 64 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -959,4 +959,68 @@ class LimitNode : public PlanNode {
const std::vector<std::shared_ptr<const PlanNode>> sources_;
};

/// Expands arrays and maps into separate columns. Arrays are expanded into a
/// single column, and maps are expanded into two columns (key, value). Can be
/// used to expand multiple columns. In this case, produces as many rows as
/// the highest-cardinality array or map (the other columns are padded with
/// nulls). Optionally can produce an ordinality column that specifies the row
/// number starting with 1.
class UnnestNode : public PlanNode {
public:
/// @param id Identifier of this plan node.
/// @param replicateVariables Inputs that are projected as is
/// @param unnestVariables Inputs that are unnested. Must be of type ARRAY or
/// MAP.
/// @param unnestNames Names to use for unnested outputs: one name for each
/// array (element); two names for each map (key and value). The output names
/// must appear in the same order as unnestVariables.
/// @param ordinalityName Optional name for the ordinality column. If not
/// present, ordinality column is not produced.
/// @param source The single upstream plan node that produces the input rows.
UnnestNode(
const PlanNodeId& id,
std::vector<std::shared_ptr<const FieldAccessTypedExpr>>
replicateVariables,
std::vector<std::shared_ptr<const FieldAccessTypedExpr>> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
const std::shared_ptr<const PlanNode>& source);

/// The order of columns in the output is: replicated columns (in the order
/// specified), unnested columns (in the order specified, for maps: key comes
/// before value), optional ordinality column.
const RowTypePtr& outputType() const override {
return outputType_;
}

/// Returns the sources of this node: a vector holding the one source passed
/// to the constructor.
const std::vector<std::shared_ptr<const PlanNode>>& sources() const override {
return sources_;
}

/// Returns the inputs that are copied to the output unchanged.
const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>&
replicateVariables() const {
return replicateVariables_;
}

/// Returns the inputs that are unnested.
const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>&
unnestVariables() const {
return unnestVariables_;
}

/// Returns true if an ordinality name was supplied at construction, i.e. the
/// output type includes an ordinality column.
bool withOrdinality() const {
return withOrdinality_;
}

/// Returns the short name of this node type: "unnest".
std::string_view name() const override {
return "unnest";
}

private:
// Inputs projected to the output as is.
const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>
replicateVariables_;
// Inputs to unnest.
const std::vector<std::shared_ptr<const FieldAccessTypedExpr>>
unnestVariables_;
// Whether an ordinality name was provided to the constructor.
const bool withOrdinality_;
// Holds the single source node.
const std::vector<std::shared_ptr<const PlanNode>> sources_;
// Output row type; assigned in the constructor body after the member
// initializers have run, hence not const.
RowTypePtr outputType_;
};

} // namespace facebook::velox::core
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ add_library(
Operator.cpp
OperatorUtils.cpp
FilterProject.cpp
Unnest.cpp
Values.cpp
VectorHasher.cpp
PartitionedOutputBufferManager.cpp
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/exec/TableScan.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/TopN.h"
#include "velox/exec/Unnest.h"
#include "velox/exec/Values.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -290,8 +291,12 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
localPartitionNode->outputType(),
localPartitionNode->id(),
ctx->driverId));
} else if (
auto unnest =
std::dynamic_pointer_cast<const core::UnnestNode>(planNode)) {
operators.push_back(std::make_unique<Unnest>(id, ctx.get(), unnest));
} else {
VELOX_FAIL("Unsupported plan node");
VELOX_FAIL("Unsupported plan node: {}", planNode->toString());
}
}
if (consumerSupplier) {
Expand Down
139 changes: 139 additions & 0 deletions velox/exec/Unnest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/Unnest.h"
#include "velox/exec/OperatorUtils.h"

namespace facebook::velox::exec {
// Creates an Unnest operator for 'unnestNode'.
//
// This implementation supports exactly one unnest variable, which must be of
// type ARRAY, and does not support the ordinality column; any other plan is
// rejected with an "unsupported" error. Replicated columns are registered as
// identity projections onto the leading output channels, and the input
// channel of the unnest column is remembered for getOutput().
Unnest::Unnest(
    int32_t operatorId,
    DriverCtx* driverCtx,
    const std::shared_ptr<const core::UnnestNode>& unnestNode)
    : Operator(
          driverCtx,
          unnestNode->outputType(),
          operatorId,
          unnestNode->id(),
          "Unnest") {
  const auto& unnestVariables = unnestNode->unnestVariables();

  // Guard the [0] access below: an empty list must not be dereferenced.
  if (unnestVariables.empty()) {
    VELOX_UNSUPPORTED("Unnest operator requires an unnest column");
  }

  if (unnestVariables.size() > 1) {
    VELOX_UNSUPPORTED(
        "Unnest operator doesn't support multiple unnest columns yet");
  }

  const auto& unnestVariable = unnestVariables[0];
  if (!unnestVariable->type()->isArray()) {
    VELOX_UNSUPPORTED(
        "Unnest operator doesn't support non-ARRAY unnest variables yet: {}",
        unnestVariable->type()->toString());
  }

  if (unnestNode->withOrdinality()) {
    VELOX_UNSUPPORTED("Unnest operator doesn't support ordinality column yet");
  }

  // Map each replicated input column to consecutive output channels,
  // starting at 0.
  const auto& inputType = unnestNode->sources()[0]->outputType();
  ChannelIndex outputChannel = 0;
  for (const auto& variable : unnestNode->replicateVariables()) {
    identityProjections_.emplace_back(IdentityProjection{
        inputType->getChildIdx(variable->name()), outputChannel++});
  }

  unnestChannel_ = inputType->getChildIdx(unnestVariable->name());
}

// Takes ownership of the next input batch and stores it in 'input_', replacing
// any batch held there.
void Unnest::addInput(RowVectorPtr input) {
input_ = std::move(input);
}

// Expands the pending input batch: every non-null array in the unnest column
// yields one output row per element. Replicated columns are dictionary-wrapped
// so that each input row repeats once per element; the unnested column is the
// arrays' elements vector, dictionary-wrapped only when the elements are not
// already laid out in output order.
//
// Returns nullptr when there is no pending input or when the batch produces
// no rows (all arrays null or empty). The pending input is always consumed.
RowVectorPtr Unnest::getOutput() {
  if (!input_) {
    return nullptr;
  }

  const auto size = input_->size();

  // Decode the unnest column so that encoded (e.g. dictionary) inputs can be
  // addressed through 'unnestIndices' into the base ArrayVector.
  const auto& unnestVector = input_->childAt(unnestChannel_);

  inputRows_.resize(size);
  unnestDecoded_.decode(*unnestVector, inputRows_);

  auto unnestIndices = unnestDecoded_.indices();

  auto unnestBase = unnestDecoded_.base()->as<ArrayVector>();
  auto rawSizes = unnestBase->rawSizes();
  auto rawOffsets = unnestBase->rawOffsets();

  // Count the total number of elements, i.e. the number of output rows.
  vector_size_t numElements = 0;
  for (vector_size_t row = 0; row < size; ++row) {
    if (!unnestDecoded_.isNullAt(row)) {
      numElements += rawSizes[unnestIndices[row]];
    }
  }

  // Nothing to emit: every array is null or empty. Return nullptr rather
  // than a zero-row vector so downstream operators never see an empty batch.
  if (numElements == 0) {
    input_ = nullptr;
    return nullptr;
  }

  // 'repeatedIndices' repeats each input row once per array element;
  // 'elementIndices' lists the positions of the elements in output order.
  BufferPtr repeatedIndices =
      AlignedBuffer::allocate<vector_size_t>(numElements, pool());
  auto* rawRepeatedIndices = repeatedIndices->asMutable<vector_size_t>();

  BufferPtr elementIndices =
      AlignedBuffer::allocate<vector_size_t>(numElements, pool());
  auto* rawElementIndices = elementIndices->asMutable<vector_size_t>();

  // Elements may be stored out of order or with gaps. Track whether they
  // happen to be in output order already, in which case no wrapping of the
  // elements vector is needed.
  vector_size_t index = 0;
  bool identityMapping = true;
  for (vector_size_t row = 0; row < size; ++row) {
    if (unnestDecoded_.isNullAt(row)) {
      continue;
    }

    const auto offset = rawOffsets[unnestIndices[row]];
    const auto unnestSize = rawSizes[unnestIndices[row]];

    // The offset of an empty array is irrelevant: it contributes no elements.
    if (unnestSize > 0 && index != offset) {
      identityMapping = false;
    }

    for (vector_size_t i = 0; i < unnestSize; ++i) {
      rawRepeatedIndices[index] = row;
      rawElementIndices[index] = offset + i;
      ++index;
    }
  }

  // Wrap "replicated" columns in a dictionary using 'repeatedIndices'.
  std::vector<VectorPtr> outputs(outputType_->size());
  for (const auto& projection : identityProjections_) {
    outputs[projection.outputChannel] = wrapChild(
        numElements, repeatedIndices, input_->childAt(projection.inputChannel));
  }

  // The unnested column follows the replicated columns.
  outputs[identityProjections_.size()] = identityMapping
      ? unnestBase->elements()
      : wrapChild(numElements, elementIndices, unnestBase->elements());

  input_ = nullptr;

  return std::make_shared<RowVector>(
      pool(), outputType_, BufferPtr(nullptr), numElements, std::move(outputs));
}
} // namespace facebook::velox::exec
43 changes: 43 additions & 0 deletions velox/exec/Unnest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "velox/exec/Operator.h"

namespace facebook::velox::exec {
/// Operator that executes a core::UnnestNode. See Unnest.cpp for the
/// restrictions this implementation places on the plan node.
class Unnest : public Operator {
public:
/// @param operatorId Identifier of this operator, forwarded to Operator.
/// @param driverCtx Driver context, forwarded to Operator.
/// @param unnestNode Plan node describing the replicated and unnested columns.
Unnest(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::UnnestNode>& unnestNode);

/// Always reports kNotBlocked; the future argument is unused.
BlockingReason isBlocked(ContinueFuture* /*future*/) override {
return BlockingReason::kNotBlocked;
}

/// Always returns true.
bool needsInput() const override {
return true;
}

/// Stores 'input' as the pending batch for getOutput().
void addInput(RowVectorPtr input) override;

/// Produces the unnested rows for the pending batch, or nullptr if there is
/// no pending batch.
RowVectorPtr getOutput() override;

private:
// Index of the unnest column in the input row type; set by the constructor.
ChannelIndex unnestChannel_;

// Rows of the current input batch passed to the decoder; reused across
// batches.
SelectivityVector inputRows_;
// Decoded view of the unnest column; reused across batches.
DecodedVector unnestDecoded_;
};
} // namespace facebook::velox::exec
3 changes: 2 additions & 1 deletion velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ add_executable(
HashJoinTest.cpp
CastExprTest.cpp
PlanNodeToStringTest.cpp
FunctionSignatureBuilderTest.cpp)
FunctionSignatureBuilderTest.cpp
UnnestTest.cpp)

add_test(
NAME velox_exec_test
Expand Down
Loading

0 comments on commit 95ded0a

Please sign in to comment.