
[FLASH-668/477] Predicator push down #329

Merged
merged 13 commits on Nov 26, 2019
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
@@ -57,6 +57,7 @@ add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/DeltaMerge)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Index)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Filter)
add_headers_and_sources(dbms src/Storages/DeltaMerge/FilterParser)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Storages/Transaction)
2 changes: 2 additions & 0 deletions dbms/src/Common/ProfileEvents.cpp
@@ -197,6 +197,8 @@
M(DMSegmentMergeNS) \
M(DMFlushDeltaCache) \
M(DMFlushDeltaCacheNS) \
M(DMWriteChunksWriteRows) \
M(DMWriteChunksCopyRows) \


namespace ProfileEvents
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
@@ -244,6 +244,7 @@ struct Settings
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 1024, "Max rows of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_cache_limit_bytes, 16777216, "Max bytes of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaMerge Engine. By default '0' means no limit.")\
M(SettingBool, dm_enable_rough_set_filter, true, "Whether to parse the where expression as a Rough Set Index filter or not") \
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \
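As a side note, here is a minimal hedged sketch of how this new setting could gate predicate push-down at read time. The names `query_info` and `buildRoughSetFilterFromWhere` are illustrative assumptions, not part of this PR; `getSettingsRef()` is the standard ClickHouse accessor and `filter` is the new argument of `DeltaMergeStore::read` introduced below.

// Sketch only (not part of this diff): consult the per-query setting before
// pushing the WHERE expression down as a Rough Set Index filter.
DM::RSOperatorPtr rs_operator; // empty filter by default, same behaviour as before
if (db_context.getSettingsRef().dm_enable_rough_set_filter)
    rs_operator = buildRoughSetFilterFromWhere(query_info); // hypothetical helper
// `rs_operator` is then passed as the new `filter` argument of DeltaMergeStore::read.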
2 changes: 2 additions & 0 deletions dbms/src/Server/config.xml
@@ -316,6 +316,8 @@
<kvstore_path>/var/lib/clickhouse/kvstore</kvstore_path>
<regmap>/var/lib/clickhouse/regmap</regmap>
<pd_addr>http://127.0.0.1:13579</pd_addr>
<!-- Specify which storage engine to use: tmt or dm -->
<storage_engine>tmt</storage_engine>
<disable_bg_flush>false</disable_bg_flush>
</raft>

22 changes: 15 additions & 7 deletions dbms/src/Storages/DeltaMerge/Chunk.h
@@ -108,6 +108,14 @@ class Chunk
void serialize(WriteBuffer & buf) const;
static Chunk deserialize(ReadBuffer & buf);

String info() const
{
if (likely(!is_delete_range))
return "Chunk[" + DB::toString(handle_start) + "," + DB::toString(handle_end) + "]";
else
return "DeleteRange[" + DB::toString(handle_start) + "," + DB::toString(handle_end) + ")";
}

private:
Handle handle_start;
Handle handle_end;
@@ -123,15 +131,15 @@ using GenPageId = std::function<PageId()>;
Chunk createRefChunk(const Chunk & chunk, const GenPageId & gen_data_page_id, WriteBatch & wb);
Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id, WriteBatch & wb);

void serializeChunks(WriteBuffer & buf, //
                     Chunks::const_iterator begin,
                     Chunks::const_iterator end,
                     const Chunk * extra1 = nullptr,
                     const Chunk * extra2 = nullptr);
void serializeChunks(WriteBuffer & buf, //
                     Chunks::const_iterator begin,
                     Chunks::const_iterator end,
                     const Chunks & extr_chunks);

Chunks deserializeChunks(ReadBuffer & buf);

9 changes: 9 additions & 0 deletions dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
@@ -32,6 +32,15 @@ class ChunkBlockInputStream final : public IBlockInputStream
}
}

~ChunkBlockInputStream()
{
size_t num_skipped = 0;
for (const auto & is_skip : skip_chunks)
num_skipped += is_skip;

LOG_TRACE(&Logger::get("ChunkBlockInputStream"), String("Skip: ") << num_skipped << " / " << chunks.size() << " chunks");
}

String getName() const override { return "Chunk"; }
Block getHeader() const override { return toEmptyBlock(read_columns); }

16 changes: 1 addition & 15 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
@@ -77,7 +77,7 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream

bool initNextBlock()
{
raw_block = readNextBlock();
raw_block = ::DB::DM::readNextBlock(children.back());
if (!raw_block)
{
handle_col_data = nullptr;
@@ -94,20 +94,6 @@
}
}

/// This method guarantees that the returned valid block is not empty.
Block readNextBlock()
{
while (true)
{
Block res = children.back()->read();
if (!res)
return {};
if (!res.rows())
continue;
return res;
}
}

private:
UInt64 version_limit;
Block header;
22 changes: 21 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h
@@ -6,6 +6,7 @@
#include <Common/typeid_cast.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionsConversion.h>
@@ -123,7 +124,12 @@ inline PaddedPODArray<T> const * getColumnVectorDataPtr(const Block & block, siz
return toColumnVectorDataPtr<T>(block.getByPosition(pos).column);
}

inline void addColumnToBlock(Block & block, ColId col_id, const String & col_name, const DataTypePtr & col_type, const ColumnPtr & col, const Field & default_value = Field())
inline void addColumnToBlock(Block & block,
ColId col_id,
const String & col_name,
const DataTypePtr & col_type,
const ColumnPtr & col,
const Field & default_value = Field())
{
ColumnWithTypeAndName column(col, col_type, col_name, col_id, default_value);
block.insert(std::move(column));
@@ -137,6 +143,20 @@ inline Block toEmptyBlock(const ColumnDefines & columns)
return block;
}

/// This method guarantees that the returned block, if valid, is never empty.
inline Block readNextBlock(const BlockInputStreamPtr & in)
{
while (true)
{
Block res = in->read();
if (!res)
return Block{};
if (!res.rows())
continue;
return res;
}
}

inline void convertColumn(Block & block, size_t pos, const DataTypePtr & to_type, const Context & context)
{
const IDataType * to_type_ptr = to_type.get();
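As a usage note, a small hedged sketch of how this shared helper is typically driven; the `stream` variable and the loop body are illustrative only, not taken from this PR:

// Sketch only: drain a stream with the helper. Empty blocks are skipped inside
// readNextBlock, so the loop body only ever sees blocks with at least one row.
for (Block block = ::DB::DM::readNextBlock(stream); block; block = ::DB::DM::readNextBlock(stream))
{
    // process `block` here
}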
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -323,6 +323,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
const HandleRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size)
{
SegmentReadTasks tasks;
@@ -417,7 +418,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
task.read_snapshot,
*storage_snapshot,
task.ranges,
{},
filter,
max_version,
std::max(expected_block_size, STABLE_CHUNK_ROWS));
};
@@ -995,7 +996,7 @@ inline void setColumnDefineDefaultValue(const AlterCommand & command, ColumnDefi
time_t time = 0;
ReadBufferFromMemory buf(date.data(), date.size());
readDateTimeText(time, buf);
return toField(time);
return toField((Int64)time);
}
case TypeIndex::Decimal32:
{
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -189,6 +189,7 @@ class DeltaMergeStore : private boost::noncopyable
const HandleRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size = STABLE_CHUNK_ROWS);

/// Force flush all data to disk.
124 changes: 117 additions & 7 deletions dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
@@ -14,6 +14,8 @@ namespace ProfileEvents
{
extern const Event DMFlushDeltaCache;
extern const Event DMFlushDeltaCacheNS;
extern const Event DMWriteChunksWriteRows;
extern const Event DMWriteChunksCopyRows;
} // namespace ProfileEvents

namespace DB
@@ -265,19 +267,127 @@ DiskValueSpacePtr DiskValueSpace::applyAppendTask(const OpContext & context, con
return {};
}

Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream, WriteBatch & wb)
namespace
{
size_t findCutOffsetInNextBlock(const Block & cur_block, const Block & next_block, const String & pk_column_name)
{
assert(cur_block);
if (!next_block)
return 0;

auto cur_col = cur_block.getByName(pk_column_name).column;
const Int64 last_curr_pk = cur_col->getInt(cur_col->size() - 1);
auto next_col = next_block.getByName(pk_column_name).column;
size_t cut_offset = 0;
for (/* */; cut_offset < next_col->size(); ++cut_offset)
{
const Int64 next_pk = next_col->getInt(cut_offset);
if (next_pk != last_curr_pk)
{
if constexpr (DM_RUN_CHECK)
{
if (unlikely(next_pk < last_curr_pk))
throw Exception("InputStream is not sorted, pk in next block is smaller than current block: " + toString(next_pk)
+ " < " + toString(last_curr_pk),
ErrorCodes::LOGICAL_ERROR);
}
break;
}
}
return cut_offset;
}
} // namespace

Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb)
{
const String & pk_column_name = context.dm_context.handle_column.name;
if constexpr (DM_RUN_CHECK)
{
// Sanity check for existence of pk column
assert(EXTRA_HANDLE_COLUMN_TYPE->equals(*DataTypeFactory::instance().get("Int64")));
Block header = sorted_input_stream->getHeader();
if (!header.has(pk_column_name))
{
throw Exception("Try to write block to Chunk without pk column", ErrorCodes::LOGICAL_ERROR);
}
}

// TODO: investigate which way is better for scan: written by chunks vs written by columns.
Chunks chunks;
Block cur_block = ::DB::DM::readNextBlock(sorted_input_stream);
Block next_block;
while (true)
{
Block block = input_stream->read();
if (!block)
if (!cur_block)
break;
if (!block.rows())
continue;
Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, block);
chunks.push_back(std::move(chunk));

next_block = ::DB::DM::readNextBlock(sorted_input_stream);

const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_name);
if (cut_offset != 0)
{
const size_t next_block_nrows = next_block.rows();
for (size_t col_idx = 0; col_idx != cur_block.columns(); ++col_idx)
{
auto & cur_col_with_name = cur_block.getByPosition(col_idx);
auto & next_col_with_name = next_block.getByPosition(col_idx);
auto * cur_col_raw = const_cast<IColumn *>(cur_col_with_name.column.get());
cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset);
if (cut_offset != next_block_nrows)
{
// TODO: we can track the valid range instead of copying data.
size_t nrows_to_copy = next_block_nrows - cut_offset;
ProfileEvents::increment(ProfileEvents::DMWriteChunksCopyRows, nrows_to_copy);
// Pop front `cut_offset` elems from `next_col_with_name`
assert(next_block_nrows == next_col_with_name.column->size());
MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty();
cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, nrows_to_copy);
next_col_with_name.column = cutted_next_column->getPtr();
}
}
if (cut_offset != next_block_nrows)
{
// We merged some rows from `next_block` into `cur_block`; write `cur_block` out as a chunk.
Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
chunks.emplace_back(std::move(chunk));
cur_block = next_block;
}
// else we merge all rows from `next_block` to `cur_block`, continue to check if we should merge more blocks.
}
else
{
// There is no pk overlap between `cur_block` and `next_block`, just write `cur_block`.
Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
chunks.emplace_back(std::move(chunk));
cur_block = next_block;
}
}
if constexpr (DM_RUN_CHECK)
{
// Sanity check
if (chunks.size() > 1)
{
for (size_t i = 1; i < chunks.size(); ++i)
{
const Chunk & prev = chunks[i - 1];
const Chunk & curr = chunks[i];
if (prev.isDeleteRange() || curr.isDeleteRange())
{
throw Exception("Unexpected DeleteRange in stable inputstream. prev:" + prev.info() + " curr: " + curr.info(),
ErrorCodes::LOGICAL_ERROR);
}

const HandlePair prev_handle = prev.getHandleFirstLast();
const HandlePair curr_handle = curr.getHandleFirstLast();
// pk should be increasing and no overlap between chunks
if (prev_handle.second >= curr_handle.first)
{
throw Exception("Overlap chunks between " + prev.info() + " and " + curr.info(), ErrorCodes::LOGICAL_ERROR);
}
}
}
}
return chunks;
}
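To make the cut-offset logic above easier to follow in isolation, here is a self-contained, hedged sketch of the same idea on plain vectors. Names and types are simplified; this is an illustration, not the code in the diff:

#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

// Count how many leading rows of `next_pk` share the last pk of `cur_pk`, i.e. how many
// rows must be moved into the current chunk so that a single pk never spans two chunks.
static size_t findCutOffset(const std::vector<int64_t> & cur_pk, const std::vector<int64_t> & next_pk)
{
    assert(!cur_pk.empty());
    if (next_pk.empty())
        return 0;
    const int64_t last = cur_pk.back();
    size_t offset = 0;
    while (offset < next_pk.size() && next_pk[offset] == last)
        ++offset;
    return offset;
}

int main()
{
    // pk 7 straddles the two blocks, so the first 2 rows of `next` belong with `cur`.
    std::vector<int64_t> cur  = {1, 3, 7};
    std::vector<int64_t> next = {7, 7, 9};
    std::cout << findCutOffset(cur, next) << std::endl; // prints 2
    return 0;
}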
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DiskValueSpace.h
@@ -148,9 +148,9 @@ class DiskValueSpace
AppendTaskPtr createAppendTask(const OpContext & context, WriteBatches & wbs, const BlockOrDelete & update) const;
DiskValueSpacePtr applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & update);

/// Write the blocks from input_stream into underlying storage, the returned chunks can be added to
/// Write the blocks from sorted_input_stream into underlying storage, the returned chunks can be added to
/// specified value space instance by #setChunks or #appendChunkWithCache later.
static Chunks writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream, WriteBatch & wb);
static Chunks writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb);

static Chunk writeDelete(const OpContext & context, const HandleRange & delete_range);

@@ -186,6 +186,7 @@

DiskValueSpacePtr tryFlushCache(const OpContext & context, WriteBatch & remove_data_wb, bool force = false);

// TODO: getInputStream can be removed
ChunkBlockInputStreamPtr getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const;

MutableColumnMap cloneCache();
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
@@ -33,7 +33,7 @@ RSOperatorPtr createNotEqual(const Attr & attr, const Field & value)
RSOperatorPtr createNotIn(const Attr & attr, const Fields & values) { return std::make_shared<NotIn>(attr, values); }
RSOperatorPtr createNotLike(const Attr & attr, const Field & value) { return std::make_shared<NotLike>(attr, value); }
RSOperatorPtr createOr(const RSOperators & children) { return std::make_shared<Or>(children); }
RSOperatorPtr createUnsupported(const String & content, bool is_not) { return std::make_shared<Unsupported>(content, is_not); }
RSOperatorPtr createUnsupported(const String & content, const String & reason, bool is_not) { return std::make_shared<Unsupported>(content, reason, is_not); }
// clang-format on
} // namespace DM
} // namespace DB
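For orientation, a hedged sketch of how these factory functions might be combined when a WHERE clause is only partially convertible; the `attr_a` value, the reason string, and the overall shape are assumptions for illustration, not taken from this PR:

// Sketch only: `a != 1 OR b LIKE '%x%'`, where the LIKE branch cannot be used as a
// rough filter and is therefore recorded as Unsupported together with a reason.
// `attr_a` is an Attr describing column `a` (construction details omitted).
DM::RSOperatorPtr filter = DM::createOr({
    DM::createNotEqual(attr_a, Field(Int64(1))),
    DM::createUnsupported("b LIKE '%x%'", "LIKE is not supported by rough set filter", /*is_not=*/false),
});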