Skip to content

Commit

Permalink
Enhance tests
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Feb 6, 2024
1 parent 25c9100 commit 7e3b18c
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 113 deletions.
14 changes: 13 additions & 1 deletion dbms/src/DataTypes/IDataType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,26 @@ size_t IDataType::getSizeOfValueInMemory() const

/// Returns true if any element of `path` has type `Substream::NullMap`;
/// returns false otherwise, including when `path` is empty.
bool IDataType::isNullMap(const IDataType::SubstreamPath & path)
{
    // One pass over the path is sufficient. The previous text carried two
    // stacked loop headers, which formed a redundant nested scan with a
    // shadowed loop variable.
    for (const auto & elem : path)
    {
        if (elem.type == Substream::NullMap)
            return true;
    }
    return false;
}

bool IDataType::isArraySizes(const SubstreamPath & path)
{
for (const auto & elem : path)
{
if (elem.type == IDataType::Substream::ArraySizes)
{
return true;
}
}
return false;
}

String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
String nested_table_name = Nested::extractTableName(column_name);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataTypes/IDataType.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ class IDataType : private boost::noncopyable
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);

static bool isNullMap(const SubstreamPath & path);
static bool isArraySizes(const SubstreamPath & path);
};


Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@ class ColumnReadStream
const LoggerPtr & log,
const ReadLimiterPtr & read_limiter);

double avg_size_hint;
MarksInCompressedFilePtr marks;

size_t getOffsetInFile(size_t i) const { return (*marks)[i].offset_in_compressed_file; }

size_t getOffsetInDecompressedBlock(size_t i) const { return (*marks)[i].offset_in_decompressed_block; }

double avg_size_hint;
MarksInCompressedFilePtr marks;
std::unique_ptr<CompressedSeekableReaderBuffer> buf;

private:
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ size_t DMFile::colDataSize(ColId id, bool is_null_map, bool is_array_map)
}
else
{
RUNTIME_CHECK_MSG(!is_array_map, "Can not get array map size, col_id={}", id);
auto namebase = is_null_map ? getFileNameBase(id, {IDataType::Substream::NullMap}) : getFileNameBase(id);
return colDataSizeByName(namebase);
}
Expand Down
79 changes: 55 additions & 24 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Columns/ColumnsCommon.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/Stopwatch.h>
#include <Common/escapeForFileName.h>
Expand Down Expand Up @@ -540,31 +541,61 @@ void DMFileReader::readFromDisk(
bool force_seek)
{
const auto stream_name = DMFile::getFileNameBase(column_define.id);
if (auto iter = column_streams.find(stream_name); iter != column_streams.end())
{
auto & top_stream = iter->second;
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0;
auto data_type = dmfile->getColumnStat(column_define.id).type;
data_type->deserializeBinaryBulkWithMultipleStreams( //
*column,
[&](const IDataType::SubstreamPath & substream_path) {
const auto substream_name = DMFile::getFileNameBase(column_define.id, substream_path);
auto & sub_stream = column_streams.at(substream_name);
auto iter = column_streams.find(stream_name);
#ifndef NDEBUG
RUNTIME_CHECK_MSG(
iter != column_streams.end(),
"Can not find column_stream, column_id={} stream_name={}",
column_define.id,
stream_name);

// FIXME: remove
LOG_INFO(Logger::get(), "reading stream, column_id={} stream_name={}", column_define.id, stream_name);
#endif
auto & top_stream = iter->second;
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0;
auto data_type = dmfile->getColumnStat(column_define.id).type;
data_type->deserializeBinaryBulkWithMultipleStreams( //
*column,
[&](const IDataType::SubstreamPath & substream_path) {
const auto substream_name = DMFile::getFileNameBase(column_define.id, substream_path);
auto & sub_stream = column_streams.at(substream_name);
// FIXME: remove
LOG_INFO(
Logger::get(),
"reading substream, column_id={} stream_name={} substream_name={} should_seek={}",
column_define.id,
stream_name,
substream_name,
should_seek);

if (should_seek)
{
sub_stream->buf->seek(
sub_stream->getOffsetInFile(start_pack_id),
sub_stream->getOffsetInDecompressedBlock(start_pack_id));
}
return sub_stream->buf.get();
},
read_rows,
top_stream->avg_size_hint,
true,
{});
IDataType::updateAvgValueSizeHint(*column, top_stream->avg_size_hint);
}
if (should_seek)
{
LOG_INFO(
Logger::get(),
"reading substream, column_id={} substream_name={} seek_compress_off={} seek_decompress_off={}",
column_define.id,
substream_name,
sub_stream->getOffsetInFile(start_pack_id),
sub_stream->getOffsetInDecompressedBlock(start_pack_id));
sub_stream->buf->seek(
sub_stream->getOffsetInFile(start_pack_id),
sub_stream->getOffsetInDecompressedBlock(start_pack_id));
}
return sub_stream->buf.get();
},
read_rows,
top_stream->avg_size_hint,
true,
{});
// FIXME: remove
LOG_INFO(
Logger::get(),
"reading stream, column_id={}, stream_name={} size={}",
column_define.id,
stream_name,
column->size());
IDataType::updateAvgValueSizeHint(*column, top_stream->avg_size_hint);
}

