Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce physical plan and add switch #4820

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(flash_service .)
add_headers_and_sources(flash_service ./Coprocessor)
add_headers_and_sources(flash_service ./Mpp)
add_headers_and_sources(flash_service ./Planner)
add_headers_and_sources(flash_service ./Planner/plans)
add_headers_and_sources(flash_service ./Statistics)

add_library(flash_service ${flash_service_headers} ${flash_service_sources})
Expand Down
25 changes: 19 additions & 6 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Planner/Planner.h>
#include <Interpreters/Context.h>

namespace DB
Expand Down Expand Up @@ -62,12 +63,24 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block)
BlockInputStreams child_streams = executeQueryBlock(*child);
input_streams_vec.push_back(child_streams);
}
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
if (context.getSettingsRef().enable_planner && Planner::isSupported(query_block))
{
Planner planner(
windtalker marked this conversation as resolved.
Show resolved Hide resolved
context,
input_streams_vec,
query_block,
max_streams);
return planner.execute();
}
else
{
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
}
}

BlockIO InterpreterDAG::execute()
Expand Down
62 changes: 62 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FmtUtils.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <Flash/Planner/PhysicalPlanHelper.h>
#include <Interpreters/Context.h>

namespace DB
{
String PhysicalPlan::toString()
{
auto schema_to_string = [&]() {
FmtBuffer buffer;
buffer.joinStr(
schema.cbegin(),
schema.cend(),
[](const auto & item, FmtBuffer & buf) { buf.fmtAppend("<{}, {}>", item.name, item.type->getName()); },
", ");
return buffer.toString();
};
return fmt::format(
"type: {}, executor_id: {}, is_record_profile_streams: {}, schema: {}",
DB::toString(type),
executor_id,
is_record_profile_streams,
schema_to_string());
}

void PhysicalPlan::finalize()
{
finalize(PhysicalPlanHelper::schemaToNames(schema));
}

void PhysicalPlan::recordProfileStreams(DAGPipeline & pipeline, const Context & context)
{
if (is_record_profile_streams)
{
auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id];
pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
}
}

void PhysicalPlan::transform(DAGPipeline & pipeline, Context & context, size_t max_streams)
{
transformImpl(pipeline, context, max_streams);
recordProfileStreams(pipeline, context);
}
} // namespace DB
84 changes: 84 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Core/Block.h>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Flash/Planner/PlanType.h>

#include <memory>

namespace DB
{
struct DAGPipeline;
class Context;
class DAGContext;

class PhysicalPlan;
using PhysicalPlanPtr = std::shared_ptr<PhysicalPlan>;

class PhysicalPlan
{
public:
PhysicalPlan(const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, const String & req_id)
: executor_id(executor_id_)
, type(type_)
, schema(schema_)
, log(Logger::get(DB::toString(type_), req_id))
{}

virtual ~PhysicalPlan() = default;

virtual PhysicalPlanPtr children(size_t /*i*/) const = 0;

virtual void setChild(size_t /*i*/, const PhysicalPlanPtr & /*new_child*/) = 0;

const PlanType & tp() const { return type; }

const String & execId() const { return executor_id; }

const NamesAndTypes & getSchema() const { return schema; }

virtual void appendChild(const PhysicalPlanPtr & /*new_child*/) = 0;

virtual size_t childrenSize() const = 0;

virtual void transform(DAGPipeline & pipeline, Context & context, size_t max_streams);

virtual void finalize(const Names & parent_require) = 0;
void finalize();

/// Obtain a sample block that contains the names and types of result columns.
virtual const Block & getSampleBlock() const = 0;

void disableRecordProfileStreams() { is_record_profile_streams = false; }

String toString();

protected:
virtual void transformImpl(DAGPipeline & /*pipeline*/, Context & /*context*/, size_t /*max_streams*/){};

void recordProfileStreams(DAGPipeline & pipeline, const Context & context);

String executor_id;
PlanType type;
NamesAndTypes schema;
bool is_record_profile_streams = true;

LoggerPtr log;
};
} // namespace DB
10 changes: 10 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include <Flash/Planner/PhysicalPlanBuilder.h>
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
#include <Flash/Planner/plans/PhysicalSource.h>

namespace DB
{
void PhysicalPlanBuilder::buildSource(const Block & sample_block)
{
cur_plans.push_back(PhysicalSource::build(sample_block, log->identifier()));
}
} // namespace DB
47 changes: 47 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanBuilder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <common/logger_useful.h>

namespace DB
{
class PhysicalPlanBuilder
{
public:
explicit PhysicalPlanBuilder(Context & context_, const String & req_id)
: context(context_)
, log(Logger::get("PhysicalPlanBuilder", req_id))
{}

void buildSource(const Block & sample_block);

PhysicalPlanPtr getResult() const
{
RUNTIME_ASSERT(cur_plans.size() == 1, log, "There can only be one plan output, but here are {}", cur_plans.size());
return cur_plans.back();
}

private:
std::vector<PhysicalPlanPtr> cur_plans;

[[maybe_unused]] Context & context;

LoggerPtr log;
};
} // namespace DB
13 changes: 13 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include <Flash/Planner/PhysicalPlanHelper.h>

namespace DB::PhysicalPlanHelper
{
Names schemaToNames(const NamesAndTypes & schema)
{
Names names;
names.reserve(schema.size());
for (const auto & column : schema)
names.push_back(column.name);
return names;
}
} // namespace DB::PhysicalPlanHelper
8 changes: 8 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

#include <Core/NamesAndTypes.h>

namespace DB::PhysicalPlanHelper
{
Names schemaToNames(const NamesAndTypes & schema);
} // namespace DB::PhysicalPlanHelper
30 changes: 30 additions & 0 deletions dbms/src/Flash/Planner/PlanType.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Flash/Planner/PlanType.h>

namespace DB
{
String toString(const PlanType & plan_type)
{
switch (plan_type)
{
case Source:
return "Source";
default:
throw Exception("Unknown PlanType");
}
}
} // namespace DB
27 changes: 27 additions & 0 deletions dbms/src/Flash/Planner/PlanType.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <common/types.h>

namespace DB
{
enum PlanType
{
Source,
};

String toString(const PlanType & plan_type);
} // namespace DB
Loading