Commit
merge master to join spill branch (#1)
* Refine `Spiller` to support append write (pingcap#6862)

ref pingcap#6528

* fix build

Signed-off-by: xufei <xufei@pingcap.com>

* fix

Signed-off-by: xufei <xufei@pingcap.com>

* fix error

Signed-off-by: xufei <xufei@pingcap.com>

---------

Signed-off-by: xufei <xufei@pingcap.com>
windtalker authored Feb 22, 2023
1 parent f6ff85d commit 03f1874
Showing 30 changed files with 591 additions and 195 deletions.
24 changes: 24 additions & 0 deletions dbms/src/Columns/ColumnAggregateFunction.cpp
@@ -14,11 +14,15 @@

#include <AggregateFunctions/AggregateFunctionState.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsCommon.h>
#include <Common/HashTable/Hash.h>
#include <Common/SipHash.h>
#include <Common/typeid_cast.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeNullable.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteBufferFromArena.h>
#include <fmt/format.h>

@@ -244,6 +248,26 @@ size_t ColumnAggregateFunction::allocatedBytes() const
return res;
}

size_t ColumnAggregateFunction::estimateByteSizeForSpill() const
{
static const std::unordered_set<String> trivial_agg_func_name{"sum", "min", "max", "count", "avg", "first_row", "any"};
if (trivial_agg_func_name.find(func->getName()) != trivial_agg_func_name.end())
{
size_t res = func->sizeOfData() * size();
/// For a trivial agg, we can estimate each element's size as `func->sizeOfData()`, and
/// if the result is a String, use `APPROX_STRING_SIZE` as the average size of the String
if (removeNullable(func->getReturnType())->isString())
res += size() * ColumnString::APPROX_STRING_SIZE;
return res;
}
else
{
/// For a non-trivial agg like uniqXXX/group_concat, the memory usage can't be estimated, so just return byteSize().
/// Note that byteSize() may highly overestimate the size of a column if it was produced in AggregatingBlockInputStream (it contains the size of other columns)
return byteSize();
}
}

MutableColumnPtr ColumnAggregateFunction::cloneEmpty() const
{
return create(func, Arenas(1, std::make_shared<Arena>()));
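For intuition, a minimal sketch of how the estimate above behaves for a trivial aggregate (the helper below is illustrative only, not part of this commit):

#include <cstddef>

size_t estimateTrivialAggSpillSize(size_t rows, size_t state_size, bool returns_string)
{
    /// each row contributes one fixed-size aggregate state
    size_t estimate = rows * state_size;
    /// String results add a flat per-row guess of ColumnString::APPROX_STRING_SIZE (64) bytes
    if (returns_string)
        estimate += rows * 64;
    /// e.g. 1000 rows of sum over UInt64: 1000 * 8 = 8000 bytes, with no String overhead
    return estimate;
}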
2 changes: 2 additions & 0 deletions dbms/src/Columns/ColumnAggregateFunction.h
@@ -181,6 +181,8 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega

size_t byteSize() const override;

size_t estimateByteSizeForSpill() const override;

size_t allocatedBytes() const override;

void insertRangeFrom(const IColumn & from, size_t start, size_t length) override;
5 changes: 0 additions & 5 deletions dbms/src/Columns/ColumnString.cpp
@@ -21,11 +21,6 @@
#include <common/memcpy.h>
#include <fmt/core.h>


/// Used in the `reserve` method, when the number of rows is known, but sizes of elements are not.
#define APPROX_STRING_SIZE 64


namespace DB
{
namespace ErrorCodes
1 change: 1 addition & 0 deletions dbms/src/Columns/ColumnString.h
@@ -33,6 +33,7 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
{
public:
using Chars_t = PaddedPODArray<UInt8>;
static const auto APPROX_STRING_SIZE = 64;

private:
friend class COWPtrHelper<IColumn, ColumnString>;
3 changes: 3 additions & 0 deletions dbms/src/Columns/IColumn.h
@@ -325,6 +325,9 @@ class IColumn : public COWPtr<IColumn>
/// Size of column data in memory (may be approximate) - for profiling. Zero, if could not be determined.
virtual size_t byteSize() const = 0;

/// Size of the column if it is spilled, the same as byteSize() except for ColumnAggregateFunction
virtual size_t estimateByteSizeForSpill() const { return byteSize(); }

/// Size of column data between [offset, offset+limit) in memory (may be approximate) - for profiling.
/// This method throws NOT_IMPLEMENTED exception if it is called with unimplemented subclass.
virtual size_t byteSize(size_t /*offset*/, size_t /*limit*/) const
5 changes: 2 additions & 3 deletions dbms/src/Common/FailPoint.cpp
@@ -14,15 +14,13 @@

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Poco/String.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <common/defines.h>
#include <common/logger_useful.h>

#include <boost/core/noncopyable.hpp>
#include <condition_variable>
#include <mutex>

namespace DB
{
@@ -69,7 +67,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_after_drop_segment) \
M(exception_between_schema_change_in_the_same_diff) \
M(force_ps_wal_compact) \
M(pause_before_full_gc_prepare)
M(pause_before_full_gc_prepare) \
M(exception_during_spill)

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
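A rough sketch of how the new exception_during_spill failpoint might be exercised in a test (the enable/disable helpers follow the pattern used for other failpoints in this repository; the handler and blocks below are placeholders):

FailPointHelper::enableFailPoint(FailPoints::exception_during_spill);
try
{
    spill_handler.spillBlocks(blocks); /// expected to throw while the failpoint is active
}
catch (...)
{
    /// on failure the handler marks itself invalid, so further spills on it are rejected
}
FailPointHelper::disableFailPoint(FailPoints::exception_during_spill);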
9 changes: 9 additions & 0 deletions dbms/src/Core/Block.cpp
@@ -282,6 +282,15 @@ size_t Block::bytes() const
return res;
}

size_t Block::estimateBytesForSpill() const
{
size_t res = 0;
for (const auto & elem : data)
res += elem.column->estimateByteSizeForSpill();

return res;
}

size_t Block::bytes(size_t offset, size_t limit) const
{
size_t res = 0;
2 changes: 2 additions & 0 deletions dbms/src/Core/Block.h
@@ -111,6 +111,8 @@ class Block
/// Approximate number of bytes in memory - for profiling and limits.
size_t bytes() const;

size_t estimateBytesForSpill() const;

/// Approximate number of bytes between [offset, offset+limit) in memory - for profiling and limits.
size_t bytes(size_t offset, size_t limit) const;

6 changes: 4 additions & 2 deletions dbms/src/Core/SpillConfig.cpp
@@ -27,11 +27,13 @@ bool needReplace(char c)
return std::isspace(c) || String::npos != forbidden_or_unusual_chars.find(c);
}
} // namespace
SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill_id_, size_t max_spilled_size_per_spill_, const FileProviderPtr & file_provider_)
SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill_id_, size_t max_cached_data_bytes_in_spiller_, size_t max_spilled_rows_per_file_, size_t max_spilled_bytes_per_file_, const FileProviderPtr & file_provider_)
: spill_dir(spill_dir_)
, spill_id(spill_id_)
, spill_id_as_file_name_prefix(spill_id)
, max_spilled_size_per_spill(max_spilled_size_per_spill_)
, max_cached_data_bytes_in_spiller(max_cached_data_bytes_in_spiller_)
, max_spilled_rows_per_file(max_spilled_rows_per_file_)
, max_spilled_bytes_per_file(max_spilled_bytes_per_file_)
, file_provider(file_provider_)
{
RUNTIME_CHECK_MSG(!spill_dir.empty(), "Spiller dir must be non-empty");
10 changes: 7 additions & 3 deletions dbms/src/Core/SpillConfig.h
@@ -18,18 +18,22 @@

namespace DB
{

class FileProvider;
using FileProviderPtr = std::shared_ptr<FileProvider>;

struct SpillConfig
{
public:
SpillConfig(const String & spill_dir_, const String & spill_id_, size_t max_spilled_size_per_spill_, const FileProviderPtr & file_provider_);
SpillConfig(const String & spill_dir_, const String & spill_id_, size_t max_cached_data_bytes_in_spiller_, size_t max_spilled_rows_per_file_, size_t max_spilled_bytes_per_file_, const FileProviderPtr & file_provider_);
String spill_dir;
String spill_id;
String spill_id_as_file_name_prefix;
size_t max_spilled_size_per_spill;
/// soft limit of the max cached data bytes in the spiller (used in Spiller::spillBlocksUsingBlockInputStream)
size_t max_cached_data_bytes_in_spiller;
/// soft limit of the max rows per spilled file
UInt64 max_spilled_rows_per_file;
/// soft limit of the max bytes per spilled file
UInt64 max_spilled_bytes_per_file;
FileProviderPtr file_provider;
};
} // namespace DB
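To make the relationship between the new limits concrete, a hypothetical construction (the values and the file_provider variable are illustrative, not taken from this diff):

/// cache up to ~100 MiB in the spiller before each spill round, and rotate to a new
/// file once a file holds roughly 200,000 rows or 256 MiB uncompressed; all three are
/// soft limits, and a per-file limit of 0 disables the corresponding check
DB::SpillConfig config(
    "/tmp/tiflash_spill", /// spill_dir
    "agg_spill", /// spill_id
    100ULL * 1024 * 1024, /// max_cached_data_bytes_in_spiller
    200000, /// max_spilled_rows_per_file
    256ULL * 1024 * 1024, /// max_spilled_bytes_per_file
    file_provider); /// FileProviderPtr, assumed to be available in the caller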
100 changes: 75 additions & 25 deletions dbms/src/Core/SpillHandler.cpp
@@ -12,19 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/Stopwatch.h>
#include <Core/SpillHandler.h>

namespace DB
{
namespace FailPoints
{
extern const char exception_during_spill[];
} // namespace FailPoints

SpillHandler::SpillWriter::SpillWriter(const FileProviderPtr & file_provider, const String & file_name, const Block & header, size_t spill_version)
: file_buf(file_provider, file_name, EncryptionPath(file_name, ""))
SpillHandler::SpillWriter::SpillWriter(const FileProviderPtr & file_provider, const String & file_name, bool append_write, const Block & header, size_t spill_version)
: file_buf(file_provider, file_name, EncryptionPath(file_name, ""), true, nullptr, DBMS_DEFAULT_BUFFER_SIZE, append_write ? O_APPEND | O_WRONLY : -1)
, compressed_buf(file_buf)
{
/// note this implicitly assumes that a SpillWriter will always write to a new file,
/// if we support append write, don't need to write the spill version again
writeVarUInt(spill_version, compressed_buf);
if (!append_write)
writeVarUInt(spill_version, compressed_buf);
out = std::make_unique<NativeBlockOutputStream>(compressed_buf, spill_version, header);
out->writePrefix();
}
@@ -44,66 +48,107 @@ void SpillHandler::SpillWriter::write(const Block & block)
out->write(block);
}

SpillHandler::SpillHandler(Spiller * spiller_, std::unique_ptr<SpilledFile> && spilled_file, size_t partition_id_)
SpillHandler::SpillHandler(Spiller * spiller_, size_t partition_id_)
: spiller(spiller_)
, partition_id(partition_id_)
, current_spilled_file_index(-1)
, writer(nullptr)
{}

std::pair<size_t, size_t> SpillHandler::setUpNextSpilledFile()
{
assert(writer == nullptr);
auto [spilled_file, append_write] = spiller->getOrCreateSpilledFile(partition_id);
if (append_write)
prev_spill_details.merge(spilled_file->getSpillDetails());
current_spill_file_name = spilled_file->path();
current_spilled_file_index = 0;
spilled_files.push_back(std::move(spilled_file));
current_spilled_file_index = spilled_files.size() - 1;
writer = std::make_unique<SpillWriter>(spiller->config.file_provider, current_spill_file_name, append_write, spiller->input_schema, spiller->spill_version);
return std::make_pair(spilled_files[current_spilled_file_index]->getSpillDetails().rows, spilled_files[current_spilled_file_index]->getSpillDetails().data_bytes_uncompressed);
}

bool SpillHandler::isSpilledFileFull(UInt64 spilled_rows, UInt64 spilled_bytes)
{
return (spiller->config.max_spilled_rows_per_file > 0 && spilled_rows >= spiller->config.max_spilled_rows_per_file) || (spiller->config.max_spilled_bytes_per_file > 0 && spilled_bytes >= spiller->config.max_spilled_bytes_per_file);
}

void SpillHandler::spillBlocks(const Blocks & blocks)
{
/// todo 1. set max_file_size and spill to new file if needed
/// 2. check the disk usage
/// todo check the disk usage
if (unlikely(blocks.empty()))
return;
RUNTIME_CHECK_MSG(current_spilled_file_index >= 0, "{}: spill after the spill handler meeting error or finished.", spiller->config.spill_id);
RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler meeting error or finished.", spiller->config.spill_id);
try
{
Stopwatch watch;
RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
auto block_size = blocks.size();
LOG_INFO(spiller->logger, "Spilling {} blocks data into temporary file {}", block_size, current_spill_file_name);

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_spill);

size_t total_rows = 0;
if (unlikely(writer == nullptr))
{
writer = std::make_unique<SpillWriter>(spiller->config.file_provider, current_spill_file_name, blocks[0].cloneEmpty(), spiller->spill_version);
}
size_t rows_in_file = 0;
size_t bytes_in_file = 0;
for (const auto & block : blocks)
{
total_rows += block.rows();
if (unlikely(!block || block.rows() == 0))
continue;
if (unlikely(writer == nullptr))
{
std::tie(rows_in_file, bytes_in_file) = setUpNextSpilledFile();
}
auto rows = block.rows();
total_rows += rows;
rows_in_file += rows;
bytes_in_file += block.estimateBytesForSpill();
writer->write(block);
if (spiller->enable_append_write && isSpilledFileFull(rows_in_file, bytes_in_file))
{
spilled_files[current_spilled_file_index]->updateSpillDetails(writer->finishWrite());
spilled_files[current_spilled_file_index]->markFull();
writer = nullptr;
}
}
double cost = watch.elapsedSeconds();
time_cost += cost;
LOG_INFO(spiller->logger, "Spilled {} rows from {} blocks into temporary file, time cost: {:.3f} sec.", total_rows, block_size, cost);
RUNTIME_CHECK_MSG(current_spilled_file_index >= 0, "{}: spill after the spill handler is finished.", spiller->config.spill_id);
RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
RUNTIME_CHECK_MSG(current_spilled_file_index != INVALID_CURRENT_SPILLED_FILE_INDEX, "{}: spill after the spill handler is finished.", spiller->config.spill_id);
RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
return;
}
catch (...)
{
/// mark the spill handler invalid
writer = nullptr;
spilled_files.clear();
current_spilled_file_index = -1;
current_spilled_file_index = INVALID_CURRENT_SPILLED_FILE_INDEX;
throw Exception(fmt::format("Failed to spill blocks to disk for file {}, error: {}", current_spill_file_name, getCurrentExceptionMessage(false, false)));
}
}

void SpillHandler::finish()
{
if (likely(writer != nullptr))
/// it is guaranteed that once current_spilled_file_index >= 0, at least one block is written to spilled_files[current_spilled_file_index]
if (likely(current_spilled_file_index >= 0))
{
auto spill_details = writer->finishWrite();
spilled_files[current_spilled_file_index]->updateSpillDetails(spill_details);
if (writer != nullptr)
{
spilled_files[current_spilled_file_index]->updateSpillDetails(writer->finishWrite());
auto current_spill_details = spilled_files[current_spilled_file_index]->getSpillDetails();
if (!spiller->enable_append_write || isSpilledFileFull(current_spill_details.rows, current_spill_details.data_bytes_uncompressed))
{
/// always mark the file as full if enable_append_write is false, since in that case all files are treated as full files
spilled_files[current_spilled_file_index]->markFull();
}
}

auto gen_spill_detail_info = [&]() {
SpillDetails details{0, 0, 0};
for (Int64 i = 0; i <= current_spilled_file_index; i++)
details.merge(spilled_files[i]->getSpillDetails());
details.subtract(prev_spill_details);
return fmt::format("Commit spilled data, details: spill {} rows in {:.3f} sec,"
" {:.3f} MiB uncompressed, {:.3f} MiB compressed, {:.3f} uncompressed bytes per row, {:.3f} compressed bytes per row, "
"compression rate: {:.3f} ({:.3f} rows/sec., {:.3f} MiB/sec. uncompressed, {:.3f} MiB/sec. compressed)",
@@ -121,11 +166,16 @@ void SpillHandler::finish()
LOG_DEBUG(spiller->logger, gen_spill_detail_info());
std::unique_lock lock(spiller->spilled_files[partition_id]->spilled_files_mutex);
for (auto & spilled_file : spilled_files)
spiller->spilled_files[partition_id]->spilled_files.push_back(std::move(spilled_file));
{
if (!spilled_file->isFull())
spiller->spilled_files[partition_id]->mutable_spilled_files.push_back(std::move(spilled_file));
else
spiller->spilled_files[partition_id]->immutable_spilled_files.push_back(std::move(spilled_file));
}
spilled_files.clear();
spiller->has_spilled_data = true;
current_spilled_file_index = -1;
RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
current_spilled_file_index = INVALID_CURRENT_SPILLED_FILE_INDEX;
RUNTIME_CHECK_MSG(spiller->isSpillFinished() == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
}
}

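A minimal sketch of the intended call pattern after this change (the createSpillHandler name and the block source are assumptions for illustration):

/// with append write enabled, the handler lazily opens or reuses a partition's file via
/// setUpNextSpilledFile(), rotates to a fresh file once the per-file soft limits are hit,
/// and finish() hands partially filled files back to the Spiller as mutable so that
/// later spills can append to them
auto handler = spiller.createSpillHandler(partition_id); /// name assumed
for (Blocks blocks = read_next_batch(); !blocks.empty(); blocks = read_next_batch())
    handler.spillBlocks(blocks);
handler.finish(); /// commits spill details and registers the files with the Spiller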
9 changes: 6 additions & 3 deletions dbms/src/Core/SpillHandler.h
@@ -22,7 +22,6 @@

namespace DB
{

class IBlockOutputStream;

/// SpillHandler is used to spill blocks, currently hidden behind `Spiller::spillBlocks`
@@ -32,15 +31,17 @@ class SpillHandler
class SpillHandler
{
public:
SpillHandler(Spiller * spiller_, std::unique_ptr<SpilledFile> && spilled_file, size_t partition_id_);
SpillHandler(Spiller * spiller_, size_t partition_id_);
void spillBlocks(const Blocks & blocks);
void finish();

private:
std::pair<size_t, size_t> setUpNextSpilledFile();
bool isSpilledFileFull(UInt64 spilled_rows, UInt64 spilled_bytes);
class SpillWriter
{
public:
SpillWriter(const FileProviderPtr & file_provider, const String & file_name, const Block & header, size_t spill_version);
SpillWriter(const FileProviderPtr & file_provider, const String & file_name, bool append_write, const Block & header, size_t spill_version);
SpillDetails finishWrite();
void write(const Block & block);

@@ -57,6 +58,8 @@
String current_spill_file_name;
std::unique_ptr<SpillWriter> writer;
double time_cost = 0;
SpillDetails prev_spill_details;
static const Int64 INVALID_CURRENT_SPILLED_FILE_INDEX = -10;
};

} // namespace DB