void DMFileReader::readColumn(
Expand Down
108 changes: 44 additions & 64 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)
{
auto callback = [&](const IDataType::SubstreamPath & substream_path) {
const auto stream_name = DMFile::getFileNameBase(col_id, substream_path);
bool substream_do_index
= do_index && !IDataType::isNullMap(substream_path) && !IDataType::isArraySizes(substream_path);
auto stream = std::make_unique<Stream>(
dmfile,
stream_name,
Expand All @@ -117,8 +119,7 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)
options.max_compress_block_size,
file_provider,
write_limiter,
IDataType::isNullMap(substream_path) ? false : do_index // TODO: ignore array_size
);
substream_do_index);
column_streams.emplace(stream_name, std::move(stream));
};

Expand All @@ -128,6 +129,11 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)

void DMFileWriter::write(const Block & block, const BlockProperty & block_property)
{
#ifndef NDEBUG
// In the production environment, the number of rows is checked at the upper level
block.checkNumberOfRows();
#endif

is_empty_file = false;
DMFile::PackStat stat{};
stat.rows = block.rows();
Expand All @@ -151,6 +157,8 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper
}

dmfile->addPack(stat);
// FIXME: remove
LOG_INFO(Logger::get(), "write pack, rows={}", stat.rows);

auto & properties = dmfile->getPackProperties();
auto * property = properties.add_property();
Expand Down Expand Up @@ -263,14 +271,8 @@ void DMFileWriter::writeColumn(

void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
{
size_t bytes_written = 0;
size_t data_bytes = 0;
size_t mark_bytes = 0;
size_t nullmap_data_bytes = 0;
size_t nullmap_mark_bytes = 0;
size_t index_bytes = 0;
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;
// Update column's bytes in memory
auto & col_stat = dmfile->column_stats.at(col_id);

#ifndef NDEBUG
auto examine_buffer_size = [&](auto & buf, auto & fp) {
Expand All @@ -283,33 +285,13 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
}
};
#endif
auto is_nullmap_stream = [](const IDataType::SubstreamPath & substream) {
for (const auto & s : substream)
{
if (s.type == IDataType::Substream::NullMap)
{
return true;
}
}
return false;
};
auto is_array_sizes_stream = [](const IDataType::SubstreamPath & substream) {
for (const auto & s : substream)
{
if (s.type == IDataType::Substream::ArraySizes)
{
return true;
}
}
return false;
};

auto callback = [&](const IDataType::SubstreamPath & substream) {
const auto stream_name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(stream_name);

const bool is_null = is_nullmap_stream(substream);
const bool is_array = is_array_sizes_stream(substream);
const bool is_null = IDataType::isNullMap(substream);
const bool is_array = IDataType::isArraySizes(substream);

// FIXME: remove
LOG_INFO(Logger::get(), "stream_name={} is_null={} is_array={}", stream_name, is_null, is_array);
Expand All @@ -320,19 +302,19 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
stream->plain_file->next();
stream->plain_file->sync();

bytes_written += stream->plain_file->getMaterializedBytes();
col_stat.serialized_bytes += stream->plain_file->getMaterializedBytes();

if (is_null)
{
nullmap_data_bytes = stream->plain_file->getMaterializedBytes();
col_stat.nullmap_data_bytes = stream->plain_file->getMaterializedBytes();
}
else if (is_array)
{
array_sizes_bytes = stream->plain_file->getMaterializedBytes();
col_stat.array_sizes_bytes = stream->plain_file->getMaterializedBytes();
}
else
{
data_bytes = stream->plain_file->getMaterializedBytes();
col_stat.data_bytes = stream->plain_file->getMaterializedBytes();
}

#ifndef NDEBUG
Expand All @@ -353,11 +335,16 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)

stream->minmaxes->write(*type, *buffer);

index_bytes = buffer->getMaterializedBytes();
MergedSubFileInfo info{fname, merged_file.file_info.number, merged_file.file_info.size, index_bytes};
col_stat.index_bytes = buffer->getMaterializedBytes();

MergedSubFileInfo info{
fname,
merged_file.file_info.number,
merged_file.file_info.size,
col_stat.index_bytes};
dmfile->merged_sub_file_infos[fname] = info;

merged_file.file_info.size += index_bytes;
merged_file.file_info.size += col_stat.index_bytes;
buffer->next();
}

