
Commit

Merge branch 'master' into fix-timestamp-after-generated
Lloyd-Pottiger authored May 16, 2023
2 parents 50fe23e + 0292765 commit 6d48424
Showing 46 changed files with 904 additions and 352 deletions.
2 changes: 1 addition & 1 deletion contrib/poco
3 changes: 2 additions & 1 deletion dbms/src/Common/CurrentMetrics.cpp
@@ -76,7 +76,8 @@
M(DTFileCacheCapacity) \
M(DTFileCacheUsed) \
M(PageCacheCapacity) \
M(PageCacheUsed)
M(PageCacheUsed) \
M(ConnectionPoolSize)

namespace CurrentMetrics
{
20 changes: 15 additions & 5 deletions dbms/src/Common/PoolBase.h
@@ -142,32 +142,42 @@ class PoolBase : private boost::noncopyable
return Entry(*items.back());
}

LOG_INFO(log, "No free connections in pool. Waiting.");

if (timeout < 0)
{
LOG_INFO(log, "No free connections in pool. Waiting infinitely.");
available.wait(lock);
}
else
{
LOG_INFO(log, "No free connections in pool. Waiting {} ms.", timeout);
available.wait_for(lock, std::chrono::microseconds(timeout));
}
}
}

void reserve(size_t count)
{
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);

while (items.size() < count)
items.emplace_back(std::make_shared<PooledObject>(allocObject(), *this));
}

size_t getPoolSize() const
{
std::unique_lock lock(mutex);
return items.size();
}

private:
/** The maximum size of the pool. */
unsigned max_items;
const unsigned max_items;

/** Pool. */
Objects items;

/** Lock to access the pool. */
std::mutex mutex;
mutable std::mutex mutex;
std::condition_variable available;

protected:
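A minimal sketch of the two patterns this hunk applies — a condition-variable wait that blocks indefinitely for a negative timeout and for a bounded duration otherwise, plus a const size accessor enabled by the now-mutable mutex. The class below is a simplified stand-in, not PoolBase itself:

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <vector>

// Illustrative stand-in for PoolBase; types and names are simplified.
class SimplePool
{
public:
    void put(int v)
    {
        {
            std::lock_guard lock(mutex);
            items.push_back(v);
        }
        available.notify_one();
    }

    // Blocks forever when timeout_us < 0, otherwise waits at most timeout_us microseconds.
    bool tryGet(int & out, long timeout_us)
    {
        std::unique_lock lock(mutex);
        auto has_item = [this] { return !items.empty(); };
        if (timeout_us < 0)
            available.wait(lock, has_item);
        else if (!available.wait_for(lock, std::chrono::microseconds(timeout_us), has_item))
            return false;
        out = items.back();
        items.pop_back();
        return true;
    }

    // A mutable mutex lets a const getter lock, mirroring the new getPoolSize().
    size_t size() const
    {
        std::lock_guard lock(mutex);
        return items.size();
    }

private:
    mutable std::mutex mutex;
    std::condition_variable available;
    std::vector<int> items;
};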
5 changes: 5 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
@@ -354,6 +354,11 @@ namespace DB
F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_read_stream, {{"type", "read_stream"}}, ExpBuckets{0.0001, 2, 20})) \
M(tiflash_storage_s3_http_request_seconds, "S3 request duration breakdown in seconds", Histogram, \
F(type_dns, {{"type", "dns"}}, ExpBuckets{0.001, 2, 20}), \
F(type_connect, {{"type", "connect"}}, ExpBuckets{0.001, 2, 20}), \
F(type_request, {{"type", "request"}}, ExpBuckets{0.001, 2, 20}), \
F(type_response, {{"type", "response"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_pipeline_scheduler, "pipeline scheduler", Gauge, \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
F(type_cpu_pending_tasks_count, {"type", "cpu_pending_tasks_count"}), \
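Assuming ExpBuckets{start, factor, count} expands to exponentially spaced upper bounds (start, start*factor, start*factor^2, ...), the new tiflash_storage_s3_http_request_seconds histogram covers roughly 1 ms up to about 524 s for each phase (dns, connect, request, response). A small sketch of that assumed expansion:

#include <cstdio>
#include <vector>

// Sketch of the assumed ExpBuckets{start, factor, count} semantics:
// bucket upper bounds start at `start` and are multiplied by `factor`, `count` times.
std::vector<double> expBuckets(double start, double factor, size_t count)
{
    std::vector<double> bounds;
    bounds.reserve(count);
    double bound = start;
    for (size_t i = 0; i < count; ++i)
    {
        bounds.push_back(bound);
        bound *= factor;
    }
    return bounds;
}

int main()
{
    // ExpBuckets{0.001, 2, 20} -> 0.001 s, 0.002 s, ..., ~524 s
    for (double b : expBuckets(0.001, 2, 20))
        std::printf("%g\n", b);
    return 0;
}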
11 changes: 11 additions & 0 deletions dbms/src/Common/tests/gtest_cpu_affinity_manager.cpp
@@ -24,8 +24,10 @@

#include <Common/CPUAffinityManager.h>
#include <Common/Config/TOMLConfiguration.h>
#include <Common/Logger.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <boost_wrapper/string.h>
#include <common/logger_useful.h>
#include <gtest/gtest.h>
#include <unistd.h>

@@ -82,6 +84,15 @@ TEST(CPUAffinityManagerTest, CPUAffinityManager)
int ret = sched_getaffinity(0, sizeof(cpu_set), &cpu_set);
ASSERT_EQ(ret, 0) << strerror(errno);

auto n_cpu = std::thread::hardware_concurrency();
auto cpu_cores = cpu_affinity.cpuSetToVec(cpu_set);
if (n_cpu != cpu_cores.size())
{
LOG_INFO(Logger::get(), "n_cpu = {}, cpu_cores = {}, CPU number and CPU cores do not match, skip checking CPUAffinityManager", n_cpu, cpu_cores);
return;
}
LOG_DEBUG(Logger::get(), "n_cpu = {}, cpu_cores = {}", n_cpu, cpu_cores);

cpu_affinity.bindSelfQueryThread();
cpu_set_t cpu_set0;
ret = sched_getaffinity(0, sizeof(cpu_set0), &cpu_set0);
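The early return guards against environments such as containers or CI runners with a restricted cpuset, where the process is already bound to fewer cores than std::thread::hardware_concurrency() reports and the affinity assertions below would be meaningless. A rough sketch of the kind of conversion cpuSetToVec performs (an illustrative helper, not the member function itself):

#include <sched.h>

#include <vector>

// Collect the CPU indices present in a cpu_set_t.
std::vector<int> cpuSetToVector(const cpu_set_t & cpu_set)
{
    std::vector<int> cores;
    for (int cpu = 0; cpu < CPU_SETSIZE; ++cpu)
    {
        if (CPU_ISSET(cpu, &cpu_set))
            cores.push_back(cpu);
    }
    return cores;
}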
21 changes: 13 additions & 8 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
@@ -122,9 +122,14 @@ Block WindowBlockInputStream::readImpl()
}

// Judge whether current_partition_row is end row of partition in current block
// How to judge?
// Compare data in previous partition with the new scanned data.
bool WindowTransformAction::isDifferentFromPrevPartition(UInt64 current_partition_row)
{
// prev_frame_start refers to the data in previous partition
const Columns & reference_columns = inputAt(prev_frame_start);

// partition_end refers to the new scanned data
const Columns & compared_columns = inputAt(partition_end);

for (size_t i = 0; i < partition_column_indices.size(); ++i)
@@ -299,9 +304,9 @@ void WindowTransformAction::advanceFrameStart()
}
}

bool WindowTransformAction::arePeers(const RowNumber & x, const RowNumber & y) const
bool WindowTransformAction::arePeers(const RowNumber & peer_group_last_row, const RowNumber & current_row) const
{
if (x == y)
if (peer_group_last_row == current_row)
{
// For convenience, a row is always its own peer.
return true;
@@ -324,18 +329,18 @@ bool WindowTransformAction::arePeers(const RowNumber & x, const RowNumber & y) c

for (size_t i = 0; i < n; ++i)
{
const auto * column_x = inputAt(x)[order_column_indices[i]].get();
const auto * column_y = inputAt(y)[order_column_indices[i]].get();
const auto * column_peer_last = inputAt(peer_group_last_row)[order_column_indices[i]].get();
const auto * column_current = inputAt(current_row)[order_column_indices[i]].get();
if (window_description.order_by[i].collator)
{
if (column_x->compareAt(x.row, y.row, *column_y, 1 /* nan_direction_hint */, *window_description.order_by[i].collator) != 0)
if (column_peer_last->compareAt(peer_group_last_row.row, current_row.row, *column_current, 1 /* nan_direction_hint */, *window_description.order_by[i].collator) != 0)
{
return false;
}
}
else
{
if (column_x->compareAt(x.row, y.row, *column_y, 1 /* nan_direction_hint */) != 0)
if (column_peer_last->compareAt(peer_group_last_row.row, current_row.row, *column_current, 1 /* nan_direction_hint */) != 0)
{
return false;
}
@@ -607,8 +612,8 @@ void WindowTransformAction::tryCalculate()
partition_start = partition_end;
advanceRowNumber(partition_end);
partition_ended = false;
// We have to reset the frame and other pointers when the new partition
// starts.

// We have to reset the frame and other pointers when the new partition starts.
frame_start = partition_start;
frame_end = partition_start;
prev_frame_start = partition_start;
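The renamed parameters make the roles explicit: the first argument is the last row of the current peer group, the second is the row being scanned, and two rows are peers only when every ORDER BY key compares equal (optionally under a collator). A stripped-down sketch of that check, with std::string equality standing in for IColumn::compareAt:

#include <string>
#include <vector>

// Conceptual sketch only: rows are represented as their ORDER BY key values,
// and both rows are assumed to carry the same number of keys.
bool arePeersSketch(const std::vector<std::string> & peer_group_last_row,
                    const std::vector<std::string> & current_row)
{
    for (size_t i = 0; i < peer_group_last_row.size(); ++i)
    {
        if (peer_group_last_row[i] != current_row[i])
            return false; // a differing ORDER BY key ends the peer group
    }
    return true;
}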
3 changes: 2 additions & 1 deletion dbms/src/DataStreams/WindowBlockInputStream.h
@@ -78,7 +78,7 @@ struct WindowTransformAction
void advancePartitionEnd();
bool isDifferentFromPrevPartition(UInt64 current_partition_row);

bool arePeers(const RowNumber & x, const RowNumber & y) const;
bool arePeers(const RowNumber & peer_group_last_row, const RowNumber & current_row) const;

void advanceFrameStart();
void advanceFrameEndCurrentRow();
@@ -202,6 +202,7 @@ struct WindowTransformAction

// The row for which we are now computing the window functions.
RowNumber current_row;

// The start of current peer group, needed for CURRENT ROW frame start.
// For ROWS frame, always equal to the current row, and for RANGE and GROUP
// frames may be earlier.
1 change: 1 addition & 0 deletions dbms/src/Debug/MockExecutor/FuncSigMap.cpp
@@ -96,5 +96,6 @@ std::unordered_map<String, tipb::ExprType> window_func_name_to_sig({
{"DenseRank", tipb::ExprType::DenseRank},
{"Lead", tipb::ExprType::Lead},
{"Lag", tipb::ExprType::Lag},
{"FirstValue", tipb::ExprType::FirstValue},
});
} // namespace DB::tests
14 changes: 14 additions & 0 deletions dbms/src/Debug/MockExecutor/WindowBinder.cpp
@@ -17,6 +17,8 @@
#include <Debug/MockExecutor/FuncSigMap.h>
#include <Debug/MockExecutor/WindowBinder.h>
#include <Parsers/ASTFunction.h>
#include <tipb/expression.pb.h>


namespace DB::mock
{
@@ -73,6 +75,13 @@ bool WindowBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collat
ft->set_decimal(first_arg_type.decimal());
break;
}
case tipb::ExprType::FirstValue:
{
assert(window_expr->children_size() == 1);
const auto arg_type = window_expr->children(0).field_type();
(*ft) = arg_type;
break;
}
default:
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagBinary);
Expand Down Expand Up @@ -202,6 +211,11 @@ ExecutorBinderPtr compileWindow(ExecutorBinderPtr input, size_t & executor_index
}
break;
}
case tipb::ExprType::FirstValue:
{
ci = children_ci[0];
break;
}
default:
throw Exception(fmt::format("Unsupported window function {}", func->name), ErrorCodes::LOGICAL_ERROR);
}
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
@@ -39,6 +39,7 @@ const std::unordered_map<tipb::ExprType, String> window_func_map({
{tipb::ExprType::RowNumber, "row_number"},
{tipb::ExprType::Lead, "lead"},
{tipb::ExprType::Lag, "lag"},
{tipb::ExprType::FirstValue, "first_value"},
});

const std::unordered_map<tipb::ExprType, String> agg_func_map({
@@ -1030,10 +1031,10 @@ bool isWindowFunctionExpr(const tipb::Expr & expr)
case tipb::ExprType::DenseRank:
case tipb::ExprType::Lead:
case tipb::ExprType::Lag:
case tipb::ExprType::FirstValue:
// case tipb::ExprType::CumeDist:
// case tipb::ExprType::PercentRank:
// case tipb::ExprType::Ntile:
// case tipb::ExprType::FirstValue:
// case tipb::ExprType::LastValue:
// case tipb::ExprType::NthValue:
return true;
2 changes: 1 addition & 1 deletion dbms/src/Flash/EstablishCall.cpp
@@ -215,7 +215,7 @@ void EstablishCallData::writeDone(String msg, const grpc::Status & status)

if (async_tunnel_sender)
{
LOG_INFO(async_tunnel_sender->getLogger(), "connection for {} cost {} ms, including {} ms to wait task.", async_tunnel_sender->getTunnelId(), stopwatch->elapsedMilliseconds(), waiting_task_time_ms);
LOG_INFO(async_tunnel_sender->getLogger(), "async connection for {} cost {} ms, including {} ms to wait task.", async_tunnel_sender->getTunnelId(), stopwatch->elapsedMilliseconds(), waiting_task_time_ms);

RUNTIME_ASSERT(!async_tunnel_sender->isConsumerFinished(), async_tunnel_sender->getLogger(), "tunnel {} consumer finished in advance", async_tunnel_sender->getTunnelId());

13 changes: 11 additions & 2 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
@@ -96,6 +96,7 @@ ExecutionResult PipelineExecutor::execute(ResultHandler && result_handler)
scheduleEvents();
wait();
}
LOG_TRACE(log, "query finish with {}", status.getQueryProfileInfo().toJson());
return status.toExecutionResult();
}

@@ -119,8 +120,16 @@ int PipelineExecutor::estimateNewThreadCount()

RU PipelineExecutor::collectRequestUnit()
{
// TODO support collectRequestUnit
return 0;
// TODO Get cputime more accurately.
// Currently, it is assumed that
// - The size of the CPU task thread pool is equal to the number of CPU cores.
// - Most of the CPU computations are executed in the CPU task thread pool.
// Therefore, `query_profile_info.getCPUExecuteTimeNs()` is approximately equal to the actual CPU time of the query.
// However, once these two assumptions are broken, it will lead to inaccurate acquisition of CPU time.
// It may be necessary to obtain CPU time using a more accurate method, such as using system call `clock_gettime`.
const auto & query_profile_info = status.getQueryProfileInfo();
auto cpu_time_ns = query_profile_info.getCPUExecuteTimeNs();
return toRU(ceil(cpu_time_ns));
}

Block PipelineExecutor::getSampleBlock() const
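collectRequestUnit now derives RU from the CPU execute time accumulated in the query profile. A purely illustrative sketch of such a conversion; the rate below is an assumption for the example, not TiFlash's actual toRU factor:

#include <cmath>

// Hypothetical conversion: the RU charged per millisecond of CPU time is a placeholder value.
using RU = double;

RU toRUSketch(double cpu_time_ns, double ru_per_cpu_ms = 1.0 / 3.0)
{
    const double cpu_time_ms = cpu_time_ns / 1e6;
    return std::ceil(cpu_time_ms * ru_per_cpu_ms);
}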
13 changes: 13 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorStatus.h
@@ -18,6 +18,7 @@
#include <Flash/Executor/ExecutionResult.h>
#include <Flash/Executor/ResultHandler.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Pipeline/Schedule/Tasks/TaskProfileInfo.h>

#include <atomic>
#include <exception>
@@ -114,6 +115,16 @@ class PipelineExecutorStatus : private boost::noncopyable

ResultQueuePtr toConsumeMode(size_t queue_size) noexcept;

void update(const TaskProfileInfo & task_profile_info)
{
query_profile_info.merge(task_profile_info);
}

const QueryProfileInfo & getQueryProfileInfo() const
{
return query_profile_info;
}

private:
bool setExceptionPtr(const std::exception_ptr & exception_ptr_) noexcept;

@@ -136,5 +147,7 @@

// `result_queue.finish` can only be called in `onEventFinish` because `result_queue.pop` cannot end until events end.
std::optional<ResultQueuePtr> result_queue;

QueryProfileInfo query_profile_info;
};
} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Flash/Mpp/AsyncRequestHandler.h
@@ -367,7 +367,9 @@ class AsyncRequestHandler : public AsyncRequestHandlerBase
// Do not change the order of these two clauses.
// We must ensure that status is set before pushing async handler into queue
stage = AsyncRequestStage::WAIT_REWRITE;
async_wait_rewrite_queue->push(std::make_pair(&kick_recv_tag, reader->getClientContext()->c_call()));
bool res = async_wait_rewrite_queue->push(std::make_pair(&kick_recv_tag, reader->getClientContext()->c_call()));
if (!res)
closeConnection("AsyncRequestHandlerWaitQueue has been closed");
}

void closeGrpcConnection()
20 changes: 19 additions & 1 deletion dbms/src/Flash/Mpp/GRPCReceiveQueue.h
@@ -71,10 +71,14 @@ class AsyncRequestHandlerWaitQueue
return wait_retry_queue.empty();
}

void push(AsyncRetryConnection conn)
// Return false when AsyncRequestHandlerWaitQueue is closed
bool push(AsyncRetryConnection conn)
{
std::lock_guard lock(mu);
if unlikely (is_closed)
return false;
wait_retry_queue.push(conn);
return true;
}

AsyncRetryConnection pop()
Expand All @@ -99,9 +103,17 @@ class AsyncRequestHandlerWaitQueue
return ret_conn;
}

// AsyncRequestHandlerWaitQueue should not receive any tag after close.
void close()
{
std::lock_guard lock(mu);
is_closed = true;
}

private:
std::mutex mu;
std::queue<AsyncRetryConnection> wait_retry_queue;
bool is_closed = false;
};

using AsyncRequestHandlerWaitQueuePtr = std::shared_ptr<AsyncRequestHandlerWaitQueue>;
@@ -148,6 +160,9 @@ class GRPCReceiveQueue
bool cancelWith(const String & reason)
{
auto ret = recv_queue->cancelWith(reason);
// We should close conn_wait_queue in advance in case newer tag is pushed after handleRemainingTags()
conn_wait_queue->close();

if (ret)
handleRemainingTags();
return ret;
@@ -161,6 +176,9 @@
bool finish()
{
auto ret = recv_queue->finish();
// We should close conn_wait_queue in advance in case newer tag is pushed after handleRemainingTags()
conn_wait_queue->close();

if (ret)
handleRemainingTags();
return ret;
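The is_closed flag turns the wait queue into a close-aware channel: once cancelWith() or finish() has closed it, any later push from an async handler fails, so the handler can close its connection instead of leaving a dangling tag behind. A minimal sketch of that pattern, with int standing in for AsyncRetryConnection:

#include <mutex>
#include <optional>
#include <queue>

class ClosableQueue
{
public:
    // Returns false once the queue has been closed, mirroring the new push() contract.
    bool push(int tag)
    {
        std::lock_guard lock(mu);
        if (is_closed)
            return false;
        wait_queue.push(tag);
        return true;
    }

    std::optional<int> pop()
    {
        std::lock_guard lock(mu);
        if (wait_queue.empty())
            return std::nullopt;
        int tag = wait_queue.front();
        wait_queue.pop();
        return tag;
    }

    // No tag is accepted after close().
    void close()
    {
        std::lock_guard lock(mu);
        is_closed = true;
    }

private:
    std::mutex mu;
    std::queue<int> wait_queue;
    bool is_closed = false;
};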
5 changes: 4 additions & 1 deletion dbms/src/Flash/Pipeline/Pipeline.cpp
@@ -267,13 +267,16 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request, const Settings
return is_supported;
case tipb::ExecType::TypeJoin:
// TODO support spill.
is_supported = (settings.max_bytes_before_external_join == 0);
// If force_enable_pipeline is true, it will return true, even if the join does not actually support spill.
is_supported = (settings.max_bytes_before_external_join == 0 || settings.force_enable_pipeline);
return is_supported;
default:
is_supported = false;
return false;
}
});
if (settings.force_enable_pipeline && !is_supported)
throw Exception("There is an unsupported operator, and an error is reported because the setting force_enable_pipeline is true.");
return is_supported;
}
} // namespace DB
