Aggregator use spiller (#6643)

ref #6528

windtalker authored Jan 17, 2023
1 parent e7d7cea commit d477e2c
Showing 42 changed files with 511 additions and 407 deletions.
11 changes: 11 additions & 0 deletions dbms/src/Core/Block.cpp
@@ -555,6 +555,17 @@ Block mergeBlocks(Blocks && blocks)
return first_block.cloneWithColumns(std::move(dst_columns));
}

Block popBlocksListFront(BlocksList & blocks)
{
if (!blocks.empty())
{
Block out_block = blocks.front();
blocks.pop_front();
return out_block;
}
return {};
}

bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
return checkBlockStructure<bool>(lhs, rhs, {});
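
For context, a minimal sketch of how the new helper can be consumed (the drain function and the commented-out consumer are illustrative, not part of this commit; it relies on Block's boolean conversion being false for the empty block returned once the list is exhausted):

    #include <Core/Block.h>

    using namespace DB;

    void drain(BlocksList & pending)
    {
        // popBlocksListFront pops and returns the front block, or an empty
        // Block when the list is empty, which ends the loop.
        while (Block block = popBlocksListFront(pending))
        {
            // process(block); // hypothetical consumer
        }
    }
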
2 changes: 2 additions & 0 deletions dbms/src/Core/Block.h
@@ -161,6 +161,8 @@ using BucketBlocksListMap = std::map<Int32, BlocksList>;

Block mergeBlocks(Blocks && blocks);

Block popBlocksListFront(BlocksList & blocks);

/// Compare number of columns, data types, column types, column names, and values of constant columns.
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);

3 changes: 3 additions & 0 deletions dbms/src/Core/SpillConfig.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Core/SpillConfig.h>
#include <Encryption/FileProvider.h>
#include <Poco/Path.h>
@@ -33,6 +34,8 @@ SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill
, max_spilled_size_per_spill(max_spilled_size_per_spill_)
, file_provider(file_provider_)
{
RUNTIME_CHECK_MSG(!spill_dir.empty(), "Spiller dir must be non-empty");
RUNTIME_CHECK_MSG(!spill_id.empty(), "Spiller id must be non-empty");
if (spill_dir.at(spill_dir.size() - 1) != Poco::Path::separator())
{
spill_dir += Poco::Path::separator();
61 changes: 55 additions & 6 deletions dbms/src/Core/SpillHandler.cpp
@@ -12,11 +12,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Stopwatch.h>
#include <Core/SpillHandler.h>

namespace DB
{

SpillHandler::SpillWriter::SpillWriter(const FileProviderPtr & file_provider, const String & file_name, const Block & header, size_t spill_version)
: file_buf(file_provider, file_name, EncryptionPath(file_name, ""))
, compressed_buf(file_buf)
{
/// note this implicitly assumes that a SpillWriter will always write to a new file,
/// if we support append write, don't need to write the spill version again
writeVarUInt(spill_version, compressed_buf);
out = std::make_unique<NativeBlockOutputStream>(compressed_buf, spill_version, header);
out->writePrefix();
}

SpillDetails SpillHandler::SpillWriter::finishWrite()
{
out->flush();
compressed_buf.next();
file_buf.next();
out->writeSuffix();
return {written_rows, compressed_buf.count(), file_buf.count()};
}

void SpillHandler::SpillWriter::write(const Block & block)
{
written_rows += block.rows();
out->write(block);
}
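
The version varint written by this constructor is what the restore path has to strip before decoding blocks. A sketch of a matching reader, assuming plain file buffers and the classic NativeBlockInputStream(buffer, revision) constructor rather than the FileProvider-aware machinery SpilledFilesInputStream actually uses:

    #include <DataStreams/NativeBlockInputStream.h>
    #include <IO/CompressedReadBuffer.h>
    #include <IO/ReadBufferFromFile.h>
    #include <IO/VarInt.h>

    using namespace DB;

    Blocks readSpilledFile(const String & file_name)
    {
        ReadBufferFromFile file_buf(file_name);
        CompressedReadBuffer<> compressed_buf(file_buf);
        // The writer emits the spill version before the native stream, so the
        // reader consumes it first and feeds it to the input stream.
        UInt64 spill_version = 0;
        readVarUInt(spill_version, compressed_buf);
        NativeBlockInputStream in(compressed_buf, spill_version);
        Blocks blocks;
        in.readPrefix();
        while (Block block = in.read())
            blocks.push_back(block);
        in.readSuffix();
        return blocks;
    }
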

SpillHandler::SpillHandler(Spiller * spiller_, std::unique_ptr<SpilledFile> && spilled_file, size_t partition_id_)
: spiller(spiller_)
, partition_id(partition_id_)
@@ -35,22 +62,23 @@ void SpillHandler::spillBlocks(const Blocks & blocks)
RUNTIME_CHECK_MSG(current_spilled_file_index >= 0, "{}: spill after the spill handler meeting error or finished.", spiller->config.spill_id);
try
{
Stopwatch watch;
RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
LOG_INFO(spiller->logger, "Spilling {} blocks data into temporary file {}", blocks.size(), current_spill_file_name);
size_t spilled_data_size = 0;
if (unlikely(writer == nullptr))
{
-writer = std::make_unique<SpillWriter>(spiller->config.file_provider, current_spill_file_name, blocks[0].cloneEmpty());
-writer->out->writePrefix();
writer = std::make_unique<SpillWriter>(spiller->config.file_provider, current_spill_file_name, blocks[0].cloneEmpty(), spiller->spill_version);
}
for (const auto & block : blocks)
{
auto block_bytes_size = block.bytes();
-writer->out->write(block);
-spilled_files[current_spilled_file_index]->addSpilledDataSize(block_bytes_size);
writer->write(block);
spilled_data_size += block_bytes_size;
}
-LOG_INFO(spiller->logger, "Finish Spilling data into temporary file {}, spilled data size: {}", current_spill_file_name, spilled_data_size);
double cost = watch.elapsedSeconds();
time_cost += cost;
LOG_INFO(spiller->logger, "Finish Spilling data into temporary file {}, spilled data size: {}, time cost: {:.3f} sec.", current_spill_file_name, spilled_data_size, cost);
RUNTIME_CHECK_MSG(current_spilled_file_index >= 0, "{}: spill after the spill handler is finished.", spiller->config.spill_id);
RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
return;
@@ -69,7 +97,28 @@ void SpillHandler::finish()
{
if (likely(writer != nullptr))
{
-writer->out->writeSuffix();
auto spill_details = writer->finishWrite();
spilled_files[current_spilled_file_index]->updateSpillDetails(spill_details);

auto gen_spill_detail_info = [&]() {
SpillDetails details{0, 0, 0};
for (Int64 i = 0; i <= current_spilled_file_index; i++)
details.merge(spilled_files[i]->getSpillDetails());
return fmt::format("Spill {} rows in {:.3f} sec,"
" {:.3f} MiB uncompressed, {:.3f} MiB compressed, {:.3f} uncompressed bytes per row, {:.3f} compressed bytes per row, "
"compression rate: {:.3f} ({:.3f} rows/sec., {:.3f} MiB/sec. uncompressed, {:.3f} MiB/sec. compressed)",
details.rows,
time_cost,
(details.data_bytes_uncompressed / 1048576.0),
(details.data_bytes_compressed / 1048576.0),
(details.data_bytes_uncompressed / static_cast<double>(details.rows)),
(details.data_bytes_compressed / static_cast<double>(details.rows)),
(details.data_bytes_uncompressed / static_cast<double>(details.data_bytes_compressed)),
(details.rows / time_cost),
(details.data_bytes_uncompressed / time_cost / 1048576.0),
(details.data_bytes_compressed / time_cost / 1048576.0));
};
LOG_DEBUG(spiller->logger, gen_spill_detail_info());
std::unique_lock lock(spiller->spilled_files[partition_id]->spilled_files_mutex);
for (auto & spilled_file : spilled_files)
spiller->spilled_files[partition_id]->spilled_files.push_back(std::move(spilled_file));
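
To make the detail line above concrete with hypothetical figures: spilling 1,000,000 rows totalling 100 MiB uncompressed and 25 MiB compressed with a cumulative time_cost of 2 seconds would log 104.858 uncompressed and 26.214 compressed bytes per row, compression rate 4.000, and throughput of 500000.000 rows/sec., 50.000 MiB/sec. uncompressed, 12.500 MiB/sec. compressed.
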
17 changes: 10 additions & 7 deletions dbms/src/Core/SpillHandler.h
@@ -18,6 +18,7 @@
#include <DataStreams/NativeBlockOutputStream.h>
#include <Encryption/WriteBufferFromFileProvider.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/VarInt.h>

namespace DB
{
@@ -36,24 +37,26 @@ class SpillHandler
void finish();

private:
-struct SpillWriter
class SpillWriter
{
-SpillWriter(const FileProviderPtr & file_provider, const String & file_name, const Block & header)
-: file_buf(file_provider, file_name, EncryptionPath(file_name, ""))
-, compressed_buf(file_buf)
-, out(std::make_unique<NativeBlockOutputStream>(compressed_buf, 0, header))
-{
-}
public:
SpillWriter(const FileProviderPtr & file_provider, const String & file_name, const Block & header, size_t spill_version);
SpillDetails finishWrite();
void write(const Block & block);

private:
WriteBufferFromFileProvider file_buf;
CompressedWriteBuffer<> compressed_buf;
std::unique_ptr<IBlockOutputStream> out;
size_t written_rows = 0;
};
Spiller * spiller;
std::vector<std::unique_ptr<SpilledFile>> spilled_files;
size_t partition_id;
Int64 current_spilled_file_index;
String current_spill_file_name;
std::unique_ptr<SpillWriter> writer;
double time_cost = 0;
};

} // namespace DB
18 changes: 12 additions & 6 deletions dbms/src/Core/Spiller.cpp
@@ -24,6 +24,7 @@ namespace DB
{
SpilledFile::SpilledFile(const String & file_name_, const FileProviderPtr & file_provider_)
: Poco::File(file_name_)
, details(0, 0, 0)
, file_provider(file_provider_)
{}

@@ -46,12 +47,13 @@ SpilledFile::~SpilledFile()
}
}

-Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, size_t partition_num_, const Block & input_schema_, const LoggerPtr & logger_)
Spiller::Spiller(const SpillConfig & config_, bool is_input_sorted_, size_t partition_num_, const Block & input_schema_, const LoggerPtr & logger_, Int64 spill_version_)
: config(config_)
, is_input_sorted(is_input_sorted_)
, partition_num(partition_num_)
, input_schema(input_schema_)
, logger(logger_)
, spill_version(spill_version_)
{
for (size_t i = 0; i < partition_num; ++i)
spilled_files.push_back(std::make_unique<SpilledFiles>());
@@ -111,45 +113,49 @@ BlockInputStreams Spiller::restoreBlocks(size_t partition_id, size_t max_stream_
max_stream_size = spilled_files[partition_id]->spilled_files.size();
if (is_input_sorted && spilled_files[partition_id]->spilled_files.size() > max_stream_size)
LOG_WARNING(logger, "sorted spilled data restore does not take max_stream_size into account");
SpillDetails details{0, 0, 0};
BlockInputStreams ret;
if (is_input_sorted)
{
for (const auto & file : spilled_files[partition_id]->spilled_files)
{
RUNTIME_CHECK_MSG(file->exists(), "Spill file {} does not exists", file->path());
details.merge(file->getSpillDetails());
std::vector<String> files{file->path()};
-ret.push_back(std::make_shared<SpilledFilesInputStream>(files, input_schema, config.file_provider));
ret.push_back(std::make_shared<SpilledFilesInputStream>(files, input_schema, config.file_provider, spill_version));
}
}
else
{
size_t return_stream_num = std::min(max_stream_size, spilled_files[partition_id]->spilled_files.size());
std::vector<std::vector<String>> files(return_stream_num);
-// todo balance based on SpilledDataSize
// todo balance based on SpilledRows
for (size_t i = 0; i < spilled_files[partition_id]->spilled_files.size(); ++i)
{
const auto & file = spilled_files[partition_id]->spilled_files[i];
RUNTIME_CHECK_MSG(file->exists(), "Spill file {} does not exists", file->path());
details.merge(file->getSpillDetails());
files[i % return_stream_num].push_back(file->path());
}
for (size_t i = 0; i < return_stream_num; ++i)
{
if (likely(!files[i].empty()))
-ret.push_back(std::make_shared<SpilledFilesInputStream>(files[i], input_schema, config.file_provider));
ret.push_back(std::make_shared<SpilledFilesInputStream>(files[i], input_schema, config.file_provider, spill_version));
}
}
LOG_DEBUG(logger, "Will restore {} rows from file of size {:.3f} MiB compressed, {:.3f} MiB uncompressed.", details.rows, (details.data_bytes_compressed / 1048576.0), (details.data_bytes_uncompressed / 1048576.0));
if (ret.empty())
ret.push_back(std::make_shared<NullBlockInputStream>(input_schema));
return ret;
}

-size_t Spiller::spilledBlockDataSize(size_t partition_id)
size_t Spiller::spilledRows(size_t partition_id)
{
RUNTIME_CHECK_MSG(partition_id < partition_num, "{}: partition id {} exceeds partition num {}.", config.spill_id, partition_id, partition_num);
RUNTIME_CHECK_MSG(spill_finished, "{}: spilledBlockDataSize must be called when the spiller is finished.", config.spill_id);
size_t ret = 0;
for (auto & file : spilled_files[partition_id]->spilled_files)
-ret += file->getSpilledDataSize();
ret += file->getSpilledRows();
return ret;
}

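
The non-sorted restore path above shards spilled files across the returned streams with files[i % return_stream_num]; the todo notes it could instead balance by spilled row counts. A standalone sketch of that distribution rule (file_num, return_stream_num, and the printed labels are made up):

    #include <cstddef>
    #include <cstdio>
    #include <vector>

    int main()
    {
        // Round-robin sharding as in Spiller::restoreBlocks: file i is
        // assigned to stream i % return_stream_num.
        const std::size_t file_num = 5;
        const std::size_t return_stream_num = 2;
        std::vector<std::vector<std::size_t>> files(return_stream_num);
        for (std::size_t i = 0; i < file_num; ++i)
            files[i % return_stream_num].push_back(i);
        for (std::size_t s = 0; s < return_stream_num; ++s)
        {
            std::printf("stream %zu:", s);
            for (std::size_t f : files[s])
                std::printf(" file_%zu", f); // stream 0 gets files 0, 2, 4
            std::printf("\n");
        }
        return 0;
    }
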
30 changes: 25 additions & 5 deletions dbms/src/Core/Spiller.h
@@ -26,16 +26,35 @@ using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
class SpillHandler;

struct SpillDetails
{
size_t rows;
size_t data_bytes_uncompressed;
size_t data_bytes_compressed;
SpillDetails() = default;
SpillDetails(size_t rows_, size_t data_bytes_uncompressed_, size_t data_bytes_compressed_)
: rows(rows_)
, data_bytes_uncompressed(data_bytes_uncompressed_)
, data_bytes_compressed(data_bytes_compressed_)
{}
void merge(const SpillDetails & other)
{
rows += other.rows;
data_bytes_uncompressed += other.data_bytes_uncompressed;
data_bytes_compressed += other.data_bytes_compressed;
}
};
class SpilledFile : public Poco::File
{
public:
SpilledFile(const String & file_name, const FileProviderPtr & file_provider_);
~SpilledFile() override;
-void addSpilledDataSize(size_t added_size) { spilled_data_size += added_size; }
-size_t getSpilledDataSize() const { return spilled_data_size; }
size_t getSpilledRows() const { return details.rows; }
const SpillDetails & getSpillDetails() const { return details; }
void updateSpillDetails(const SpillDetails & other_details) { details.merge(other_details); }

private:
-size_t spilled_data_size = 0;
SpillDetails details;
FileProviderPtr file_provider;
};

@@ -48,13 +67,13 @@ struct SpilledFiles
class Spiller
{
public:
-Spiller(const SpillConfig & config, bool is_input_sorted, size_t partition_num, const Block & input_schema, const LoggerPtr & logger);
Spiller(const SpillConfig & config, bool is_input_sorted, size_t partition_num, const Block & input_schema, const LoggerPtr & logger, Int64 spill_version = 1);
void spillBlocks(const Blocks & blocks, size_t partition_id);
/// spill blocks by reading from BlockInputStream, this is more memory friendly compared to spillBlocks
void spillBlocksUsingBlockInputStream(IBlockInputStream & block_in, size_t partition_id, const std::function<bool()> & is_cancelled);
/// max_stream_size == 0 means the spiller choose the stream size automatically
BlockInputStreams restoreBlocks(size_t partition_id, size_t max_stream_size = 0);
-size_t spilledBlockDataSize(size_t partition_id);
size_t spilledRows(size_t partition_id);
void finishSpill() { spill_finished = true; };
bool hasSpilledData() { return has_spilled_data; };

@@ -73,6 +92,7 @@ class Spiller
std::atomic<bool> has_spilled_data{false};
static std::atomic<Int64> tmp_file_index;
std::vector<std::unique_ptr<SpilledFiles>> spilled_files;
Int64 spill_version = 1;
};

} // namespace DB
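
Putting the extended interface together, a usage sketch based only on the declarations above (the config, schema, logger, and blocks are taken as given; the comments flag the behavioural changes in this commit):

    #include <Core/Spiller.h>

    using namespace DB;

    void spillAndRestore(const SpillConfig & config, const Block & header, const LoggerPtr & logger, const Blocks & blocks)
    {
        // spill_version now tags the on-disk format; restore checks it.
        Spiller spiller(config, /*is_input_sorted=*/false, /*partition_num=*/1, header, logger, /*spill_version=*/1);
        spiller.spillBlocks(blocks, /*partition_id=*/0);
        spiller.finishSpill(); // further spills are rejected from here on
        if (spiller.hasSpilledData())
        {
            size_t rows = spiller.spilledRows(0); // row count, no longer a byte size
            // max_stream_size == 0 lets the spiller pick the stream count.
            BlockInputStreams streams = spiller.restoreBlocks(0, /*max_stream_size=*/0);
            (void)rows;
            (void)streams;
        }
    }
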
6 changes: 3 additions & 3 deletions dbms/src/Core/tests/gtest_spiller.cpp
@@ -96,7 +96,7 @@ String SpillerTest::spill_dir = DB::tests::TiFlashTestEnv::getTemporaryPath("spi
TEST_F(SpillerTest, SpilledFileAutoRemove)
try
{
-auto file_name = spill_dir + "spilled_file_auto_remove";
auto file_name = spill_config_ptr->spill_dir + "spilled_file_auto_remove";
{
SpilledFile test_file(file_name, spill_config_ptr->file_provider);
test_file.createFile();
@@ -166,11 +166,11 @@ try
{
auto blocks = generateBlocks(3);
for (const auto & block : blocks)
-ref += block.bytes();
ref += block.rows();
spiller.spillBlocks(blocks, 0);
}
spiller.finishSpill();
-GTEST_ASSERT_EQ(ref, spiller.spilledBlockDataSize(0));
GTEST_ASSERT_EQ(ref, spiller.spilledRows(0));
}
CATCH

23 changes: 5 additions & 18 deletions dbms/src/DataStreams/AggregatingBlockInputStream.cpp
@@ -35,9 +35,9 @@ Block AggregatingBlockInputStream::readImpl()
};
aggregator.setCancellationHook(hook);

-aggregator.execute(children.back(), *data_variants, file_provider);
aggregator.execute(children.back(), *data_variants);

-if (!aggregator.hasTemporaryFiles())
if (!aggregator.hasSpilledData())
{
ManyAggregatedDataVariants many_data{data_variants};
impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
@@ -52,23 +52,10 @@
{
/// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
if (!data_variants->empty())
-aggregator.writeToTemporaryFile(*data_variants, file_provider);
aggregator.spill(*data_variants);
}

-const auto & files = aggregator.getTemporaryFiles();
-BlockInputStreams input_streams;
-for (const auto & file : files.files)
-{
-temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path(), file_provider));
-input_streams.emplace_back(temporary_inputs.back()->block_in);
-}
-
-LOG_TRACE(log,
-"Will merge {} temporary files of size {:.2f} MiB compressed, {:.2f} MiB uncompressed.",
-files.files.size(),
-(files.sum_size_compressed / 1048576.0),
-(files.sum_size_uncompressed / 1048576.0));
-
aggregator.finishSpill();
BlockInputStreams input_streams = aggregator.restoreSpilledData();
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1, log->identifier());
}
}