Skip to content

Commit

Permalink
some minor refine (pingcap#4)
Browse files Browse the repository at this point in the history
* minor refine

Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>

* more

Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>

---------

Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
  • Loading branch information
windtalker authored Mar 3, 2023
1 parent 3c8e32b commit e2e254e
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 20 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void HashJoinProbeBlockInputStream::cancel(bool kill)
/// 2. the query is executed normally, and one of the data streams has read an empty block, then the data stream and all its children
/// will call `cancel(false)`, in this case, there are two sub-cases
/// a. the data stream read an empty block because of EOF, then it means there must be no threads waiting in Join, so canceling the join is safe
/// b. the data stream read an empty block because of early exit of some executor(like topN and limit), in this case, just wake the waiting
/// b. the data stream read an empty block because of early exit of some executor(like limit), in this case, just wake the waiting
/// threads is not 100% safe because if the probe thread is woken up when build is not finished yet, it may produce wrong results, for now
/// it is safe because when any of the data streams reads an empty block because of early exit, the execution framework ensures that no further
/// data will be used.
Expand Down
26 changes: 15 additions & 11 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2230,12 +2230,14 @@ void Join::finishOneBuild()
--active_build_concurrency;
if (active_build_concurrency == 0)
{
if (isEnableSpill())
{
/// don't need to add any locks here because at this time
/// all the related threads are waiting in `waitUntilAllBuildFinished()`
/// todo maybe we can spill probe partition in its own thread
trySpillBuildPartitions(true);
tryMarkBuildSpillFinish();
std::unique_lock p_lock(partitions_lock);
if (hasPartitionSpilled())
{
trySpillBuildPartitions(true);
tryMarkBuildSpillFinish();
}
}
build_cv.notify_all();
}
Expand Down Expand Up @@ -2263,13 +2265,15 @@ void Join::finishOneProbe()
{
if (isEnableSpill())
{
/// no need to add lock since all the other related threads are either finished or waiting in `waitUntilAllProbeFinished`
/// todo maybe we can spill probe partition in its own thread
trySpillProbePartitions(true);
tryMarkProbeSpillFinish();
if (!needReturnNonJoinedData())
std::unique_lock p_lock(partitions_lock);
if (hasPartitionSpilled())
{
releaseAllPartitions();
trySpillProbePartitions(true);
tryMarkProbeSpillFinish();
if (!needReturnNonJoinedData())
{
releaseAllPartitions();
}
}
}
probe_cv.notify_all();
Expand Down
7 changes: 1 addition & 6 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,12 @@ class Join

/// Marks the join as canceled and wakes every thread waiting on `probe_cv` or `build_cv`.
void cancel()
{
// NOTE(review): `is_canceled = true;` appears both before and after the lock in this view —
// presumably the removed and the added line of the diff rendered together; confirm which
// placement is intended.
is_canceled = true;
// Hold `build_probe_mutex` while notifying both condition variables.
std::unique_lock lk(build_probe_mutex);
is_canceled = true;
probe_cv.notify_all();
build_cv.notify_all();
}

/// Returns the current value of the `is_canceled` flag.
bool isCanceled()
{
return is_canceled;
}

void finishOneBuild();
void waitUntilAllBuildFinished() const;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ struct Settings
M(SettingUInt64, manual_compact_pool_size, 1, "The number of worker threads to handle manual compact requests.") \
M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \
M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") \
M(SettingUInt64, max_bytes_before_external_join, 0, "max bytes used by join, 0 as the default value, 0 means no limit") \
M(SettingInt64, join_restore_concurrency, -1, "join restore concurrency, negative value means restore join serially, 0 means TiFlash choose restore concurrency automatically, -1 as the default value") \
M(SettingUInt64, max_bytes_before_external_join, 0, "max bytes used by join before spill, 0 as the default value, 0 means no limit") \
M(SettingInt64, join_restore_concurrency, 0, "join restore concurrency, negative value means restore join serially, 0 means TiFlash choose restore concurrency automatically, 0 as the default value") \
M(SettingUInt64, max_cached_data_bytes_in_spiller, 1024ULL * 1024 * 100, "Max cached data bytes in spiller before spilling, 100MB as the default value, 0 means no limit") \
M(SettingUInt64, max_spilled_rows_per_file, 200000, "Max spilled data rows per spill file, 200000 as the default value, 0 means no limit.") \
M(SettingUInt64, max_spilled_bytes_per_file, 0, "Max spilled data bytes per spill file, 0 as the default value, 0 means no limit.") \
Expand Down

0 comments on commit e2e254e

Please sign in to comment.