From b9c582e83d6835bcdff3c27fe3dc12616e444b5b Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Wed, 20 Nov 2024 13:36:22 -0800 Subject: [PATCH] FEAT: Add runner for local distributed execution Adds LocalRunner and LocalSchema for running distributed Velox plans in-process with multiple fragments and multiple tasks per fragment. This introduces the abstractions for distributed plans, their options, schema and split sources. Adds an extension of PlanBuilder for building plan trees with shuffles. These get partitioned into multiple plan trees for execution with LocalRunner. Adds LocalRunnerTestBase, a test base class with utilities for generating multiple tables each with multiple files of test data. These are then returned as a LocalSchema that can be used to produce splits for LocalRunner. Adds error propagation for TaskCursor since distributed plans may have to produce errors from any fragment that are consumed by the application via TaskCursor. This will be generalized to distributed execution on a cluster with Velox coordination and shuffles. --- .github/workflows/scheduled.yml | 1 + velox/CMakeLists.txt | 4 + velox/connectors/hive/HiveConnectorSplit.h | 108 ++++++ velox/connectors/hive/tests/CMakeLists.txt | 1 + velox/connectors/hive/tests/HiveSplitTest.cpp | 64 ++++ velox/exec/tests/TableScanTest.cpp | 30 +- velox/exec/tests/utils/CMakeLists.txt | 36 +- velox/exec/tests/utils/Cursor.cpp | 20 ++ velox/exec/tests/utils/Cursor.h | 2 + .../tests/utils/DistributedPlanBuilder.cpp | 120 +++++++ .../exec/tests/utils/DistributedPlanBuilder.h | 82 +++++ .../exec/tests/utils/HiveConnectorTestBase.h | 106 +----- .../exec/tests/utils/LocalRunnerTestBase.cpp | 116 +++++++ velox/exec/tests/utils/LocalRunnerTestBase.h | 99 ++++++ velox/exec/tests/utils/PlanBuilder.h | 36 +- velox/runner/CMakeLists.txt | 27 ++ velox/runner/LocalRunner.cpp | 291 ++++++++++++++++ velox/runner/LocalRunner.h | 109 ++++++ velox/runner/LocalSchema.cpp | 321 ++++++++++++++++++ velox/runner/LocalSchema.h | 115 +++++++ velox/runner/MultiFragmentPlan.h | 96 ++++++ velox/runner/Runner.cpp | 59 ++++ velox/runner/Runner.h | 101 ++++++ velox/runner/Schema.h | 168 +++++++++ velox/runner/tests/CMakeLists.txt | 27 ++ velox/runner/tests/LocalRunnerTest.cpp | 175 ++++++++++ 26 files changed, 2198 insertions(+), 116 deletions(-) create mode 100644 velox/connectors/hive/tests/HiveSplitTest.cpp create mode 100644 velox/exec/tests/utils/DistributedPlanBuilder.cpp create mode 100644 velox/exec/tests/utils/DistributedPlanBuilder.h create mode 100644 velox/exec/tests/utils/LocalRunnerTestBase.cpp create mode 100644 velox/exec/tests/utils/LocalRunnerTestBase.h create mode 100644 velox/runner/CMakeLists.txt create mode 100644 velox/runner/LocalRunner.cpp create mode 100644 velox/runner/LocalRunner.h create mode 100644 velox/runner/LocalSchema.cpp create mode 100644 velox/runner/LocalSchema.h create mode 100644 velox/runner/MultiFragmentPlan.h create mode 100644 velox/runner/Runner.cpp create mode 100644 velox/runner/Runner.h create mode 100644 velox/runner/Schema.h create mode 100644 velox/runner/tests/CMakeLists.txt create mode 100644 velox/runner/tests/LocalRunnerTest.cpp diff --git a/.github/workflows/scheduled.yml b/.github/workflows/scheduled.yml index e6cf425881e68..cb72bc6a0b396 100644 --- a/.github/workflows/scheduled.yml +++ b/.github/workflows/scheduled.yml @@ -222,6 +222,7 @@ jobs: run: | python3 -m venv .venv source .venv/bin/activate + python3 -m pip install -e . - name: Create and test new function signatures diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index fe2d23371aa86..f904135355fac 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -58,6 +58,10 @@ add_subdirectory(connectors) if(${VELOX_ENABLE_EXEC}) add_subdirectory(exec) + # Disable runner from pyvelox builds + if(${VELOX_BUILD_RUNNER}) + add_subdirectory(runner) + endif() endif() if(${VELOX_ENABLE_DUCKDB}) diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index a06ffa668e11a..99bca9b309cf4 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -111,4 +111,112 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { static void registerSerDe(); }; +class HiveConnectorSplitBuilder { + public: + explicit HiveConnectorSplitBuilder(std::string filePath) + : filePath_{std::move(filePath)} { + infoColumns_["$path"] = filePath_; + } + + HiveConnectorSplitBuilder& start(uint64_t start) { + start_ = start; + return *this; + } + + HiveConnectorSplitBuilder& length(uint64_t length) { + length_ = length; + return *this; + } + + HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) { + splitWeight_ = splitWeight; + return *this; + } + + HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { + fileFormat_ = format; + return *this; + } + + HiveConnectorSplitBuilder& infoColumn( + const std::string& name, + const std::string& value) { + infoColumns_.emplace(std::move(name), std::move(value)); + return *this; + } + + HiveConnectorSplitBuilder& partitionKey( + std::string name, + std::optional value) { + partitionKeys_.emplace(std::move(name), std::move(value)); + return *this; + } + + HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) { + tableBucketNumber_ = bucket; + infoColumns_["$bucket"] = std::to_string(bucket); + return *this; + } + + HiveConnectorSplitBuilder& customSplitInfo( + const std::unordered_map& customSplitInfo) { + customSplitInfo_ = customSplitInfo; + return *this; + } + + HiveConnectorSplitBuilder& extraFileInfo( + const std::shared_ptr& extraFileInfo) { + extraFileInfo_ = extraFileInfo; + return *this; + } + + HiveConnectorSplitBuilder& serdeParameters( + const std::unordered_map& serdeParameters) { + serdeParameters_ = serdeParameters; + return *this; + } + + HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { + connectorId_ = connectorId; + return *this; + } + + HiveConnectorSplitBuilder& fileProperties(FileProperties fileProperties) { + fileProperties_ = fileProperties; + return *this; + } + + std::shared_ptr build() const { + return std::make_shared( + connectorId_, + filePath_, + fileFormat_, + start_, + length_, + partitionKeys_, + tableBucketNumber_, + customSplitInfo_, + extraFileInfo_, + serdeParameters_, + splitWeight_, + infoColumns_, + fileProperties_); + } + + private: + const std::string filePath_; + dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; + uint64_t start_{0}; + uint64_t length_{std::numeric_limits::max()}; + std::unordered_map> partitionKeys_; + std::optional tableBucketNumber_; + std::unordered_map customSplitInfo_ = {}; + std::shared_ptr extraFileInfo_ = {}; + std::unordered_map serdeParameters_ = {}; + std::unordered_map infoColumns_ = {}; + std::string connectorId_; + int64_t splitWeight_{0}; + std::optional fileProperties_; +}; + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index f4235cfa13c05..ad3a373dab119 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable( HiveConnectorSerDeTest.cpp HivePartitionFunctionTest.cpp HivePartitionUtilTest.cpp + HiveSplitTest.cpp PartitionIdGeneratorTest.cpp TableHandleTest.cpp) add_test(velox_hive_connector_test velox_hive_connector_test) diff --git a/velox/connectors/hive/tests/HiveSplitTest.cpp b/velox/connectors/hive/tests/HiveSplitTest.cpp new file mode 100644 index 0000000000000..91412caaee2d6 --- /dev/null +++ b/velox/connectors/hive/tests/HiveSplitTest.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "gtest/gtest.h" +#include "velox/common/config/Config.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive; + +TEST(HiveSplitTest, builder) { + FileProperties properties = {11, 1111}; + auto extra = std::make_shared("extra file info"); + std::unordered_map custom; + custom["custom1"] = "customValue1"; + std::unordered_map serde; + serde["serde1"] = "serdeValue1"; + auto split = HiveConnectorSplitBuilder("filepath") + .start(100) + .length(100000) + .splitWeight(1) + .fileFormat(dwio::common::FileFormat::DWRF) + .infoColumn("info1", "infoValue1") + .partitionKey("DS", "2024-11-01") + .tableBucketNumber(11) + .customSplitInfo(custom) + .extraFileInfo(extra) + .serdeParameters(serde) + .connectorId("connectorId") + .fileProperties(properties) + .build(); + + EXPECT_EQ(100, split->start); + EXPECT_EQ(100000, split->length); + EXPECT_EQ(1, split->splitWeight); + EXPECT_TRUE(dwio::common::FileFormat::DWRF == split->fileFormat); + EXPECT_EQ("infoValue1", split->infoColumns["info1"]); + auto it = split->partitionKeys.find("DS"); + EXPECT_TRUE(it != split->partitionKeys.end()); + EXPECT_EQ("2024-11-01", it->second.value()); + EXPECT_EQ(11, split->tableBucketNumber.value()); + EXPECT_EQ("customValue1", split->customSplitInfo["custom1"]); + EXPECT_EQ(std::string("extra file info"), *split->extraFileInfo); + EXPECT_EQ("serdeValue1", split->serdeParameters["serde1"]); + EXPECT_EQ("connectorId", split->connectorId); + EXPECT_EQ( + properties.fileSize.value(), split->properties.value().fileSize.value()); + EXPECT_EQ( + properties.modificationTime.value(), + split->properties.value().modificationTime.value()); +} diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index dc74ef0fb3b64..42cee37afb365 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -180,7 +180,7 @@ class TableScanTest : public virtual HiveConnectorTestBase { const std::string& filePath, const TypePtr& partitionType, const std::optional& partitionValue) { - auto split = HiveConnectorSplitBuilder(filePath) + auto split = exec::test::HiveConnectorSplitBuilder(filePath) .partitionKey("pkey", partitionValue) .build(); auto outputType = @@ -411,7 +411,7 @@ TEST_F(TableScanTest, partitionKeyAlias) { {"a", regularColumn("c0", BIGINT())}, {"ds_alias", partitionKey("ds", VARCHAR())}}; - auto split = HiveConnectorSplitBuilder(filePath->getPath()) + auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); @@ -1806,7 +1806,8 @@ TEST_F(TableScanTest, splitOffsetAndLength) { } TEST_F(TableScanTest, fileNotFound) { - auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc").build(); + auto split = + exec::test::HiveConnectorSplitBuilder("/path/to/nowhere.orc").build(); auto assertMissingFile = [&](bool ignoreMissingFiles) { AssertQueryBuilder(tableScanNode()) .connectorSessionProperty( @@ -1829,7 +1830,7 @@ TEST_F(TableScanTest, validFileNoData) { auto filePath = facebook::velox::test::getDataFilePath( "velox/exec/tests", "data/emptyPresto.dwrf"); - auto split = HiveConnectorSplitBuilder(filePath) + auto split = exec::test::HiveConnectorSplitBuilder(filePath) .start(0) .length(fs::file_size(filePath) / 2) .build(); @@ -1949,7 +1950,7 @@ TEST_F(TableScanTest, partitionedTableDateKey) { // Test partition filter on date column. { - auto split = HiveConnectorSplitBuilder(filePath->getPath()) + auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("pkey", partitionValue) .build(); auto outputType = ROW({"pkey", "c0", "c1"}, {DATE(), BIGINT(), DOUBLE()}); @@ -2851,9 +2852,10 @@ TEST_F(TableScanTest, bucket) { writeToFile(filePaths[i]->getPath(), rowVector); rowVectors.emplace_back(rowVector); - splits.emplace_back(HiveConnectorSplitBuilder(filePaths[i]->getPath()) - .tableBucketNumber(bucket) - .build()); + splits.emplace_back( + exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) + .tableBucketNumber(bucket) + .build()); } createDuckDbTable(rowVectors); @@ -2877,7 +2879,7 @@ TEST_F(TableScanTest, bucket) { for (int i = 0; i < buckets.size(); ++i) { int bucketValue = buckets[i]; - auto hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath()) + auto hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) .tableBucketNumber(bucketValue) .build(); @@ -2897,7 +2899,7 @@ TEST_F(TableScanTest, bucket) { // Filter on bucket column, but don't project it out auto rowTypes = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); - hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath()) + hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) .tableBucketNumber(bucketValue) .build(); op = PlanBuilder() @@ -4168,7 +4170,7 @@ TEST_F(TableScanTest, reuseRowVector) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->getPath()).build(); + auto split = exec::test::HiveConnectorSplitBuilder(file->getPath()).build(); auto expected = makeRowVector( {makeFlatVector(10, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan).splits({split, split}).assertResults(expected); @@ -4749,7 +4751,7 @@ TEST_F(TableScanTest, varbinaryPartitionKey) { {"a", regularColumn("c0", BIGINT())}, {"ds_alias", partitionKey("ds", VARBINARY())}}; - auto split = HiveConnectorSplitBuilder(filePath->getPath()) + auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); @@ -4788,7 +4790,7 @@ TEST_F(TableScanTest, timestampPartitionKey) { ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}}; std::vector> splits; for (auto& t : inputs) { - splits.push_back(HiveConnectorSplitBuilder(filePath->getPath()) + splits.push_back(exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("t", t) .build()); } @@ -4807,7 +4809,7 @@ TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) { writeToFile(filePath->getPath(), vectors); createDuckDbTable(vectors); - auto split = HiveConnectorSplitBuilder(filePath->getPath()) + auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index c2d227410c1e7..294493367012a 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -17,11 +17,26 @@ add_library(velox_temp_path TempFilePath.cpp TempDirectoryPath.cpp) target_link_libraries( velox_temp_path velox_exception) +add_library(velox_cursor Cursor.cpp) +target_link_libraries( + velox_cursor + velox_core + velox_exception + velox_expression + velox_dwio_common + velox_dwio_dwrf_reader + velox_dwio_dwrf_writer + velox_type_fbhive + velox_hive_connector + velox_tpch_connector + velox_presto_serializer + velox_functions_prestosql + velox_aggregates) + add_library( velox_exec_test_lib AssertQueryBuilder.cpp ArbitratorTestUtil.cpp - Cursor.cpp HiveConnectorTestBase.cpp LocalExchangeSource.cpp OperatorTestBase.cpp @@ -37,6 +52,7 @@ target_link_libraries( velox_exec_test_lib velox_vector_test_lib velox_temp_path + velox_cursor velox_core velox_exception velox_expression @@ -53,3 +69,21 @@ target_link_libraries( velox_presto_serializer velox_functions_prestosql velox_aggregates) + +if(${VELOX_BUILD_RUNNER}) + add_library(velox_exec_runner_test_util DistributedPlanBuilder.cpp + LocalRunnerTestBase.cpp) + + target_link_libraries( + velox_exec_runner_test_util + velox_temp_path + velox_exec_test_lib + velox_exec + velox_file_test_utils + velox_hive_connector + velox_tpch_connector + velox_dwio_parquet_reader + velox_dwio_parquet_writer + velox_local_runner) + +endif() diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 1aed8908206b9..ad7b4133c6c73 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -275,6 +275,10 @@ class MultiThreadedTaskCursor : public TaskCursorBase { /// Starts the task if not started yet. bool moveNext() override { start(); + if (error_) { + std::rethrow_exception(error_); + } + current_ = queue_->dequeue(); if (task_->error()) { // Wait for the task to finish (there's' a small period of time between @@ -302,6 +306,13 @@ class MultiThreadedTaskCursor : public TaskCursorBase { return current_; } + void setError(std::exception_ptr error) override { + error_ = error; + if (task_) { + task_->setError(error); + } + } + const std::shared_ptr& task() override { return task_; } @@ -316,6 +327,7 @@ class MultiThreadedTaskCursor : public TaskCursorBase { std::shared_ptr task_; RowVectorPtr current_; bool atEnd_{false}; + std::exception_ptr error_; }; class SingleThreadedTaskCursor : public TaskCursorBase { @@ -391,6 +403,13 @@ class SingleThreadedTaskCursor : public TaskCursorBase { return current_; } + void setError(std::exception_ptr error) override { + error_ = error; + if (task_) { + task_->setError(error); + } + } + const std::shared_ptr& task() override { return task_; } @@ -399,6 +418,7 @@ class SingleThreadedTaskCursor : public TaskCursorBase { std::shared_ptr task_; RowVectorPtr current_; RowVectorPtr next_; + std::exception_ptr error_; }; std::unique_ptr TaskCursor::create(const CursorParameters& params) { diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index 314eb89587609..9c8cfaf7d7fa0 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -139,6 +139,8 @@ class TaskCursor { virtual RowVectorPtr& current() = 0; + virtual void setError(std::exception_ptr error) = 0; + virtual const std::shared_ptr& task() = 0; }; diff --git a/velox/exec/tests/utils/DistributedPlanBuilder.cpp b/velox/exec/tests/utils/DistributedPlanBuilder.cpp new file mode 100644 index 0000000000000..fddb79a6f3caf --- /dev/null +++ b/velox/exec/tests/utils/DistributedPlanBuilder.cpp @@ -0,0 +1,120 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/tests/utils/DistributedPlanBuilder.h" + +namespace facebook::velox::exec::test { + +DistributedPlanBuilder::DistributedPlanBuilder( + const runner::MultiFragmentPlan::Options& options, + std::shared_ptr planNodeIdGenerator, + memory::MemoryPool* pool) + : PlanBuilder(planNodeIdGenerator, pool), options_(options), root_(this) { + root_->stack_.push_back(this); + newFragment(); + current_->width = options_.numWorkers; +} + +DistributedPlanBuilder::DistributedPlanBuilder(DistributedPlanBuilder& root) + : PlanBuilder(root.planNodeIdGenerator(), root.pool()), + options_(root.options_), + root_(&root) { + root_->stack_.push_back(this); + newFragment(); + current_->width = options_.numWorkers; +} + +std::vector DistributedPlanBuilder::fragments() { + newFragment(); + return std::move(fragments_); +} + +void DistributedPlanBuilder::newFragment() { + if (current_) { + gatherScans(planNode_); + current_->fragment = core::PlanFragment(std::move(planNode_)); + fragments_.push_back(std::move(*current_)); + } + current_ = std::make_unique( + fmt::format("{}.{}", options_.queryId, root_->fragmentCounter_++)); + planNode_ = nullptr; +} + +PlanBuilder& DistributedPlanBuilder::shuffle( + const std::vector& partitionKeys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout) { + partitionedOutput( + partitionKeys, numPartitions, replicateNullsAndAny, outputLayout); + auto* output = + dynamic_cast(planNode_.get()); + VELOX_CHECK_NOT_NULL(output); + auto producerPrefix = current_->taskPrefix; + newFragment(); + current_->width = numPartitions; + exchange(output->outputType(), VectorSerde::Kind::kPresto); + auto* exchange = dynamic_cast(planNode_.get()); + VELOX_CHECK_NOT_NULL(exchange); + current_->inputStages.push_back( + runner::InputStage{exchange->id(), producerPrefix}); + return *this; +} + +core::PlanNodePtr DistributedPlanBuilder::shuffleResult( + const std::vector& partitionKeys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout) { + partitionedOutput( + partitionKeys, numPartitions, replicateNullsAndAny, outputLayout); + auto* output = + dynamic_cast(planNode_.get()); + VELOX_CHECK_NOT_NULL(output); + const auto producerPrefix = current_->taskPrefix; + auto result = planNode_; + newFragment(); + root_->stack_.pop_back(); + auto* consumer = root_->stack_.back(); + if (consumer->current_->width != 0) { + VELOX_CHECK_EQ( + numPartitions, + consumer->current_->width, + "The consumer width should match the producer fanout"); + } else { + consumer->current_->width = numPartitions; + } + + for (auto& fragment : fragments_) { + root_->fragments_.push_back(std::move(fragment)); + } + exchange(output->outputType(), VectorSerde::Kind::kPresto); + auto* exchange = dynamic_cast(planNode_.get()); + consumer->current_->inputStages.push_back( + runner::InputStage{exchange->id(), producerPrefix}); + return std::move(planNode_); +} + +void DistributedPlanBuilder::gatherScans(const core::PlanNodePtr& plan) { + if (auto scan = std::dynamic_pointer_cast(plan)) { + current_->scans.push_back(scan); + return; + } + for (auto& source : plan->sources()) { + gatherScans(source); + } +} +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/DistributedPlanBuilder.h b/velox/exec/tests/utils/DistributedPlanBuilder.h new file mode 100644 index 0000000000000..ba091ef8c66a6 --- /dev/null +++ b/velox/exec/tests/utils/DistributedPlanBuilder.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/runner/MultiFragmentPlan.h" + +namespace facebook::velox::exec::test { + +/// Builder for distributed plans. Adds a shuffle() and related +/// methods for building PartitionedOutput-Exchange pairs between +/// fragments. Not thread safe. +class DistributedPlanBuilder : public PlanBuilder { + public: + /// Constructs a top level DistributedPlanBuilder. + DistributedPlanBuilder( + const runner::MultiFragmentPlan::Options& options, + std::shared_ptr planNodeIdGenerator, + memory::MemoryPool* pool = nullptr); + + /// Constructs a child builder. Used for branching plans, e.g. the subplan for + /// a join build side. + DistributedPlanBuilder(DistributedPlanBuilder& root); + + /// Returns the planned fragments. The builder will be empty after this. This + /// is only called on the root builder. + std::vector fragments(); + + PlanBuilder& shuffle( + const std::vector& keys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout = {}) override; + + core::PlanNodePtr shuffleResult( + const std::vector& keys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout = {}) override; + + private: + void newFragment(); + + void gatherScans(const core::PlanNodePtr& plan); + + const runner::MultiFragmentPlan::Options& options_; + DistributedPlanBuilder* const root_; + + // Stack of outstanding builders. The last element is the immediately + // enclosing one. When returning an ExchangeNode from returnShuffle, the stack + // is used to establish the linkage between the fragment of the returning + // builder and the fragment current in the calling builder. Only filled in the + // root builder. + std::vector stack_; + + // Fragment counter. Only used in root builder. + int32_t fragmentCounter_{0}; + + // The fragment being built. Will be moved to the root builder's 'fragments_' + // when complete. + std::unique_ptr current_; + + // The fragments gathered under this builder. Moved to the root builder when + // returning the subplan. + std::vector fragments_; +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 74dd5223307b9..a53b2634ef4b5 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -223,109 +223,15 @@ class HiveConnectorTestBase : public OperatorTestBase { } }; -class HiveConnectorSplitBuilder { +/// Same as connector::hive::HiveConnectorBuilder, except that this defaults +/// connectorId to kHiveConnectorId. +class HiveConnectorSplitBuilder + : public connector::hive::HiveConnectorSplitBuilder { public: explicit HiveConnectorSplitBuilder(std::string filePath) - : filePath_{std::move(filePath)} { - infoColumns_["$path"] = filePath_; + : connector::hive::HiveConnectorSplitBuilder(filePath) { + connectorId(kHiveConnectorId); } - - HiveConnectorSplitBuilder& start(uint64_t start) { - start_ = start; - return *this; - } - - HiveConnectorSplitBuilder& length(uint64_t length) { - length_ = length; - return *this; - } - - HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) { - splitWeight_ = splitWeight; - return *this; - } - - HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { - fileFormat_ = format; - return *this; - } - - HiveConnectorSplitBuilder& infoColumn( - const std::string& name, - const std::string& value) { - infoColumns_.emplace(std::move(name), std::move(value)); - return *this; - } - - HiveConnectorSplitBuilder& partitionKey( - std::string name, - std::optional value) { - partitionKeys_.emplace(std::move(name), std::move(value)); - return *this; - } - - HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) { - tableBucketNumber_ = bucket; - infoColumns_["$bucket"] = std::to_string(bucket); - return *this; - } - - HiveConnectorSplitBuilder& customSplitInfo( - const std::unordered_map& customSplitInfo) { - customSplitInfo_ = customSplitInfo; - return *this; - } - - HiveConnectorSplitBuilder& extraFileInfo( - const std::shared_ptr& extraFileInfo) { - extraFileInfo_ = extraFileInfo; - return *this; - } - - HiveConnectorSplitBuilder& serdeParameters( - const std::unordered_map& serdeParameters) { - serdeParameters_ = serdeParameters; - return *this; - } - - HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { - connectorId_ = connectorId; - return *this; - } - - std::shared_ptr build() const { - static const std::unordered_map customSplitInfo; - static const std::shared_ptr extraFileInfo; - static const std::unordered_map serdeParameters; - return std::make_shared( - connectorId_, - filePath_, - fileFormat_, - start_, - length_, - partitionKeys_, - tableBucketNumber_, - customSplitInfo, - extraFileInfo, - serdeParameters, - splitWeight_, - infoColumns_, - std::nullopt); - } - - private: - const std::string filePath_; - dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; - uint64_t start_{0}; - uint64_t length_{std::numeric_limits::max()}; - std::unordered_map> partitionKeys_; - std::optional tableBucketNumber_; - std::unordered_map customSplitInfo_ = {}; - std::shared_ptr extraFileInfo_ = {}; - std::unordered_map serdeParameters_ = {}; - std::unordered_map infoColumns_ = {}; - std::string connectorId_ = kHiveConnectorId; - int64_t splitWeight_{0}; }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/LocalRunnerTestBase.cpp b/velox/exec/tests/utils/LocalRunnerTestBase.cpp new file mode 100644 index 0000000000000..c0572217644b9 --- /dev/null +++ b/velox/exec/tests/utils/LocalRunnerTestBase.cpp @@ -0,0 +1,116 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/tests/utils/LocalRunnerTestBase.h" +namespace facebook::velox::exec::test { + +void LocalRunnerTestBase::SetUp() { + HiveConnectorTestBase::SetUp(); + exec::ExchangeSource::factories().clear(); + exec::ExchangeSource::registerFactory(createLocalExchangeSource); + ensureTestData(); +} + +std::shared_ptr LocalRunnerTestBase::makeQueryCtx( + const std::string& queryId, + memory::MemoryPool* rootPool) { + auto config = config_; + auto hiveConfig = hiveConfig_; + std::unordered_map> + connectorConfigs; + auto copy = hiveConfig_; + connectorConfigs[kHiveConnectorId] = + std::make_shared(std::move(copy)); + + return core::QueryCtx::create( + schemaExecutor_.get(), + core::QueryConfig(config), + std::move(connectorConfigs), + cache::AsyncDataCache::getInstance(), + rootPool->shared_from_this(), + nullptr, + queryId); +} + +void LocalRunnerTestBase::ensureTestData() { + if (!files_) { + makeTables(testTables_, files_); + } + makeSchema(); + splitSourceFactory_ = + std::make_shared(schema_, 2); +} + +void LocalRunnerTestBase::makeSchema() { + auto schemaQueryCtx = makeQueryCtx("schema", rootPool_.get()); + common::SpillConfig spillConfig; + common::PrefixSortConfig prefixSortConfig(100, 130); + auto leafPool = schemaQueryCtx->pool()->addLeafChild("schemaReader"); + auto connectorQueryCtx = std::make_shared( + leafPool.get(), + schemaQueryCtx->pool(), + schemaQueryCtx->connectorSessionProperties(kHiveConnectorId), + &spillConfig, + prefixSortConfig, + std::make_unique( + schemaQueryCtx.get(), schemaPool_.get()), + schemaQueryCtx->cache(), + "scan_for_schema", + "schema", + "N/a", + 0, + schemaQueryCtx->queryConfig().sessionTimezone()); + auto connector = connector::getConnector(kHiveConnectorId); + schema_ = std::make_shared( + files_->getPath(), + dwio::common::FileFormat::DWRF, + reinterpret_cast(connector.get()), + connectorQueryCtx); +} + +void LocalRunnerTestBase::makeTables( + std::vector specs, + std::shared_ptr& directory) { + directory = exec::test::TempDirectoryPath::create(); + for (auto& spec : specs) { + auto tablePath = fmt::format("{}/{}", directory->getPath(), spec.name); + auto fs = filesystems::getFileSystem(tablePath, {}); + fs->mkdir(tablePath); + for (auto i = 0; i < spec.numFiles; ++i) { + auto vectors = HiveConnectorTestBase::makeVectors( + spec.columns, spec.numVectorsPerFile, spec.rowsPerVector); + if (spec.customizeData) { + for (auto& vector : vectors) { + spec.customizeData(vector); + } + } + writeToFile(fmt::format("{}/f{}", tablePath, i), vectors); + } + } +} + +std::vector readCursor( + std::shared_ptr runner) { + // 'result' borrows memory from cursor so the life cycle must be shorter. + std::vector result; + + while (auto rows = runner->next()) { + result.push_back(rows); + } + return result; +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/LocalRunnerTestBase.h b/velox/exec/tests/utils/LocalRunnerTestBase.h new file mode 100644 index 0000000000000..4558a4bdd6ffa --- /dev/null +++ b/velox/exec/tests/utils/LocalRunnerTestBase.h @@ -0,0 +1,99 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/ExchangeSource.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/LocalExchangeSource.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/runner/LocalRunner.h" + +namespace facebook::velox::exec::test { + +struct TableSpec { + std::string name; + RowTypePtr columns; + int32_t rowsPerVector{10000}; + int32_t numVectorsPerFile{5}; + int32_t numFiles{5}; + + /// Function Applied to generated RowVectors for the table before writing. + /// May be used to insert non-random data on top of the random datafrom + /// HiveConnectorTestBase::makeVectors. + std::function customizeData; +}; + +/// Test helper class that manages a TestCase with a set of generated tables and +/// a LocalSchema and LocalSplitSource covering the test data. The lifetime of +/// the test data is the test case consisting of multiple TEST_F's. +class LocalRunnerTestBase : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + HiveConnectorTestBase::SetUpTestCase(); + schemaExecutor_ = std::make_unique(4); + } + + static void TearDownTestCase() { + files_.reset(); + HiveConnectorTestBase::TearDownTestCase(); + } + + void SetUp() override; + + void ensureTestData(); + void makeSchema(); + + void makeTables( + std::vector specs, + std::shared_ptr& directory); + + std::shared_ptr splitSourceFactory( + const runner::LocalSchema& schema); + + // Creates a QueryCtx with 'pool'. 'pool' must be a root pool. + static std::shared_ptr makeQueryCtx( + const std::string& queryId, + memory::MemoryPool* pool); + + // Configs for creating QueryCtx. + inline static std::unordered_map config_; + inline static std::unordered_map hiveConfig_; + + // The specification of the test data. The data is created in ensureTestData() + // called from each SetUp()(. + inline static std::vector testTables_; + + // The top level directory with the test data. + inline static std::shared_ptr files_; + inline static std::unique_ptr schemaExecutor_; + + // The schema built from the data in 'files_'. + std::shared_ptr schema_; + + // Split source factory for making SplitSources that range over tables inside + // 'files_'. + std::shared_ptr splitSourceFactory_; + + // Leaf pool for schema. + std::shared_ptr schemaPool_; +}; + +/// Reads all results from 'runner'. +std::vector readCursor( + std::shared_ptr runner); + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index f380ef4f03c7d..5ccf3a2167756 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -105,6 +105,8 @@ class PlanBuilder { planNodeIdGenerator_{std::move(planNodeIdGenerator)}, pool_{pool} {} + virtual ~PlanBuilder() = default; + static constexpr const std::string_view kHiveDefaultConnectorId{"test-hive"}; static constexpr const std::string_view kTpchDefaultConnectorId{"test-tpch"}; @@ -1029,7 +1031,9 @@ class PlanBuilder { return *this; } - /// Return the latest plan node, e.g. the root node of the plan tree. + /// Return the latest plan node, e.g. the root node of the plan + /// tree. The DistributedPlanBuilder override additionally moves stage + /// information to a parent PlanBuilder. const core::PlanNodePtr& planNode() const { return planNode_; } @@ -1054,6 +1058,28 @@ class PlanBuilder { return *this; } + /// In a DistributedPlanBuilder, introduces a shuffle boundary. The plan so + /// far is shuffled and subsequent nodes consume the shuffle. Arguments are as + /// in partitionedOutput(). + virtual PlanBuilder& shuffle( + const std::vector& keys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout = {}) { + VELOX_UNSUPPORTED("Needs DistributedPlanBuilder"); + } + + /// In a DistributedPlanBuilder, returns an Exchange on top of the plan built + /// so far and couples it to the current stage in the enclosing builder. + /// Arguments are as in shuffle(). + virtual core::PlanNodePtr shuffleResult( + const std::vector& keys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout = {}) { + VELOX_UNSUPPORTED("Needs DistributedPlanBuilder"); + } + protected: // Users who create custom operators might want to extend the PlanBuilder to // customize extended plan builders. Those functions are needed in such @@ -1063,6 +1089,14 @@ class PlanBuilder { std::shared_ptr inferTypes( const core::ExprPtr& untypedExpr); + std::shared_ptr planNodeIdGenerator() const { + return planNodeIdGenerator_; + } + + memory::MemoryPool* pool() const { + return pool_; + } + private: std::shared_ptr field(column_index_t index); diff --git a/velox/runner/CMakeLists.txt b/velox/runner/CMakeLists.txt new file mode 100644 index 0000000000000..bbc716e7f541d --- /dev/null +++ b/velox/runner/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_subdirectory(tests) + +velox_add_library(velox_local_runner LocalRunner.cpp LocalSchema.cpp Runner.cpp) + +velox_link_libraries( + velox_local_runner + velox_common_base + velox_memory + velox_hive_connector + velox_dwio_common + velox_dwio_dwrf_writer + velox_exec + velox_cursor) diff --git a/velox/runner/LocalRunner.cpp b/velox/runner/LocalRunner.cpp new file mode 100644 index 0000000000000..d18a325284167 --- /dev/null +++ b/velox/runner/LocalRunner.cpp @@ -0,0 +1,291 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/runner/LocalRunner.h" +#include "velox/common/time/Timer.h" + +#include "velox/connectors/hive/HiveConnectorSplit.h" + +namespace facebook::velox::runner { +namespace { +std::shared_ptr remoteSplit( + const std::string& taskId) { + return std::make_shared(taskId); +} +} // namespace + +RowVectorPtr LocalRunner::next() { + if (!cursor_) { + start(); + } + bool hasNext = cursor_->moveNext(); + if (!hasNext) { + state_ = State::kFinished; + return nullptr; + } + return cursor_->current(); +} + +void LocalRunner::start() { + VELOX_CHECK_EQ(state_, State::kInitialized); + auto lastStage = makeStages(); + params_.planNode = plan_->fragments().back().fragment.planNode; + auto cursor = exec::test::TaskCursor::create(params_); + stages_.push_back({cursor->task()}); + // Add table scan splits to the final gathere stage. + for (auto& scan : fragments_.back().scans) { + auto source = splitSourceFactory_->splitSourceForScan(*scan); + for (;;) { + auto split = source->next(0); + if (!split.hasConnectorSplit()) { + break; + } + cursor->task()->addSplit(scan->id(), std::move(split)); + } + cursor->task()->noMoreSplits(scan->id()); + } + // If the plan only has the final gather stage, there are no shuffles between + // the last + // and previous stages to set up. + if (!lastStage.empty()) { + const auto finalStageConsumer = + fragments_.back().inputStages[0].consumerNodeId; + for (auto& remote : lastStage) { + cursor->task()->addSplit(finalStageConsumer, exec::Split(remote)); + } + cursor->task()->noMoreSplits(finalStageConsumer); + } + { + std::lock_guard l(mutex_); + if (!error_) { + cursor_ = std::move(cursor); + state_ = State::kRunning; + } + } + if (!cursor_) { + // The cursor was not set because previous fragments had an error. + abort(); + std::rethrow_exception(error_); + } +} + +void LocalRunner::abort() { + // If called without previous error, we set the error to be cancellation. + if (!error_) { + try { + state_ = State::kCancelled; + VELOX_FAIL("Query cancelled"); + } catch (const std::exception& e) { + error_ = std::current_exception(); + } + } + VELOX_CHECK(state_ != State::kInitialized); + // Setting errors is thred safe. The stages do not change after + // initialization. + for (auto& stage : stages_) { + for (auto& task : stage) { + task->setError(error_); + } + } + if (cursor_) { + cursor_->setError(error_); + } +} + +void LocalRunner::waitForCompletion(int32_t maxWaitUs) { + VELOX_CHECK_NE(state_, State::kInitialized); + std::vector futures; + { + std::lock_guard l(mutex_); + for (auto& stage : stages_) { + for (auto& task : stage) { + futures.push_back(task->taskDeletionFuture()); + } + stage.clear(); + } + } + auto startTime = getCurrentTimeMicro(); + for (auto& future : futures) { + auto& executor = folly::QueuedImmediateExecutor::instance(); + if (getCurrentTimeMicro() - startTime > maxWaitUs) { + VELOX_FAIL("LocalRunner did not finish within {} us", maxWaitUs); + } + std::move(future) + .within(std::chrono::microseconds(maxWaitUs)) + .via(&executor) + .wait(); + } +} + +std::vector> +LocalRunner::makeStages() { + std::unordered_map stageMap; + auto sharedRunner = shared_from_this(); + auto onError = [self = sharedRunner, this](std::exception_ptr error) { + { + std::lock_guard l(mutex_); + if (error_) { + return; + } + state_ = State::kError; + error_ = error; + } + if (cursor_) { + abort(); + } + }; + + for (auto fragmentIndex = 0; fragmentIndex < fragments_.size() - 1; + ++fragmentIndex) { + auto& fragment = fragments_[fragmentIndex]; + stageMap[fragment.taskPrefix] = stages_.size(); + stages_.emplace_back(); + for (auto i = 0; i < fragment.width; ++i) { + exec::Consumer consumer = nullptr; + auto task = exec::Task::create( + fmt::format( + "local://{}/{}.{}", + params_.queryCtx->queryId(), + fragment.taskPrefix, + i), + fragment.fragment, + i, + params_.queryCtx, + exec::Task::ExecutionMode::kParallel, + consumer, + onError); + stages_.back().push_back(task); + if (fragment.numBroadcastDestinations) { + // TODO: Add support for Arbitrary partition type. + task->updateOutputBuffers(fragment.numBroadcastDestinations, true); + } + task->start(options_.numDrivers); + } + } + + for (auto fragmentIndex = 0; fragmentIndex < fragments_.size() - 1; + ++fragmentIndex) { + auto& fragment = fragments_[fragmentIndex]; + for (auto& scan : fragment.scans) { + auto source = splitSourceFactory_->splitSourceForScan(*scan); + bool allDone = false; + do { + for (auto i = 0; i < stages_[fragmentIndex].size(); ++i) { + auto split = source->next(i); + if (!split.hasConnectorSplit()) { + allDone = true; + break; + } + stages_[fragmentIndex][i]->addSplit(scan->id(), std::move(split)); + } + } while (!allDone); + } + for (auto& scan : fragment.scans) { + for (auto i = 0; i < stages_[fragmentIndex].size(); ++i) { + stages_[fragmentIndex][i]->noMoreSplits(scan->id()); + } + } + + for (auto& input : fragment.inputStages) { + const auto sourceStage = stageMap[input.producerTaskPrefix]; + std::vector> sourceSplits; + for (auto i = 0; i < stages_[sourceStage].size(); ++i) { + sourceSplits.push_back(remoteSplit(stages_[sourceStage][i]->taskId())); + } + for (auto& task : stages_[fragmentIndex]) { + for (auto& remote : sourceSplits) { + task->addSplit(input.consumerNodeId, exec::Split(remote)); + } + task->noMoreSplits(input.consumerNodeId); + } + } + } + if (stages_.empty()) { + return {}; + } + std::vector> lastStage; + for (auto& task : stages_.back()) { + lastStage.push_back(remoteSplit(task->taskId())); + } + return lastStage; +} + +exec::Split LocalSplitSource::next(int32_t /*worker*/) { + if (currentFile_ >= static_cast(table_->files().size())) { + return exec::Split(); + } + + if (currentSplit_ >= fileSplits_.size()) { + fileSplits_.clear(); + ++currentFile_; + if (currentFile_ >= table_->files().size()) { + return exec::Split(); + } + + currentSplit_ = 0; + auto filePath = table_->files()[currentFile_]; + const auto fileSize = fs::file_size(filePath); + // Take the upper bound. + const int splitSize = std::ceil((fileSize) / splitsPerFile_); + for (int i = 0; i < splitsPerFile_; ++i) { + fileSplits_.push_back( + connector::hive::HiveConnectorSplitBuilder(filePath) + .connectorId(table_->schema()->connector()->connectorId()) + .fileFormat(table_->format()) + .start(i * splitSize) + .length(splitSize) + .build()); + } + } + return exec::Split(std::move(fileSplits_[currentSplit_++])); +} + +std::unique_ptr LocalSplitSourceFactory::splitSourceForScan( + const core::TableScanNode& tableScan) { + auto* tableHandle = dynamic_cast( + tableScan.tableHandle().get()); + VELOX_CHECK_NOT_NULL(tableHandle); + auto* table = reinterpret_cast( + schema_->findTable(tableHandle->tableName())); + + return std::make_unique(table, splitsPerFile_); +} + +std::vector LocalRunner::stats() const { + std::vector result; + std::lock_guard l(mutex_); + for (auto i = 0; i < stages_.size(); ++i) { + auto& tasks = stages_[i]; + VELOX_CHECK(!tasks.empty()); + auto stats = tasks[0]->taskStats(); + for (auto j = 1; j < tasks.size(); ++j) { + auto moreStats = tasks[j]->taskStats(); + for (auto pipeline = 0; pipeline < stats.pipelineStats.size(); + ++pipeline) { + for (auto op = 0; + op < stats.pipelineStats[pipeline].operatorStats.size(); + ++op) { + stats.pipelineStats[pipeline].operatorStats[op].add( + moreStats.pipelineStats[pipeline].operatorStats[op]); + } + } + } + result.push_back(std::move(stats)); + } + return result; +} + +} // namespace facebook::velox::runner diff --git a/velox/runner/LocalRunner.h b/velox/runner/LocalRunner.h new file mode 100644 index 0000000000000..f5d3bb5c211fc --- /dev/null +++ b/velox/runner/LocalRunner.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/exec/Exchange.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/runner/LocalSchema.h" +#include "velox/runner/MultiFragmentPlan.h" +#include "velox/runner/Runner.h" + +namespace facebook::velox::runner { + +/// Runner for in-process execution of a distributed plan. +class LocalRunner : public Runner, + public std::enable_shared_from_this { + public: + LocalRunner( + MultiFragmentPlanPtr plan, + std::shared_ptr queryCtx, + std::shared_ptr splitSourceFactory) + : plan_(std::move(plan)), + fragments_(plan_->fragments()), + options_(plan_->options()), + splitSourceFactory_(std::move(splitSourceFactory)) { + params_.queryCtx = std::move(queryCtx); + } + + RowVectorPtr next() override; + + std::vector stats() const override; + + void abort() override; + + void waitForCompletion(int32_t maxWaitMicros) override; + + State state() const override { + return state_; + } + + private: + void start(); + + // Creates all stages except for the single worker final consumer stage. + std::vector> makeStages(); + + // Serializes 'cursor_' and 'error_'. + mutable std::mutex mutex_; + + const MultiFragmentPlanPtr plan_; + const std::vector fragments_; + const MultiFragmentPlan::Options& options_; + const std::shared_ptr splitSourceFactory_; + + exec::test::CursorParameters params_; + + tsan_atomic state_{State::kInitialized}; + + std::unique_ptr cursor_; + std::vector>> stages_; + std::exception_ptr error_; +}; + +/// Split source that produces splits from a LocalSchema. +class LocalSplitSource : public SplitSource { + public: + LocalSplitSource(const LocalTable* table, int32_t splitsPerFile) + : table_(table), splitsPerFile_(splitsPerFile) {} + + exec::Split next(int32_t worker) override; + + private: + const LocalTable* const table_; + const int32_t splitsPerFile_; + + std::vector> fileSplits_; + int32_t currentFile_{-1}; + int32_t currentSplit_{0}; +}; + +class LocalSplitSourceFactory : public SplitSourceFactory { + public: + LocalSplitSourceFactory( + std::shared_ptr schema, + int32_t splitsPerFile) + : schema_(std::move(schema)), splitsPerFile_(splitsPerFile) {} + + std::unique_ptr splitSourceForScan( + const core::TableScanNode& scan) override; + + private: + const std::shared_ptr schema_; + const int32_t splitsPerFile_; +}; + +} // namespace facebook::velox::runner diff --git a/velox/runner/LocalSchema.cpp b/velox/runner/LocalSchema.cpp new file mode 100644 index 0000000000000..b35b50b416df8 --- /dev/null +++ b/velox/runner/LocalSchema.cpp @@ -0,0 +1,321 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/runner/LocalSchema.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/common/BufferedInput.h" +#include "velox/dwio/common/Reader.h" +#include "velox/dwio/common/ReaderFactory.h" + +namespace facebook::velox::runner { + +LocalSchema::LocalSchema( + const std::string& path, + dwio::common::FileFormat format, + connector::hive::HiveConnector* hiveConnector, + std::shared_ptr ctx) + : Schema(path, ctx->memoryPool()), + hiveConnector_(hiveConnector), + connectorId_(hiveConnector_->connectorId()), + connectorQueryCtx_(ctx), + format_(format) { + initialize(path); +} + +void LocalSchema::initialize(const std::string& path) { + for (auto const& dirEntry : fs::directory_iterator{path}) { + if (!dirEntry.is_directory() || + dirEntry.path().filename().c_str()[0] == '.') { + continue; + } + loadTable(dirEntry.path().filename(), dirEntry.path()); + } +} +// Feeds the values in 'vector' into 'builder'. +template +void addStats( + velox::dwrf::StatisticsBuilder* builder, + const BaseVector& vector) { + auto* typedVector = vector.asUnchecked>(); + for (auto i = 0; i < typedVector->size(); ++i) { + if (!typedVector->isNullAt(i)) { + reinterpret_cast(builder)->addValues(typedVector->valueAt(i)); + } + } +} + +std::pair LocalTable::sample( + float pct, + const std::vector& fields, + velox::connector::hive::SubfieldFilters filters, + const velox::core::TypedExprPtr& remainingFilter, + HashStringAllocator* /*allocator*/, + std::vector>* + statsBuilders) { + dwrf::StatisticsBuilderOptions options( + /*stringLengthLimit=*/100, /*initialSize=*/0); + std::vector> builders; + auto tableHandle = std::make_shared( + schema_->connector()->connectorId(), + name_, + true, + std::move(filters), + remainingFilter); + + std::unordered_map< + std::string, + std::shared_ptr> + columnHandles; + std::vector names; + std::vector types; + for (auto& field : fields) { + auto& path = field.path(); + auto column = + dynamic_cast(path[0].get()) + ->name(); + const auto idx = type_->getChildIdx(column); + names.push_back(type_->nameOf(idx)); + types.push_back(type_->childAt(idx)); + columnHandles[names.back()] = + std::make_shared( + names.back(), + connector::hive::HiveColumnHandle::ColumnType::kRegular, + types.back(), + types.back()); + switch (types.back()->kind()) { + case TypeKind::BIGINT: + case TypeKind::INTEGER: + case TypeKind::SMALLINT: + builders.push_back( + std::make_unique(options)); + break; + case TypeKind::REAL: + case TypeKind::DOUBLE: + builders.push_back( + std::make_unique(options)); + break; + case TypeKind::VARCHAR: + builders.push_back( + std::make_unique(options)); + break; + + default: + builders.push_back(nullptr); + } + } + + const auto outputType = ROW(std::move(names), std::move(types)); + int64_t passingRows = 0; + int64_t scannedRows = 0; + for (auto& file : files_) { + auto dataSource = schema_->connector()->createDataSource( + outputType, + tableHandle, + columnHandles, + schema_->connectorQueryCtx().get()); + + auto split = connector::hive::HiveConnectorSplitBuilder(file) + .fileFormat(format_) + .connectorId(schema_->connector()->connectorId()) + .build(); + dataSource->addSplit(split); + constexpr int32_t kBatchSize = 1000; + for (;;) { + ContinueFuture ignore{ContinueFuture::makeEmpty()}; + + auto data = dataSource->next(kBatchSize, ignore).value(); + if (data == nullptr) { + break; + } + passingRows += data->size(); + for (auto column = 0; column < builders.size(); ++column) { + if (!builders[column]) { + continue; + } + auto* builder = builders[column].get(); + auto loadChild = [](RowVectorPtr data, int32_t column) { + data->childAt(column) = + BaseVector::loadedVectorShared(data->childAt(column)); + }; + switch (type_->childAt(column)->kind()) { + case TypeKind::SMALLINT: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::INTEGER: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::BIGINT: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::REAL: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::DOUBLE: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::VARCHAR: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + + default: + break; + } + } + break; + } + scannedRows += dataSource->getCompletedRows(); + if (scannedRows > numRows_ * (pct / 100)) { + break; + } + } + if (statsBuilders) { + *statsBuilders = std::move(builders); + } + return std::pair(scannedRows, passingRows); +} + +void LocalSchema::loadTable( + const std::string& tableName, + const fs::path& tablePath) { + // open each file in the directory and check their type and add up the row + // counts. + RowTypePtr tableType; + LocalTable* table = nullptr; + + for (auto const& dirEntry : fs::directory_iterator{tablePath}) { + if (!dirEntry.is_regular_file()) { + continue; + } + // Ignore hidden files. + if (dirEntry.path().filename().c_str()[0] == '.') { + continue; + } + auto it = tables_.find(tableName); + if (it != tables_.end()) { + table = reinterpret_cast(it->second.get()); + } else { + tables_[tableName] = + std::make_unique(tableName, format_, this); + table = reinterpret_cast(tables_[tableName].get()); + } + dwio::common::ReaderOptions readerOptions{pool_}; + readerOptions.setFileFormat(format_); + auto input = std::make_unique( + std::make_shared(dirEntry.path().string()), + readerOptions.memoryPool()); + std::unique_ptr reader = + dwio::common::getReaderFactory(readerOptions.fileFormat()) + ->createReader(std::move(input), readerOptions); + const auto fileType = reader->rowType(); + if (!tableType) { + tableType = fileType; + } else if (fileType->size() > tableType->size()) { + // The larger type is the later since there is only addition of columns. + // TODO: Check the column types are compatible where they overlap. + tableType = fileType; + } + const auto rows = reader->numberOfRows(); + + if (rows.has_value()) { + table->numRows_ += rows.value(); + } + for (auto i = 0; i < fileType->size(); ++i) { + auto name = fileType->nameOf(i); + LocalColumn* column; + auto columnIt = table->columns().find(name); + if (columnIt != table->columns().end()) { + column = columnIt->second.get(); + } else { + table->columns()[name] = + std::make_unique(name, fileType->childAt(i)); + column = table->columns()[name].get(); + } + // Initialize the stats from the first file. + if (column->stats() == nullptr) { + column->setStats(reader->columnStatistics(i)); + } + } + table->files_.push_back(dirEntry.path()); + } + VELOX_CHECK_NOT_NULL(table, "Table directory {} is empty", tablePath); + + table->setType(tableType); + table->sampleNumDistincts(2, pool_); +} + +void LocalTable::sampleNumDistincts(float samplePct, memory::MemoryPool* pool) { + std::vector fields; + for (auto i = 0; i < type_->size(); ++i) { + fields.push_back(common::Subfield(type_->nameOf(i))); + } + + // Sample the table. Adjust distinct values according to the samples. + auto allocator = std::make_unique(pool); + std::vector> statsBuilders; + auto [sampled, passed] = + sample(samplePct, fields, {}, nullptr, allocator.get(), &statsBuilders); + numSampledRows_ = sampled; + for (auto i = 0; i < statsBuilders.size(); ++i) { + if (statsBuilders[i]) { + // TODO: Use HLL estimate of distinct values here after this is added to + // the stats builder. Now assume that all rows have a different value. + // Later refine this by observed min-max range. + int64_t approxNumDistinct = numRows_; + // For tiny tables the sample is 100% and the approxNumDistinct is + // accurate. For partial samples, the distinct estimate is left to be the + // distinct estimate of the sample if there are few distincts. This is an + // enumeration where values in unsampled rows are likely the same. If + // there are many distincts, we multiply by 1/sample rate assuming that + // unsampled rows will mostly have new values. + + if (numSampledRows_ < numRows_) { + if (approxNumDistinct > sampled / 50) { + float numDups = + numSampledRows_ / static_cast(approxNumDistinct); + approxNumDistinct = std::min(numRows_, numRows_ / numDups); + + // If the type is an integer type, num distincts cannot be larger than + // max - min. + if (auto* ints = dynamic_cast( + statsBuilders[i].get())) { + auto min = ints->getMinimum(); + auto max = ints->getMaximum(); + if (min.has_value() && max.has_value()) { + auto range = max.value() - min.value(); + approxNumDistinct = std::min(approxNumDistinct, range); + } + } + } + + columns()[type_->nameOf(i)]->setNumDistinct(approxNumDistinct); + } + } + } +} + +} // namespace facebook::velox::runner diff --git a/velox/runner/LocalSchema.h b/velox/runner/LocalSchema.h new file mode 100644 index 0000000000000..628dd71c738fe --- /dev/null +++ b/velox/runner/LocalSchema.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/base/Fs.h" +#include "velox/common/memory/HashStringAllocator.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/dwrf/writer/StatisticsBuilder.h" +#include "velox/runner/Schema.h" + +namespace facebook::velox::runner { + +class LocalColumn : public Column { + public: + LocalColumn(const std::string& name, TypePtr type) : Column(name, type) {} + + friend class LocalSchema; +}; + +class LocalSchema; + +class LocalTable : public Table { + public: + LocalTable( + const std::string& name, + dwio::common::FileFormat format, + Schema* schema) + : Table(name, format, schema) {} + + std::unordered_map>& columns() { + return columns_; + } + + LocalSchema* schema() const { + return reinterpret_cast(schema_); + } + + void setType(const RowTypePtr& type) { + type_ = type; + } + + std::pair sample( + float pct, + const std::vector& columns, + connector::hive::SubfieldFilters filters, + const core::TypedExprPtr& remainingFilter, + HashStringAllocator* allocator = nullptr, + std::vector>* statsBuilders = + nullptr) override; + + /// Samples 'samplePct' % rows of the table and sets the num distincts + /// estimate for the columns. uses 'pool' for temporary data. + void sampleNumDistincts(float samplePct, memory::MemoryPool* pool); + + const std::vector& files() const { + return files_; + } + + private: + // All columns. Filled by loadTable(). + std::unordered_map> columns_; + + std::vector files_; + int64_t numRows_{0}; + int64_t numSampledRows_{0}; + + friend class LocalSchema; +}; + +class LocalSchema : public Schema { + public: + /// 'path' is the directory containing a subdirectory per table. + LocalSchema( + const std::string& path, + dwio::common::FileFormat format, + connector::hive::HiveConnector* hiveConector, + std::shared_ptr ctx); + + connector::Connector* connector() const override { + return hiveConnector_; + } + + const std::shared_ptr& connectorQueryCtx() + const override { + return connectorQueryCtx_; + } + + private: + void initialize(const std::string& path); + + void loadTable(const std::string& tableName, const fs::path& tablePath); + + connector::hive::HiveConnector* const hiveConnector_; + const std::string connectorId_; + const std::shared_ptr connectorQueryCtx_; + const dwio::common::FileFormat format_; +}; + +} // namespace facebook::velox::runner diff --git a/velox/runner/MultiFragmentPlan.h b/velox/runner/MultiFragmentPlan.h new file mode 100644 index 0000000000000..3bd79907a9236 --- /dev/null +++ b/velox/runner/MultiFragmentPlan.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/exec/Task.h" + +namespace facebook::velox::runner { + +/// Describes an exchange source for an ExchangeNode a non-leaf stage. +struct InputStage { + // Id of ExchangeNode in the consumer fragment. + core::PlanNodeId consumerNodeId; + + /// Task prefix of producer stage. + std::string producerTaskPrefix; +}; + +/// Describes a fragment of a distributed plan. This allows a run +/// time to distribute fragments across workers and to set up +/// exchanges. A complete plan is a vector of these with the last +/// being the fragment that gathers results from the complete +/// plan. Different runtimes, e.g. local, streaming or +/// materialized shuffle can use this to describe exchange +/// parallel execution. Decisions on number of workers, location +/// of workers and mode of exchange are up to the runtime. +struct ExecutableFragment { + explicit ExecutableFragment(const std::string& taskPrefix) + : taskPrefix(taskPrefix) {} + std::string taskPrefix; + int32_t width{0}; + velox::core::PlanFragment fragment; + + /// Source fragments and Exchange node ids for remote shuffles producing input + /// for 'this'. + std::vector inputStages; + + /// Table scan nodes in 'this'. + std::vector> scans; + int32_t numBroadcastDestinations{0}; +}; + +/// Describes a distributed plan handed to a Runner for parallel/distributed +/// execution. The last element of 'fragments' is by convention the stage that +/// gathers the query result. Otherwise the order of 'fragments' is not +/// important since the producer-consumer relations are given by 'inputStages' +/// in each fragment. +class MultiFragmentPlan { + public: + /// Describes options for running a MultiFragmentPlan. + struct Options { + /// Query id used as a prefix for tasks ids. + std::string queryId; + + // Maximum Number of independent Tasks for one stage of execution. If 1, + // there are no exchanges. + int32_t numWorkers; + + // Number of threads in a fragment in a worker. If 1, there are no local + // exchanges. + int32_t numDrivers; + }; + + MultiFragmentPlan(std::vector fragments, Options options) + : fragments_(std::move(fragments)), options_(std::move(options)) {} + + const std::vector& fragments() const { + return fragments_; + } + + const Options& options() const { + return options_; + } + + std::string toString() const; + + private: + const std::vector fragments_; + const Options options_; +}; + +using MultiFragmentPlanPtr = std::shared_ptr; + +} // namespace facebook::velox::runner diff --git a/velox/runner/Runner.cpp b/velox/runner/Runner.cpp new file mode 100644 index 0000000000000..d4f18c29f404e --- /dev/null +++ b/velox/runner/Runner.cpp @@ -0,0 +1,59 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/runner/Runner.h" + +namespace facebook::velox::runner { + +// static +std::string Runner::stateString(Runner::State state) { + switch (state) { + case Runner::State::kInitialized: + return "initialized"; + case Runner::State::kRunning: + return "running"; + case Runner::State::kCancelled: + return "cancelled"; + case Runner::State::kError: + return "error"; + case Runner::State::kFinished: + return "finished"; + } + return fmt::format("invalid state {}", static_cast(state)); +} + +std::string MultiFragmentPlan::toString() const { + std::stringstream out; + for (auto i = 0; i < fragments_.size(); ++i) { + out << fmt::format( + "Fragment {}: {} numWorkers={}:\n", + i, + fragments_[i].taskPrefix, + fragments_[i].width); + out << fragments_[i].fragment.planNode->toString(true, true) << std::endl; + if (!fragments_[i].inputStages.empty()) { + out << "Inputs: "; + for (auto& input : fragments_[i].inputStages) { + out << fmt::format( + " {} <- {} ", input.consumerNodeId, input.producerTaskPrefix); + } + out << std::endl; + } + } + return out.str(); +} + +} // namespace facebook::velox::runner diff --git a/velox/runner/Runner.h b/velox/runner/Runner.h new file mode 100644 index 0000000000000..e570a0df1cf26 --- /dev/null +++ b/velox/runner/Runner.h @@ -0,0 +1,101 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/exec/Exchange.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/runner/LocalSchema.h" +#include "velox/runner/MultiFragmentPlan.h" + +/// Base classes for multifragment Velox query execution. +namespace facebook::velox::runner { + +/// Iterator for obtaining splits for a scan. One is created for each table +/// scan. +class SplitSource { + public: + virtual ~SplitSource() = default; + /// Returns a split for 'worker'. This may implement soft affinity or strict + /// bucket to worker mapping. + virtual exec::Split next(int32_t worker) = 0; +}; + +/// A factory for getting a SplitSource for each TableScan. The splits produced +/// may depend on partition keys, buckets etc mentioned by each tableScan. +class SplitSourceFactory { + public: + virtual ~SplitSourceFactory() = default; + + /// Returns a splitSource for one TableScan across all Tasks of + /// the fragment. The source will be invoked to produce splits for + /// each individual worker running the scan. + virtual std::unique_ptr splitSourceForScan( + const core::TableScanNode& scan) = 0; +}; + +/// Base class for executing multifragment Velox queries. One instance +/// of a Runner coordinates the execution of one multifragment +/// query. Different derived classes can support different shuffles +/// and different scheduling either in process or in a cluster. Unless +/// otherwise stated, the member functions are thread safe as long as +/// the caller holds an owning reference to the runner. +class Runner { + public: + enum class State { kInitialized, kRunning, kFinished, kError, kCancelled }; + + static std::string stateString(Runner::State state); + + virtual ~Runner() = default; + + /// Returns the next batch of results. Returns nullptr when no more results. + /// Throws any execution time errors. The result is allocated in the pool of + /// QueryCtx given to the Runner implementation. The caller is responsible for + /// serializing calls from different threads. + virtual RowVectorPtr next() = 0; + + /// Returns Task stats for each fragment of the plan. The stats correspond 1:1 + /// to the stages in the MultiFragmentPlan. This may be called at any time. + /// before waitForCompletion() or abort(). + virtual std::vector stats() const = 0; + + /// Returns the state of execution. + virtual State state() const = 0; + + /// Cancels the possibly pending execution. Returns immediately, thus before + /// the execution is actually finished. Use waitForCompletion() to wait for + /// all execution resources to be freed. May be called from any thread without + /// serialization. + virtual void abort() = 0; + + /// Waits up to 'maxWaitMicros' for all activity of the execution to cease. + /// This is used in tests to ensure that all pools are empty and unreferenced + /// before teradown. + + virtual void waitForCompletion(int32_t maxWaitMicros) = 0; +}; + +} // namespace facebook::velox::runner + +template <> +struct fmt::formatter + : formatter { + auto format(facebook::velox::runner::Runner::State state, format_context& ctx) + const { + return formatter::format( + facebook::velox::runner::Runner::stateString(state), ctx); + } +}; diff --git a/velox/runner/Schema.h b/velox/runner/Schema.h new file mode 100644 index 0000000000000..a0d6f35dd9c49 --- /dev/null +++ b/velox/runner/Schema.h @@ -0,0 +1,168 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/base/Fs.h" +#include "velox/common/memory/HashStringAllocator.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/dwrf/writer/StatisticsBuilder.h" + +/// Base classes for schema elements used in execution. A Schema is a collection +/// of Tables. A Table is a collection of Columns. Tables and Columns have +/// statistics and Tables can be sampled. Derived classes connect to different +/// metadata stores and provide different metadata, e.g. order, partitioning, +/// bucketing etc. +namespace facebook::velox::runner { + +/// Base class for column. The column's name and type are immutable but the +/// stats may be set multiple times. +class Column { + public: + virtual ~Column() = default; + + Column(const std::string& name, TypePtr type) : name_(name), type_(type) {} + + dwio::common::ColumnStatistics* stats() const { + return latestStats_; + } + + /// Sets statistics. May be called multipl times if table contents change. + void setStats(std::unique_ptr stats) { + std::lock_guard l(mutex_); + allStats_.push_back(std::move(stats)); + latestStats_ = allStats_.back().get(); + } + + const std::string& name() const { + return name_; + } + + const TypePtr& type() const { + return type_; + } + + void setNumDistinct(int64_t numDistinct) { + approxNumDistinct_ = numDistinct; + } + + protected: + const std::string name_; + const TypePtr type_; + + // The latest element added to 'allStats_'. + tsan_atomic latestStats_{nullptr}; + + // All statistics recorded for this column. Old values can be purged when the + // containing Schema is not in use. + std::vector> allStats_; + + // Latest approximate count of distinct values. + std::optional approxNumDistinct_; + + private: + // Serializes changes to statistics. + std::mutex mutex_; +}; + +class Schema; + +/// Base class for table. This is used to identify a table for purposes of +/// Split generation, statistics, sampling etc. +class Table { + public: + virtual ~Table() = default; + + Table( + const std::string& name, + dwio::common::FileFormat format, + Schema* schema) + : schema_(schema), name_(name), format_(format) {} + + const std::string& name() const { + return name_; + } + + const RowTypePtr& rowType() const { + return type_; + } + + dwio::common::FileFormat format() const { + return format_; + } + + /// Samples 'pct' percent of rows for 'fields'. Applies 'filters' + /// before sampling. Returns {count of sampled, count matching filters}. + /// Returns statistics for the post-filtering values in 'stats' for each of + /// 'fields'. If 'fields' is empty, simply returns the number of + /// rows matching 'filter' in a sample of 'pct'% of the table. + /// + /// TODO: Introduce generic statistics builder in dwio/common. + virtual std::pair sample( + float pct, + const std::vector& columns, + connector::hive::SubfieldFilters filters, + const core::TypedExprPtr& remainingFilter, + HashStringAllocator* allocator = nullptr, + std::vector>* statsBuilders = + nullptr) { + VELOX_UNSUPPORTED("Table class does not support sampling."); + } + + protected: + Schema* const schema_; + const std::string name_; + + const dwio::common::FileFormat format_; + + // Discovered from data. In the event of different types, we take the + // latest (i.e. widest) table type. + RowTypePtr type_; +}; + +/// Base class for collection of tables. A query executes against a +/// Schema and its tables and columns are resolved against the +/// Schema. The schema is mutable and may acquire tables and the +/// tables may acquire stats during their lifetime. +class Schema { + public: + virtual ~Schema() = default; + + Schema(const std::string& name, memory::MemoryPool* pool) + : name_(name), pool_(std::move(pool)) {} + + Table* findTable(const std::string& name) { + auto it = tables_.find(name); + VELOX_CHECK(it != tables_.end(), "Table {} not found", name); + return it->second.get(); + } + + virtual connector::Connector* connector() const = 0; + + virtual const std::shared_ptr& + connectorQueryCtx() const = 0; + + protected: + const std::string name_; + + memory::MemoryPool* const pool_; + + std::unordered_map> tables_; +}; + +} // namespace facebook::velox::runner diff --git a/velox/runner/tests/CMakeLists.txt b/velox/runner/tests/CMakeLists.txt new file mode 100644 index 0000000000000..d9e8315d9b0f9 --- /dev/null +++ b/velox/runner/tests/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_local_runner_test LocalRunnerTest.cpp Main.cpp) + +add_test(velox_local_runner_test velox_local_runner_test) + +target_link_libraries( + velox_local_runner_test + velox_exec_runner_test_util + velox_exec_test_lib + velox_dwio_common + velox_dwio_dwrf_proto + velox_parse_parser + velox_parse_expression + GTest::gtest) diff --git a/velox/runner/tests/LocalRunnerTest.cpp b/velox/runner/tests/LocalRunnerTest.cpp new file mode 100644 index 0000000000000..ffd05cdc83f00 --- /dev/null +++ b/velox/runner/tests/LocalRunnerTest.cpp @@ -0,0 +1,175 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/tests/utils/DistributedPlanBuilder.h" +#include "velox/exec/tests/utils/LocalRunnerTestBase.h" +#include "velox/exec/tests/utils/QueryAssertions.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::runner; +using namespace facebook::velox::exec::test; + +class LocalRunnerTest : public LocalRunnerTestBase { + protected: + static constexpr int32_t kNumFiles = 5; + static constexpr int32_t kNumVectors = 5; + static constexpr int32_t kRowsPerVector = 10000; + static constexpr int32_t kNumRows = kNumFiles * kNumVectors * kRowsPerVector; + + static void SetUpTestCase() { + // The lambdas will be run after this scope returns, so make captures + // static. + static int32_t counter1; + // Clear 'counter1' so that --gtest_repeat runs get the same data. + counter1 = 0; + auto customize1 = [&](const RowVectorPtr& rows) { + makeAscending(rows, counter1); + }; + + static int32_t counter2; + counter2 = 0; + auto customize2 = [&](const RowVectorPtr& rows) { + makeAscending(rows, counter2); + }; + + rowType_ = ROW({"c0"}, {BIGINT()}); + testTables_ = { + TableSpec{ + .name = "T", + .columns = rowType_, + .rowsPerVector = kRowsPerVector, + .numVectorsPerFile = kNumVectors, + .numFiles = kNumFiles, + .customizeData = customize1}, + TableSpec{ + .name = "U", + .columns = rowType_, + .rowsPerVector = kRowsPerVector, + .numVectorsPerFile = kNumVectors, + .numFiles = kNumFiles, + .customizeData = customize2}}; + + // Creates the data and schema from 'testTables_'. These are created on the + // first test fixture initialization. + LocalRunnerTestBase::SetUpTestCase(); + } + + // Returns a plan with a table scan. This is a single stage if 'numWorkers' is + // 1, otherwise this is a scan stage plus shuffle to a stage that gathers the + // scan results. + MultiFragmentPlanPtr makeScanPlan(const std::string& id, int32_t numWorkers) { + MultiFragmentPlan::Options options = { + .queryId = id, .numWorkers = numWorkers, .numDrivers = 2}; + const int32_t width = 3; + + DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get()); + rootBuilder.tableScan("T", rowType_); + if (numWorkers > 1) { + rootBuilder.shuffle({}, 1, false); + } + return std::make_shared( + rootBuilder.fragments(), std::move(options)); + } + + MultiFragmentPlanPtr makeJoinPlan(std::string project = "c0") { + MultiFragmentPlan::Options options = { + .queryId = "test.", .numWorkers = 4, .numDrivers = 2}; + const int32_t width = 3; + + DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get()); + rootBuilder.tableScan("T", rowType_) + .project({project}) + .shuffle({"c0"}, 3, false) + .hashJoin( + {"c0"}, + {"b0"}, + DistributedPlanBuilder(rootBuilder) + .tableScan("U", rowType_) + .project({"c0 as b0"}) + .shuffleResult({"b0"}, width, false), + "", + {"c0", "b0"}) + .shuffle({}, 1, false) + .finalAggregation({}, {"count(1)"}, {{BIGINT()}}); + return std::make_shared( + rootBuilder.fragments(), std::move(options)); + } + + static void makeAscending(const RowVectorPtr& rows, int32_t& counter) { + auto ints = rows->childAt(0)->as>(); + for (auto i = 0; i < ints->size(); ++i) { + ints->set(i, counter + i); + } + counter += ints->size(); + } + + void checkScanCount(const std::string& id, int32_t numWorkers) { + auto scan = makeScanPlan(id, numWorkers); + auto localRunner = std::make_shared( + std::move(scan), + makeQueryCtx("q1", rootPool_.get()), + splitSourceFactory_); + auto results = readCursor(localRunner); + + int32_t count = 0; + for (auto& rows : results) { + count += rows->size(); + } + localRunner->waitForCompletion(5000); + EXPECT_EQ(250'000, count); + } + + std::shared_ptr idGenerator_{ + std::make_shared()}; + // The below are declared static to be scoped to TestCase so as to reuse the + // dataset between tests. + + inline static RowTypePtr rowType_; +}; + +TEST_F(LocalRunnerTest, count) { + auto join = makeJoinPlan(); + auto localRunner = std::make_shared( + std::move(join), + makeQueryCtx("q1", rootPool_.get()), + splitSourceFactory_); + auto results = readCursor(localRunner); + auto stats = localRunner->stats(); + EXPECT_EQ(1, results.size()); + EXPECT_EQ(1, results[0]->size()); + EXPECT_EQ( + kNumRows, results[0]->childAt(0)->as>()->valueAt(0)); + results.clear(); + EXPECT_EQ(Runner::State::kFinished, localRunner->state()); + localRunner->waitForCompletion(5000); +} + +TEST_F(LocalRunnerTest, error) { + auto join = makeJoinPlan("if (c0 = 111, c0 / 0, c0 + 1) as c0"); + auto localRunner = std::make_shared( + std::move(join), + makeQueryCtx("q1", rootPool_.get()), + splitSourceFactory_); + EXPECT_THROW(readCursor(localRunner), VeloxUserError); + EXPECT_EQ(Runner::State::kError, localRunner->state()); + localRunner->waitForCompletion(5000); +} + +TEST_F(LocalRunnerTest, scan) { + checkScanCount("s1", 1); + checkScanCount("s2", 3); +}