Expand Down Expand Up @@ -388,15 +375,15 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)

if (is_null)
{
nullmap_mark_bytes = mark_size;
col_stat.nullmap_mark_bytes = mark_size;
}
else if (is_array)
{
array_sizes_mark_bytes = mark_size;
col_stat.array_sizes_mark_bytes = mark_size;
}
else
{
mark_bytes = mark_size;
col_stat.mark_bytes = mark_size;
}
}
}
Expand All @@ -411,32 +398,35 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
examine_buffer_size(*stream->mark_file, *this->file_provider);
examine_buffer_size(*stream->plain_file, *this->file_provider);
#endif
// TODO: these two branches can be merged.
if (!dmfile->configuration)
{ // v1
bytes_written += stream->plain_file->getMaterializedBytes() + stream->mark_file->getMaterializedBytes();
col_stat.serialized_bytes
+= stream->plain_file->getMaterializedBytes() + stream->mark_file->getMaterializedBytes();
if (is_null)
{
nullmap_data_bytes = stream->plain_file->getMaterializedBytes();
nullmap_mark_bytes = stream->mark_file->getMaterializedBytes();
col_stat.nullmap_data_bytes = stream->plain_file->getMaterializedBytes();
col_stat.nullmap_mark_bytes = stream->mark_file->getMaterializedBytes();
}
else
{
data_bytes = stream->plain_file->getMaterializedBytes();
mark_bytes = stream->mark_file->getMaterializedBytes();
col_stat.data_bytes = stream->plain_file->getMaterializedBytes();
col_stat.mark_bytes = stream->mark_file->getMaterializedBytes();
}
}
else
{ // v2
bytes_written += stream->plain_file->getMaterializedBytes() + stream->mark_file->getMaterializedBytes();
col_stat.serialized_bytes
+= stream->plain_file->getMaterializedBytes() + stream->mark_file->getMaterializedBytes();
if (is_null)
{
nullmap_data_bytes = stream->plain_file->getMaterializedBytes();
nullmap_mark_bytes = stream->mark_file->getMaterializedBytes();
col_stat.nullmap_data_bytes = stream->plain_file->getMaterializedBytes();
col_stat.nullmap_mark_bytes = stream->mark_file->getMaterializedBytes();
}
else
{
data_bytes = stream->plain_file->getMaterializedBytes();
mark_bytes = stream->mark_file->getMaterializedBytes();
col_stat.data_bytes = stream->plain_file->getMaterializedBytes();
col_stat.mark_bytes = stream->mark_file->getMaterializedBytes();
}
}

Expand All @@ -458,8 +448,8 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
// This is ok because the index file in this case is tiny, and we already ignore other small files like meta and pack stat file.
// The motivation to do this is to show a zero `stable_size_on_disk` for empty segments,
// and we cannot change the index file format for empty dmfile because of backward compatibility.
index_bytes = buf->getMaterializedBytes();
bytes_written += is_empty_file ? 0 : index_bytes;
col_stat.index_bytes = buf->getMaterializedBytes();
col_stat.serialized_bytes += is_empty_file ? 0 : col_stat.index_bytes;
#ifndef NDEBUG
if (dmfile->configuration)
{
Expand All @@ -471,16 +461,6 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
};
type->enumerateStreams(callback, {});

// Update column's bytes in disk
auto & col_stat = dmfile->column_stats.at(col_id);
col_stat.serialized_bytes = bytes_written;
col_stat.data_bytes = data_bytes;
col_stat.mark_bytes = mark_bytes;
col_stat.nullmap_data_bytes = nullmap_data_bytes;
col_stat.index_bytes = index_bytes;
col_stat.nullmap_mark_bytes = nullmap_mark_bytes;
col_stat.array_sizes_bytes = array_sizes_bytes;
col_stat.array_sizes_mark_bytes = array_sizes_mark_bytes;
// FIXME: remove
LOG_INFO(Logger::get(), "col_id={} stat={}", col_id, col_stat.toProto().ShortDebugString());
}
Expand Down
Loading

0 comments on commit 7e3b18c

Please sign in to comment.