From 95ded0a5abc454a3b044011a0ed5233beeba794f Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 11 Aug 2021 16:42:52 -0400 Subject: [PATCH] Add basic implementation of Unnest operator This is an initial cut which supports only one unnest column of type ARRAY and doesn't support WITH ORDINALITY clause. --- velox/core/PlanNode.cpp | 49 ++++++++++ velox/core/PlanNode.h | 64 ++++++++++++ velox/exec/CMakeLists.txt | 1 + velox/exec/LocalPlanner.cpp | 7 +- velox/exec/Unnest.cpp | 139 +++++++++++++++++++++++++++ velox/exec/Unnest.h | 43 +++++++++ velox/exec/tests/CMakeLists.txt | 3 +- velox/exec/tests/PlanBuilder.cpp | 48 +++++++++ velox/exec/tests/PlanBuilder.h | 8 ++ velox/exec/tests/QueryAssertions.cpp | 69 +++++++++---- velox/exec/tests/UnnestTest.cpp | 48 +++++++++ 11 files changed, 456 insertions(+), 23 deletions(-) create mode 100644 velox/exec/Unnest.cpp create mode 100644 velox/exec/Unnest.h create mode 100644 velox/exec/tests/UnnestTest.cpp diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 58a1c227344cc..e166f56f80dce 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -34,4 +34,53 @@ const std::vector>& ExchangeNode::sources() return EMPTY_SOURCES; } +UnnestNode::UnnestNode( + const PlanNodeId& id, + std::vector> replicateVariables, + std::vector> unnestVariables, + const std::vector& unnestNames, + const std::optional& ordinalityName, + const std::shared_ptr& source) + : PlanNode(id), + replicateVariables_{std::move(replicateVariables)}, + unnestVariables_{std::move(unnestVariables)}, + withOrdinality_{ordinalityName.has_value()}, + sources_{source} { + // Calculate output type. First come "replicate" columns, followed by + // "unnest" columns, followed by an optional ordinality column. + std::vector names; + std::vector types; + + for (const auto& variable : replicateVariables_) { + names.emplace_back(variable->name()); + types.emplace_back(variable->type()); + } + + int unnestIndex = 0; + for (const auto& variable : unnestVariables_) { + if (variable->type()->isArray()) { + names.emplace_back(unnestNames[unnestIndex++]); + types.emplace_back(variable->type()->asArray().elementType()); + } else if (variable->type()->isMap()) { + const auto& mapType = variable->type()->asMap(); + + names.emplace_back(unnestNames[unnestIndex++]); + types.emplace_back(mapType.keyType()); + + names.emplace_back(unnestNames[unnestIndex++]); + types.emplace_back(mapType.valueType()); + } else { + VELOX_FAIL( + "Unexpected type of unnest variable. Expected ARRAY or MAP, but got {}.", + variable->type()->toString()); + } + } + + if (ordinalityName.has_value()) { + names.emplace_back(ordinalityName.value()); + types.emplace_back(BIGINT()); + } + outputType_ = ROW(std::move(names), std::move(types)); +} + } // namespace facebook::velox::core diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 6bd56458e1e06..57697580db8b7 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -959,4 +959,68 @@ class LimitNode : public PlanNode { const std::vector> sources_; }; +/// Expands arrays and maps into separate columns. Arrays are expanded into a +/// single column, and maps are expanded into two columns (key, value). Can be +/// used to expand multiple columns. In this case will produce as many rows as +/// the highest cardinality array or map (the other columns are padded with +/// nulls). Optionally can produce an ordinality column that specifies the row +/// number starting with 1. +class UnnestNode : public PlanNode { + public: + /// @param replicateVariables Inputs that are projected as is + /// @param unnestVariables Inputs that are unnested. Must be of type ARRAY or + /// MAP. + /// @param unnestNames Names to use for unnested outputs: one name for each + /// array (element); two names for each map (key and value). The output names + /// must appear in the same order as unnestVariables. + /// @param ordinalityName Optional name for the ordinality columns. If not + /// present, ordinality column is not produced. + UnnestNode( + const PlanNodeId& id, + std::vector> + replicateVariables, + std::vector> unnestVariables, + const std::vector& unnestNames, + const std::optional& ordinalityName, + const std::shared_ptr& source); + + /// The order of columns in the output is: replicated columns (in the order + /// specified), unnested columns (in the order specified, for maps: key comes + /// before value), optional ordinality column. + const RowTypePtr& outputType() const override { + return outputType_; + } + + const std::vector>& sources() const override { + return sources_; + } + + const std::vector>& + replicateVariables() const { + return replicateVariables_; + } + + const std::vector>& + unnestVariables() const { + return unnestVariables_; + } + + bool withOrdinality() const { + return withOrdinality_; + } + + std::string_view name() const override { + return "unnest"; + } + + private: + const std::vector> + replicateVariables_; + const std::vector> + unnestVariables_; + const bool withOrdinality_; + const std::vector> sources_; + RowTypePtr outputType_; +}; + } // namespace facebook::velox::core diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 2d5e87f550f87..a0ec26400dd8d 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -26,6 +26,7 @@ add_library( Operator.cpp OperatorUtils.cpp FilterProject.cpp + Unnest.cpp Values.cpp VectorHasher.cpp PartitionedOutputBufferManager.cpp diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index 0682a1dafc9fc..8f5e55d655766 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -26,6 +26,7 @@ #include "velox/exec/TableScan.h" #include "velox/exec/TableWriter.h" #include "velox/exec/TopN.h" +#include "velox/exec/Unnest.h" #include "velox/exec/Values.h" namespace facebook::velox::exec { @@ -290,8 +291,12 @@ std::shared_ptr DriverFactory::createDriver( localPartitionNode->outputType(), localPartitionNode->id(), ctx->driverId)); + } else if ( + auto unnest = + std::dynamic_pointer_cast(planNode)) { + operators.push_back(std::make_unique(id, ctx.get(), unnest)); } else { - VELOX_FAIL("Unsupported plan node"); + VELOX_FAIL("Unsupported plan node: {}", planNode->toString()); } } if (consumerSupplier) { diff --git a/velox/exec/Unnest.cpp b/velox/exec/Unnest.cpp new file mode 100644 index 0000000000000..0f42c850c2028 --- /dev/null +++ b/velox/exec/Unnest.cpp @@ -0,0 +1,139 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/Unnest.h" +#include "velox/exec/OperatorUtils.h" + +namespace facebook::velox::exec { +Unnest::Unnest( + int32_t operatorId, + DriverCtx* driverCtx, + const std::shared_ptr& unnestNode) + : Operator( + driverCtx, + unnestNode->outputType(), + operatorId, + unnestNode->id(), + "Unnest") { + if (unnestNode->unnestVariables().size() > 1) { + VELOX_UNSUPPORTED( + "Unnest operator doesn't support multiple unnest columns yet"); + } + + const auto& unnestVariable = unnestNode->unnestVariables()[0]; + if (!unnestVariable->type()->isArray()) { + VELOX_UNSUPPORTED( + "Unnest operator doesn't support non-ARRAY unnest variables yet: {}", + unnestVariable->type()->toString()) + } + + if (unnestNode->withOrdinality()) { + VELOX_UNSUPPORTED("Unnest operator doesn't support ordinality column yet"); + } + + const auto& inputType = unnestNode->sources()[0]->outputType(); + ChannelIndex outputChannel = 0; + for (const auto& variable : unnestNode->replicateVariables()) { + identityProjections_.emplace_back(IdentityProjection{ + inputType->getChildIdx(variable->name()), outputChannel++}); + } + + unnestChannel_ = inputType->getChildIdx(unnestVariable->name()); +} + +void Unnest::addInput(RowVectorPtr input) { + input_ = std::move(input); +} + +RowVectorPtr Unnest::getOutput() { + if (!input_) { + return nullptr; + } + + auto size = input_->size(); + + // Build repeated indices to apply to "replicated" columns. + const auto& unnestVector = input_->childAt(unnestChannel_); + + inputRows_.resize(size); + unnestDecoded_.decode(*unnestVector, inputRows_); + + auto unnestIndices = unnestDecoded_.indices(); + + auto unnestBase = unnestDecoded_.base()->as(); + auto rawSizes = unnestBase->rawSizes(); + auto rawOffsets = unnestBase->rawOffsets(); + + // Count number of elements. + vector_size_t numElements = 0; + for (auto row = 0; row < size; ++row) { + if (!unnestDecoded_.isNullAt(row)) { + numElements += rawSizes[unnestIndices[row]]; + } + } + + // Create "indices" buffer to repeat rows as many times as there are elements + // in the array. + BufferPtr repeatedIndices = + AlignedBuffer::allocate(numElements, pool()); + auto* rawIndices = repeatedIndices->asMutable(); + vector_size_t index = 0; + for (auto row = 0; row < size; ++row) { + if (!unnestDecoded_.isNullAt(row)) { + auto unnestSize = rawSizes[unnestIndices[row]]; + for (auto i = 0; i < unnestSize; i++) { + rawIndices[index++] = row; + } + } + } + + // Wrap "replicated" columns in a dictionary using 'repeatedIndices'. + std::vector outputs(outputType_->size()); + for (const auto& projection : identityProjections_) { + outputs[projection.outputChannel] = wrapChild( + numElements, repeatedIndices, input_->childAt(projection.inputChannel)); + } + + // Make "elements" column. Elements may be out of order. Use a + // dictionary to ensure the right order. + BufferPtr elementIndices = + AlignedBuffer::allocate(numElements, pool()); + auto* rawElementIndices = elementIndices->asMutable(); + index = 0; + bool identityMapping = true; + for (auto row = 0; row < size; ++row) { + if (!unnestDecoded_.isNullAt(row)) { + auto offset = rawOffsets[unnestIndices[row]]; + auto unnestSize = rawSizes[unnestIndices[row]]; + + if (index != offset) { + identityMapping = false; + } + + for (auto i = 0; i < unnestSize; i++) { + rawElementIndices[index++] = offset + i; + } + } + } + + outputs[identityProjections_.size()] = identityMapping + ? unnestBase->elements() + : wrapChild(numElements, elementIndices, unnestBase->elements()); + + input_ = nullptr; + + return std::make_shared( + pool(), outputType_, BufferPtr(nullptr), numElements, std::move(outputs)); +} +} // namespace facebook::velox::exec diff --git a/velox/exec/Unnest.h b/velox/exec/Unnest.h new file mode 100644 index 0000000000000..ef0828a0ccb44 --- /dev/null +++ b/velox/exec/Unnest.h @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "velox/exec/Operator.h" + +namespace facebook::velox::exec { +class Unnest : public Operator { + public: + Unnest( + int32_t operatorId, + DriverCtx* driverCtx, + const std::shared_ptr& unnestNode); + + BlockingReason isBlocked(ContinueFuture* /*future*/) override { + return BlockingReason::kNotBlocked; + } + + bool needsInput() const override { + return true; + } + + void addInput(RowVectorPtr input) override; + + RowVectorPtr getOutput() override; + + private: + ChannelIndex unnestChannel_; + + SelectivityVector inputRows_; + DecodedVector unnestDecoded_; +}; +} // namespace facebook::velox::exec diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 91c39f933ed39..f7209ff32961f 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -54,7 +54,8 @@ add_executable( HashJoinTest.cpp CastExprTest.cpp PlanNodeToStringTest.cpp - FunctionSignatureBuilderTest.cpp) + FunctionSignatureBuilderTest.cpp + UnnestTest.cpp) add_test( NAME velox_exec_test diff --git a/velox/exec/tests/PlanBuilder.cpp b/velox/exec/tests/PlanBuilder.cpp index cbf5582ae93ca..5e32dd127f754 100644 --- a/velox/exec/tests/PlanBuilder.cpp +++ b/velox/exec/tests/PlanBuilder.cpp @@ -399,6 +399,48 @@ PlanBuilder& PlanBuilder::hashJoin( return *this; } +PlanBuilder& PlanBuilder::unnest( + const std::vector& replicateColumns, + const std::vector& unnestColumns, + const std::optional& ordinalColumn) { + std::vector> + replicateFields; + replicateFields.reserve(replicateColumns.size()); + for (const auto& name : replicateColumns) { + replicateFields.emplace_back(field(name)); + } + + std::vector> unnestFields; + unnestFields.reserve(unnestColumns.size()); + for (const auto& name : unnestColumns) { + unnestFields.emplace_back(field(name)); + } + + std::vector unnestNames; + for (const auto& name : unnestColumns) { + auto input = planNode_->outputType()->findChild(name); + if (input->isArray()) { + unnestNames.push_back(name + "_e"); + } else if (input->isMap()) { + unnestNames.push_back(name + "_k"); + unnestNames.push_back(name + "_v"); + } else { + VELOX_NYI( + "Unsupported type of unnest variable. Expected ARRAY or MAP, but got {}.", + input->toString()); + } + } + + planNode_ = std::make_shared( + nextPlanNodeId(), + replicateFields, + unnestFields, + unnestNames, + ordinalColumn, + planNode_); + return *this; +} + std::string PlanBuilder::nextPlanNodeId() { auto id = fmt::format("{}", planNodeId_); planNodeId_++; @@ -410,6 +452,12 @@ std::shared_ptr PlanBuilder::field( return field(planNode_->outputType(), index); } +std::shared_ptr PlanBuilder::field( + const std::string& name) { + auto index = planNode_->outputType()->getChildIdx(name); + return field(planNode_->outputType(), index); +} + std::shared_ptr PlanBuilder::field( const std::shared_ptr& inputType, int index) { diff --git a/velox/exec/tests/PlanBuilder.h b/velox/exec/tests/PlanBuilder.h index bbce50a04a40e..58ab4808b0454 100644 --- a/velox/exec/tests/PlanBuilder.h +++ b/velox/exec/tests/PlanBuilder.h @@ -124,6 +124,9 @@ class PlanBuilder { std::shared_ptr field(int index); + std::shared_ptr field( + const std::string& name); + PlanBuilder& partitionedOutput( const std::vector& keyIndices, int numPartitions, @@ -148,6 +151,11 @@ class PlanBuilder { const std::vector& output, core::JoinType joinType = core::JoinType::kInner); + PlanBuilder& unnest( + const std::vector& replicateColumns, + const std::vector& unnestColumns, + const std::optional& ordinalColumn = std::nullopt); + const std::shared_ptr& planNode() const { return planNode_; } diff --git a/velox/exec/tests/QueryAssertions.cpp b/velox/exec/tests/QueryAssertions.cpp index 2fe4db3419dad..9d471d9062b3b 100644 --- a/velox/exec/tests/QueryAssertions.cpp +++ b/velox/exec/tests/QueryAssertions.cpp @@ -32,41 +32,66 @@ std::string makeCreateTableSql( if (i > 0) { sql << ", "; } - sql << rowType.nameOf(i) << " " << rowType.childAt(i)->kindName(); + sql << rowType.nameOf(i) << " "; + auto child = rowType.childAt(i); + if (child->isArray()) { + sql << child->asArray().elementType()->kindName() << "[]"; + } else { + sql << child->kindName(); + } } sql << ")"; return sql.str(); } template -void appendNonNullValue( - const VectorPtr& vector, - vector_size_t index, - ::duckdb::Appender& appender) { +::duckdb::Value duckValueAt(const VectorPtr& vector, vector_size_t index) { using T = typename KindToFlatVector::WrapperType; - appender.Append(vector->as>()->valueAt(index)); + return ::duckdb::Value(vector->as>()->valueAt(index)); } template <> -void appendNonNullValue( +::duckdb::Value duckValueAt( const VectorPtr& vector, - vector_size_t index, - ::duckdb::Appender& appender) { + vector_size_t index) { // DuckDB requires zero-ending string auto copy = vector->as>()->valueAt(index).str(); - appender.Append(copy.data()); + return ::duckdb::Value(copy); } template <> -void appendNonNullValue( +::duckdb::Value duckValueAt( const VectorPtr& vector, - vector_size_t index, - ::duckdb::Appender& appender) { + vector_size_t index) { using T = typename KindToFlatVector::WrapperType; - appender.Append<::duckdb::timestamp_t>( + return ::duckdb::Value::TIMESTAMP( veloxTimestampToDuckDB(vector->as>()->valueAt(index))); } +template <> +::duckdb::Value duckValueAt( + const VectorPtr& vector, + int32_t row) { + auto arrayVector = vector->as(); + auto& elements = arrayVector->elements(); + auto offset = arrayVector->offsetAt(row); + auto size = arrayVector->sizeAt(row); + + std::vector<::duckdb::Value> array; + array.reserve(size); + for (auto i = 0; i < size; i++) { + auto innerRow = offset + i; + if (elements->isNullAt(innerRow)) { + array.emplace_back(::duckdb::Value(nullptr)); + } else { + array.emplace_back(VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + duckValueAt, elements->typeKind(), elements, innerRow)); + } + } + + return ::duckdb::Value::LIST(array); +} + template velox::variant variantAt(::duckdb::DataChunk* dataChunk, int32_t row, int32_t column) { @@ -341,7 +366,10 @@ void DuckDbQueryRunner::createTable( con.Query("DROP TABLE IF EXISTS " + name); auto rowType = data[0]->type()->as(); - con.Query(makeCreateTableSql(name, rowType)); + auto res = con.Query(makeCreateTableSql(name, rowType)); + if (!res->success) { + VELOX_FAIL(res->error); + } for (auto& vector : data) { for (int32_t row = 0; row < vector->size(); row++) { @@ -351,13 +379,12 @@ void DuckDbQueryRunner::createTable( auto columnVector = vector->childAt(column); if (columnVector->isNullAt(row)) { appender.Append(nullptr); + } else if (rowType.childAt(column)->isArray()) { + appender.Append(duckValueAt(columnVector, row)); } else { - VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( - appendNonNullValue, - rowType.childAt(column)->kind(), - columnVector, - row, - appender); + auto value = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + duckValueAt, rowType.childAt(column)->kind(), columnVector, row); + appender.Append(value); } } appender.EndRow(); diff --git a/velox/exec/tests/UnnestTest.cpp b/velox/exec/tests/UnnestTest.cpp new file mode 100644 index 0000000000000..85fdd3601b08b --- /dev/null +++ b/velox/exec/tests/UnnestTest.cpp @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/tests/HiveConnectorTestBase.h" +#include "velox/exec/tests/PlanBuilder.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec::test; + +static const std::string kWriter = "UnnestTest.Writer"; + +class UnnestTest : public HiveConnectorTestBase { + protected: + void SetUp() override { + HiveConnectorTestBase::SetUp(); + } +}; + +TEST_F(UnnestTest, basic) { + auto vector = makeRowVector({ + makeFlatVector(100, [](auto row) { return row; }), + vectorMaker_.arrayVector( + 100, + [](auto row) { return 3; }, + [](auto row, auto index) { return index; }), + }); + + createDuckDbTable({vector}); + + // TODO Add tests with arrays of variable size including empty arrays and null + // arrays. This requires better UNNEST support in DuckDB. + + auto op = PlanBuilder().values({vector}).unnest({"c0"}, {"c1"}).planNode(); + assertQuery( + op, + std::vector>{}, + "SELECT c0, x FROM tmp, UNNEST(ARRAY[0, 1, 2]) as t(x)"); +}