diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index 286cc4cf37b..3baa1484adf 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -57,6 +57,7 @@ add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/DeltaMerge)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Index)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Filter)
+add_headers_and_sources(dbms src/Storages/DeltaMerge/FilterParser)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Storages/Transaction)
diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp
index 53edcb41565..d9642007deb 100644
--- a/dbms/src/Common/ProfileEvents.cpp
+++ b/dbms/src/Common/ProfileEvents.cpp
@@ -197,6 +197,8 @@
M(DMSegmentMergeNS) \
M(DMFlushDeltaCache) \
M(DMFlushDeltaCacheNS) \
+ M(DMWriteChunksWriteRows) \
+ M(DMWriteChunksCopyRows) \
namespace ProfileEvents
diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index 44fdf168c9e..2da14dfc435 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -244,6 +244,7 @@ struct Settings
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 1024, "Max rows of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_cache_limit_bytes, 16777216, "Max bytes of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaMerge Engine. By default '0' means no limit.")\
+ M(SettingBool, dm_enable_rough_set_filter, true, "whether to parse where expression as Rough Set Index filter or not") \
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \
diff --git a/dbms/src/Server/config.xml b/dbms/src/Server/config.xml
index 8507e1c412a..6f49c2da5bb 100644
--- a/dbms/src/Server/config.xml
+++ b/dbms/src/Server/config.xml
@@ -316,6 +316,8 @@
/var/lib/clickhouse/kvstore
/var/lib/clickhouse/regmap
http://127.0.0.1:13579
+
+ tmt
false
diff --git a/dbms/src/Storages/DeltaMerge/Chunk.h b/dbms/src/Storages/DeltaMerge/Chunk.h
index d0b4dda2b99..4d215b1b4fa 100644
--- a/dbms/src/Storages/DeltaMerge/Chunk.h
+++ b/dbms/src/Storages/DeltaMerge/Chunk.h
@@ -108,6 +108,14 @@ class Chunk
void serialize(WriteBuffer & buf) const;
static Chunk deserialize(ReadBuffer & buf);
+ String info() const
+ {
+ if (likely(!is_delete_range))
+ return "Chunk[" + DB::toString(handle_start) + "," + DB::toString(handle_end) + "]";
+ else
+ return "DeleteRange[" + DB::toString(handle_start) + "," + DB::toString(handle_end) + ")";
+ }
+
private:
Handle handle_start;
Handle handle_end;
@@ -123,15 +131,15 @@ using GenPageId = std::function;
Chunk createRefChunk(const Chunk & chunk, const GenPageId & gen_data_page_id, WriteBatch & wb);
Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id, WriteBatch & wb);
-void serializeChunks(WriteBuffer & buf,
- Chunks::const_iterator begin,
+void serializeChunks(WriteBuffer & buf,
+ Chunks::const_iterator begin,
Chunks::const_iterator end,
- const Chunk * extra1 = nullptr,
- const Chunk * extra2 = nullptr);
-void serializeChunks(WriteBuffer & buf, //
- Chunks::const_iterator begin,
+ const Chunk * extra1 = nullptr,
+ const Chunk * extra2 = nullptr);
+void serializeChunks(WriteBuffer & buf, //
+ Chunks::const_iterator begin,
Chunks::const_iterator end,
- const Chunks & extr_chunks);
+ const Chunks & extr_chunks);
Chunks deserializeChunks(ReadBuffer & buf);
diff --git a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
index 0f5f8635103..23e0eaa62cd 100644
--- a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
@@ -32,6 +32,15 @@ class ChunkBlockInputStream final : public IBlockInputStream
}
}
+ ~ChunkBlockInputStream()
+ {
+ size_t num_skipped = 0;
+ for (const auto & is_skip : skip_chunks)
+ num_skipped += is_skip;
+
+ LOG_TRACE(&Logger::get("ChunkBlockInputStream"), String("Skip: ") << num_skipped << " / " << chunks.size() << " chunks");
+ }
+
String getName() const override { return "Chunk"; }
Block getHeader() const override { return toEmptyBlock(read_columns); }
diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
index fe1ea157cdc..31de950323e 100644
--- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
@@ -77,7 +77,7 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream
bool initNextBlock()
{
- raw_block = readNextBlock();
+ raw_block = ::DB::DM::readNextBlock(children.back());
if (!raw_block)
{
handle_col_data = nullptr;
@@ -94,20 +94,6 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream
}
}
- /// This method guarantees that the returned valid block is not empty.
- Block readNextBlock()
- {
- while (true)
- {
- Block res = children.back()->read();
- if (!res)
- return {};
- if (!res.rows())
- continue;
- return res;
- }
- }
-
private:
UInt64 version_limit;
Block header;
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h
index 5371f6209f2..bb6beb415ec 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h
@@ -6,6 +6,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -123,7 +124,12 @@ inline PaddedPODArray const * getColumnVectorDataPtr(const Block & block, siz
return toColumnVectorDataPtr(block.getByPosition(pos).column);
}
-inline void addColumnToBlock(Block & block, ColId col_id, const String & col_name, const DataTypePtr & col_type, const ColumnPtr & col, const Field & default_value = Field())
+inline void addColumnToBlock(Block & block,
+ ColId col_id,
+ const String & col_name,
+ const DataTypePtr & col_type,
+ const ColumnPtr & col,
+ const Field & default_value = Field())
{
ColumnWithTypeAndName column(col, col_type, col_name, col_id, default_value);
block.insert(std::move(column));
@@ -137,6 +143,20 @@ inline Block toEmptyBlock(const ColumnDefines & columns)
return block;
}
+/// This method guarantees that the returned valid block is not empty.
+inline Block readNextBlock(const BlockInputStreamPtr & in)
+{
+ while (true)
+ {
+ Block res = in->read();
+ if (!res)
+ return Block{};
+ if (!res.rows())
+ continue;
+ return res;
+ }
+}
+
inline void convertColumn(Block & block, size_t pos, const DataTypePtr & to_type, const Context & context)
{
const IDataType * to_type_ptr = to_type.get();
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index 12ce6bd8d56..f88cffc1d3d 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -323,6 +323,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
const HandleRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
+ const RSOperatorPtr & filter,
size_t expected_block_size)
{
SegmentReadTasks tasks;
@@ -417,7 +418,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
task.read_snapshot,
*storage_snapshot,
task.ranges,
- {},
+ filter,
max_version,
std::max(expected_block_size, STABLE_CHUNK_ROWS));
};
@@ -995,7 +996,7 @@ inline void setColumnDefineDefaultValue(const AlterCommand & command, ColumnDefi
time_t time = 0;
ReadBufferFromMemory buf(date.data(), date.size());
readDateTimeText(time, buf);
- return toField(time);
+ return toField((Int64)time);
}
case TypeIndex::Decimal32:
{
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
index 043fde899c6..57a7b5d0019 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -189,6 +189,7 @@ class DeltaMergeStore : private boost::noncopyable
const HandleRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
+ const RSOperatorPtr & filter,
size_t expected_block_size = STABLE_CHUNK_ROWS);
/// Force flush all data to disk.
diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
index 716b3ec891b..0c361b51927 100644
--- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
+++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
@@ -14,6 +14,8 @@ namespace ProfileEvents
{
extern const Event DMFlushDeltaCache;
extern const Event DMFlushDeltaCacheNS;
+extern const Event DMWriteChunksWriteRows;
+extern const Event DMWriteChunksCopyRows;
} // namespace ProfileEvents
namespace DB
@@ -265,19 +267,127 @@ DiskValueSpacePtr DiskValueSpace::applyAppendTask(const OpContext & context, con
return {};
}
-Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream, WriteBatch & wb)
+namespace
{
+size_t findCutOffsetInNextBlock(const Block & cur_block, const Block & next_block, const String & pk_column_name)
+{
+ assert(cur_block);
+ if (!next_block)
+ return 0;
+
+ auto cur_col = cur_block.getByName(pk_column_name).column;
+ const Int64 last_curr_pk = cur_col->getInt(cur_col->size() - 1);
+ auto next_col = next_block.getByName(pk_column_name).column;
+ size_t cut_offset = 0;
+ for (/* */; cut_offset < next_col->size(); ++cut_offset)
+ {
+ const Int64 next_pk = next_col->getInt(cut_offset);
+ if (next_pk != last_curr_pk)
+ {
+ if constexpr (DM_RUN_CHECK)
+ {
+ if (unlikely(next_pk < last_curr_pk))
+ throw Exception("InputStream is not sorted, pk in next block is smaller than current block: " + toString(next_pk)
+ + " < " + toString(last_curr_pk),
+ ErrorCodes::LOGICAL_ERROR);
+ }
+ break;
+ }
+ }
+ return cut_offset;
+}
+} // namespace
+
+Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb)
+{
+ const String & pk_column_name = context.dm_context.handle_column.name;
+ if constexpr (DM_RUN_CHECK)
+ {
+ // Sanity check for existence of pk column
+ assert(EXTRA_HANDLE_COLUMN_TYPE->equals(*DataTypeFactory::instance().get("Int64")));
+ Block header = sorted_input_stream->getHeader();
+ if (!header.has(pk_column_name))
+ {
+ throw Exception("Try to write block to Chunk without pk column", ErrorCodes::LOGICAL_ERROR);
+ }
+ }
+
// TODO: investigate which way is better for scan: written by chunks vs written by columns.
Chunks chunks;
+ Block cur_block = ::DB::DM::readNextBlock(sorted_input_stream);
+ Block next_block;
while (true)
{
- Block block = input_stream->read();
- if (!block)
+ if (!cur_block)
break;
- if (!block.rows())
- continue;
- Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, block);
- chunks.push_back(std::move(chunk));
+
+ next_block = ::DB::DM::readNextBlock(sorted_input_stream);
+
+ const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_name);
+ if (cut_offset != 0)
+ {
+ const size_t next_block_nrows = next_block.rows();
+ for (size_t col_idx = 0; col_idx != cur_block.columns(); ++col_idx)
+ {
+ auto & cur_col_with_name = cur_block.getByPosition(col_idx);
+ auto & next_col_with_name = next_block.getByPosition(col_idx);
+ auto * cur_col_raw = const_cast(cur_col_with_name.column.get());
+ cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset);
+ if (cut_offset != next_block_nrows)
+ {
+ // TODO: we can track the valid range instead of copying data.
+ size_t nrows_to_copy = next_block_nrows - cut_offset;
+ ProfileEvents::increment(ProfileEvents::DMWriteChunksCopyRows, nrows_to_copy);
+ // Pop front `cut_offset` elems from `next_col_with_name`
+ assert(next_block_nrows == next_col_with_name.column->size());
+ MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty();
+ cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, nrows_to_copy);
+ next_col_with_name.column = cutted_next_column->getPtr();
+ }
+ }
+ if (cut_offset != next_block_nrows)
+ {
+ // We merge some rows to `cur_block`, make it as a chunk.
+ Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
+ ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
+ chunks.emplace_back(std::move(chunk));
+ cur_block = next_block;
+ }
+ // else we merge all rows from `next_block` to `cur_block`, continue to check if we should merge more blocks.
+ }
+ else
+ {
+ // There is no pk overlap between `cur_block` and `next_block`, just write `cur_block`.
+ Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
+ ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
+ chunks.emplace_back(std::move(chunk));
+ cur_block = next_block;
+ }
+ }
+ if constexpr (DM_RUN_CHECK)
+ {
+ // Sanity check
+ if (chunks.size() > 1)
+ {
+ for (size_t i = 1; i < chunks.size(); ++i)
+ {
+ const Chunk & prev = chunks[i - 1];
+ const Chunk & curr = chunks[i];
+ if (prev.isDeleteRange() || curr.isDeleteRange())
+ {
+ throw Exception("Unexpected DeleteRange in stable inputstream. prev:" + prev.info() + " curr: " + curr.info(),
+ ErrorCodes::LOGICAL_ERROR);
+ }
+
+ const HandlePair prev_handle = prev.getHandleFirstLast();
+ const HandlePair curr_handle = curr.getHandleFirstLast();
+ // pk should be increasing and no overlap between chunks
+ if (prev_handle.second >= curr_handle.first)
+ {
+ throw Exception("Overlap chunks between " + prev.info() + " and " + curr.info(), ErrorCodes::LOGICAL_ERROR);
+ }
+ }
+ }
}
return chunks;
}
diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h
index 2b83dfeaf68..770938de26c 100644
--- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h
+++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h
@@ -148,9 +148,9 @@ class DiskValueSpace
AppendTaskPtr createAppendTask(const OpContext & context, WriteBatches & wbs, const BlockOrDelete & update) const;
DiskValueSpacePtr applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & update);
- /// Write the blocks from input_stream into underlying storage, the returned chunks can be added to
+ /// Write the blocks from sorted_input_stream into underlying storage, the returned chunks can be added to
/// specified value space instance by #setChunks or #appendChunkWithCache later.
- static Chunks writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream, WriteBatch & wb);
+ static Chunks writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb);
static Chunk writeDelete(const OpContext & context, const HandleRange & delete_range);
@@ -186,6 +186,7 @@ class DiskValueSpace
DiskValueSpacePtr tryFlushCache(const OpContext & context, WriteBatch & remove_data_wb, bool force = false);
+ // TODO: getInputStream can be removed
ChunkBlockInputStreamPtr getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const;
MutableColumnMap cloneCache();
diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
index a9d8afcec36..38eabad922e 100644
--- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
+++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
@@ -33,7 +33,7 @@ RSOperatorPtr createNotEqual(const Attr & attr, const Field & value)
RSOperatorPtr createNotIn(const Attr & attr, const Fields & values) { return std::make_shared(attr, values); }
RSOperatorPtr createNotLike(const Attr & attr, const Field & value) { return std::make_shared(attr, value); }
RSOperatorPtr createOr(const RSOperators & children) { return std::make_shared(children); }
-RSOperatorPtr createUnsupported(const String & content, bool is_not) { return std::make_shared(content, is_not); }
+RSOperatorPtr createUnsupported(const String & content, const String & reason, bool is_not) { return std::make_shared(content, reason, is_not); }
// clang-format on
} // namespace DM
} // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h
index c465835d419..e2885c0e85f 100644
--- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h
+++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h
@@ -90,20 +90,25 @@ class LogicalOp : public RSOperator
return Some;
+// logical
+RSOperatorPtr createNot(const RSOperatorPtr & op);
+RSOperatorPtr createOr(const RSOperators & children);
RSOperatorPtr createAnd(const RSOperators & children);
+// compare
RSOperatorPtr createEqual(const Attr & attr, const Field & value);
+RSOperatorPtr createNotEqual(const Attr & attr, const Field & value);
RSOperatorPtr createGreater(const Attr & attr, const Field & value, int null_direction);
RSOperatorPtr createGreaterEqual(const Attr & attr, const Field & value, int null_direction);
-RSOperatorPtr createIn(const Attr & attr, const Fields & values);
RSOperatorPtr createLess(const Attr & attr, const Field & value, int null_direction);
RSOperatorPtr createLessEqual(const Attr & attr, const Field & value, int null_direction);
-RSOperatorPtr createLike(const Attr & attr, const Field & value);
-RSOperatorPtr createNot(const RSOperatorPtr & op);
-RSOperatorPtr createNotEqual(const Attr & attr, const Field & value);
+// set
+RSOperatorPtr createIn(const Attr & attr, const Fields & values);
RSOperatorPtr createNotIn(const Attr & attr, const Fields & values);
+// like
+RSOperatorPtr createLike(const Attr & attr, const Field & value);
RSOperatorPtr createNotLike(const Attr & attr, const Field & values);
-RSOperatorPtr createOr(const RSOperators & children);
-RSOperatorPtr createUnsupported(const String & content, bool is_not);
+// unsupported
+RSOperatorPtr createUnsupported(const String & content, const String & reason, bool is_not);
} // namespace DM
diff --git a/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h b/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h
index d51167e4ce9..c7fa324bb15 100644
--- a/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h
+++ b/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h
@@ -11,23 +11,25 @@ namespace DM
class Unsupported : public RSOperator
{
String content;
+ String reason;
bool is_not;
public:
- Unsupported(const String & content_) : content(content_), is_not(false) {}
- Unsupported(const String & content_, bool is_not_) : content(content_), is_not(is_not_) {}
+ Unsupported(const String & content_, const String & reason_) : Unsupported(content_, reason_, false) {}
+ Unsupported(const String & content_, const String & reason_, bool is_not_) : content(content_), reason(reason_), is_not(is_not_) {}
String name() override { return "unsupported"; }
String toString() override
{
return R"({"op":")" + name() + //
+ R"(","reason":")" + reason + //
R"(","content":")" + content + //
R"(","is_not":")" + DB::toString(is_not) + "\"}";
}
RSResult roughCheck(const RSCheckParam & /*param*/) override { return Some; }
- RSOperatorPtr applyNot() override { return createUnsupported(content, !is_not); };
+ RSOperatorPtr applyNot() override { return createUnsupported(content, reason, !is_not); };
};
} // namespace DM
diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h
new file mode 100644
index 00000000000..4881c251ccb
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include
+#include
+
+#include
+#include
+
+namespace DB
+{
+
+class ASTSelectQuery;
+
+struct DAGQueryInfo;
+
+namespace DM
+{
+
+class FilterParser
+{
+public:
+ /// From ast.
+ using AttrCreatorByColumnName = std::function;
+ static RSOperatorPtr parseSelectQuery(const ASTSelectQuery & query, AttrCreatorByColumnName && creator, Poco::Logger * log);
+
+public:
+ /// From dag.
+ using AttrCreatorByColumnID = std::function;
+ static RSOperatorPtr parseDAGQuery(const DAGQueryInfo & dag_info, AttrCreatorByColumnID && creator, Poco::Logger * log);
+
+    /// Some helper structures
+
+ enum RSFilterType
+ {
+ Unsupported = 0,
+
+ // logical
+ Not = 1,
+ Or,
+ And,
+ // compare
+ Equal,
+ NotEqual,
+ Greater,
+ GreaterEqual,
+ Less,
+ LessEuqal,
+
+ In,
+ NotIn,
+
+ Like,
+ NotLike,
+ };
+
+ static std::unordered_map scalar_func_rs_filter_map;
+};
+
+} // namespace DM
+} // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp
new file mode 100644
index 00000000000..449e92f17c5
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_ast.cpp
@@ -0,0 +1,179 @@
+#include
+
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int COP_BAD_DAG_REQUEST;
+} // namespace ErrorCodes
+
+namespace DM
+{
+namespace ast
+{
+
+String astToDebugString(const IAST * const ast)
+{
+ std::stringstream ss;
+ ast->dumpTree(ss);
+ return ss.str();
+}
+
+RSOperatorPtr
+parseASTCompareFunction(const ASTFunction * const func, const FilterParser::AttrCreatorByColumnName & creator, Poco::Logger * /*log*/)
+{
+ if (unlikely(func->arguments->children.size() != 2))
+ return createUnsupported(astToDebugString(func),
+ func->name + " with " + DB::toString(func->arguments->children.size()) + " children is not supported",
+ false);
+
+ /// Only support `column` `op` `constant` now.
+
+ Attr attr;
+ Field value;
+ UInt32 state = 0x0;
+ constexpr UInt32 state_has_column = 0x1;
+ constexpr UInt32 state_has_literal = 0x2;
+ constexpr UInt32 state_finish = state_has_column | state_has_literal;
+ for (auto & child : func->arguments->children)
+ {
+ if (auto * id = dynamic_cast(child.get()); id != nullptr && id->kind == ASTIdentifier::Column)
+ {
+ state |= state_has_column;
+ const String & col_name = id->name;
+ attr = creator(col_name);
+ }
+ else if (auto * liter = dynamic_cast(child.get()); liter != nullptr)
+ {
+ state |= state_has_literal;
+ value = liter->value;
+ }
+ }
+
+ // TODO: null_direction
+ if (unlikely(state != state_finish))
+ return createUnsupported(astToDebugString(func), func->name + " with state " + DB::toString(state) + " is not supported", false);
+ else if (func->name == "equals")
+ return createEqual(attr, value);
+ else if (func->name == "notEquals")
+ return createNotEqual(attr, value);
+ else if (func->name == "greater")
+ return createGreater(attr, value, -1);
+ else if (func->name == "greaterOrEquals")
+ return createGreaterEqual(attr, value, -1);
+ else if (func->name == "less")
+ return createLess(attr, value, -1);
+ else if (func->name == "lessOrEquals")
+ return createLessEqual(attr, value, -1);
+ return createUnsupported(astToDebugString(func), "Unknown compare func: " + func->name, false);
+}
+
+RSOperatorPtr parseASTFunction(const ASTFunction * const func, const FilterParser::AttrCreatorByColumnName & creator, Poco::Logger * log)
+{
+ assert(func != nullptr);
+ RSOperatorPtr op = EMPTY_FILTER;
+
+ if (func->name == "equals" || func->name == "notEquals" //
+ || func->name == "greater" || func->name == "greaterOrEquals" //
+ || func->name == "less" || func->name == "lessOrEquals")
+ {
+ op = parseASTCompareFunction(func, creator, log);
+ }
+ else if (func->name == "or" || func->name == "and")
+ {
+ RSOperators children;
+ for (const auto & child : func->arguments->children)
+ {
+ ASTFunction * sub_func = dynamic_cast(child.get());
+ if (sub_func != nullptr)
+ {
+ children.emplace_back(parseASTFunction(sub_func, creator, log));
+ }
+ else
+ {
+ children.emplace_back(createUnsupported(astToDebugString(func), "child of logical operator is not function", false));
+ }
+ }
+ if (func->name == "or")
+ op = createOr(children);
+ else
+ op = createAnd(children);
+ }
+ else if (func->name == "not")
+ {
+ if (unlikely(func->arguments->children.size() != 1))
+ op = createUnsupported(
+ astToDebugString(func), "logical not with " + DB::toString(func->arguments->children.size()) + " children", false);
+ else
+ {
+ if (ASTFunction * sub_func = dynamic_cast(func->arguments->children[0].get()); sub_func != nullptr)
+ op = createNot(parseASTFunction(sub_func, creator, log));
+ else
+ op = createUnsupported(astToDebugString(func), "child of logical not is not function", false);
+ }
+ }
+#if 0
+ else if (func->name == "in")
+ {
+ }
+ else if (func->name == "notIn")
+ {
+ }
+ else if (func->name == "like")
+ {
+ }
+ else if (func->name == "notLike")
+ {
+ }
+#endif
+ else
+ {
+ op = createUnsupported(astToDebugString(func), "Function " + func->name + " is not supported", false);
+ }
+
+ return op;
+}
+
+} // namespace ast
+
+RSOperatorPtr FilterParser::parseSelectQuery(const ASTSelectQuery & query, AttrCreatorByColumnName && creator, Poco::Logger * log)
+{
+ RSOperatorPtr op = EMPTY_FILTER;
+ if (!query.where_expression)
+ return op;
+
+ const ASTFunction * where = dynamic_cast(query.where_expression.get());
+ if (!where)
+ {
+ const String debug_string = ast::astToDebugString(query.where_expression.get());
+ LOG_WARNING(log, "Where expression is not ASTFunction, can not parse to rough set index. Expr: " << debug_string);
+ return op;
+ }
+
+ if (log->trace())
+ {
+ std::string expr_tree = ast::astToDebugString(where);
+ LOG_TRACE(log, " where expr: " << expr_tree);
+ }
+
+ op = ast::parseASTFunction(where, creator, log);
+
+ return op;
+}
+
+} // namespace DM
+
+} // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp
new file mode 100644
index 00000000000..abefa23e5d3
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser_dag.cpp
@@ -0,0 +1,793 @@
+#include
+
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int COP_BAD_DAG_REQUEST;
+} // namespace ErrorCodes
+
+namespace DM
+{
+
+namespace cop
+{
+
+ColumnID getColumnIDForColumnExpr(const tipb::Expr & expr, const std::vector & input_col_ids)
+{
+ assert(isColumnExpr(expr));
+ auto column_index = decodeDAGInt64(expr.val());
+ if (column_index < 0 || column_index >= static_cast(input_col_ids.size()))
+ {
+ throw Exception("Column index out of bound: " + DB::toString(column_index) + ", should in [0," + DB::toString(input_col_ids.size())
+ + ")",
+ ErrorCodes::COP_BAD_DAG_REQUEST);
+ }
+ return input_col_ids[column_index];
+}
+
+inline RSOperatorPtr parseTiCompareExpr( //
+ const tipb::Expr & expr,
+ const FilterParser::RSFilterType filter_type,
+ const std::vector & input_col_ids,
+ const FilterParser::AttrCreatorByColumnID & creator,
+ Poco::Logger * /* log */)
+{
+ if (unlikely(expr.children_size() != 2))
+ return createUnsupported(expr.DebugString(),
+ tipb::ScalarFuncSig_Name(expr.sig()) + " with " + DB::toString(expr.children_size())
+ + " children is not supported",
+ false);
+
+ /// Only support `column` `op` `constant` now.
+
+ Attr attr;
+ Field value;
+ UInt32 state = 0x0;
+ constexpr UInt32 state_has_column = 0x1;
+ constexpr UInt32 state_has_literal = 0x2;
+ constexpr UInt32 state_finish = state_has_column | state_has_literal;
+ for (const auto & child : expr.children())
+ {
+ if (isColumnExpr(child))
+ {
+ state |= state_has_column;
+ ColumnID id = getColumnIDForColumnExpr(child, input_col_ids);
+ attr = creator(id);
+ }
+ else if (isLiteralExpr(child))
+ {
+ state |= state_has_literal;
+ value = decodeLiteral(child);
+ }
+ }
+
+ if (unlikely(state != state_finish))
+ return createUnsupported(
+ expr.DebugString(), tipb::ScalarFuncSig_Name(expr.sig()) + " with state " + DB::toString(state) + " is not supported", false);
+ else
+ {
+ // TODO: null_direction
+ RSOperatorPtr op;
+ switch (filter_type)
+ {
+ case FilterParser::RSFilterType::Equal:
+ op = createEqual(attr, value);
+ break;
+ case FilterParser::RSFilterType::NotEqual:
+ op = createNotEqual(attr, value);
+ break;
+ case FilterParser::RSFilterType::Greater:
+ op = createGreater(attr, value, -1);
+ break;
+ case FilterParser::RSFilterType::GreaterEqual:
+ op = createGreaterEqual(attr, value, -1);
+ break;
+ case FilterParser::RSFilterType::Less:
+ op = createLess(attr, value, -1);
+ break;
+ case FilterParser::RSFilterType::LessEuqal:
+ op = createLessEqual(attr, value, -1);
+ break;
+ default:
+ op = createUnsupported(expr.DebugString(), "Unknown compare type: " + tipb::ExprType_Name(expr.tp()), false);
+ break;
+ }
+ return op;
+ }
+}
+
+RSOperatorPtr parseTiExpr(const tipb::Expr & expr,
+ const std::vector & input_colids,
+ const FilterParser::AttrCreatorByColumnID & creator,
+ Poco::Logger * log)
+{
+ assert(isFunctionExpr(expr));
+
+ RSOperatorPtr op = EMPTY_FILTER;
+ if (unlikely(isAggFunctionExpr(expr)))
+ {
+ op = createUnsupported(expr.DebugString(), "agg function: " + tipb::ExprType_Name(expr.tp()), false);
+ return op;
+ }
+
+
+ if (auto iter = FilterParser::scalar_func_rs_filter_map.find(expr.sig()); iter != FilterParser::scalar_func_rs_filter_map.end())
+ {
+ FilterParser::RSFilterType filter_type = iter->second;
+ switch (filter_type)
+ {
+ case FilterParser::RSFilterType::Not:
+ {
+ if (unlikely(expr.children_size() != 1))
+ op = createUnsupported(expr.DebugString(), "logical not with " + DB::toString(expr.children_size()) + " children", false);
+ else
+ {
+ const auto & child = expr.children(0);
+ if (likely(isFunctionExpr(child)))
+ op = createNot(parseTiExpr(child, input_colids, creator, log));
+ else
+ op = createUnsupported(child.DebugString(), "child of logical not is not function", false);
+ }
+ }
+ break;
+
+ case FilterParser::RSFilterType::And:
+ case FilterParser::RSFilterType::Or:
+ {
+ RSOperators children;
+ for (Int32 i = 0; i < expr.children_size(); ++i)
+ {
+ const auto & child = expr.children(i);
+ if (likely(isFunctionExpr(child)))
+ children.emplace_back(parseTiExpr(child, input_colids, creator, log));
+ else
+ children.emplace_back(createUnsupported(child.DebugString(), "child of logical operator is not function", false));
+ }
+ if (expr.sig() == tipb::ScalarFuncSig::LogicalAnd)
+ op = createAnd(children);
+ else
+ op = createOr(children);
+ }
+ break;
+
+ case FilterParser::RSFilterType::Equal:
+ case FilterParser::RSFilterType::NotEqual:
+ case FilterParser::RSFilterType::Greater:
+ case FilterParser::RSFilterType::GreaterEqual:
+ case FilterParser::RSFilterType::Less:
+ case FilterParser::RSFilterType::LessEuqal:
+ op = parseTiCompareExpr(expr, filter_type, input_colids, creator, log);
+ break;
+
+ case FilterParser::RSFilterType::In:
+ case FilterParser::RSFilterType::NotIn:
+ case FilterParser::RSFilterType::Like:
+ case FilterParser::RSFilterType::NotLike:
+ case FilterParser::RSFilterType::Unsupported:
+ op = createUnsupported(expr.DebugString(), tipb::ScalarFuncSig_Name(expr.sig()) + " is not supported", false);
+ break;
+ }
+ }
+ else
+ {
+ op = createUnsupported(expr.DebugString(), tipb::ScalarFuncSig_Name(expr.sig()) + " is not supported", false);
+ }
+
+ return op;
+}
+
+} // namespace cop
+
+
+RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo & dag_info, FilterParser::AttrCreatorByColumnID && creator, Poco::Logger * log)
+{
+ RSOperatorPtr op = EMPTY_FILTER;
+ if (!dag_info.dag.hasSelection() || dag_info.dag.getSelection().conditions_size() == 0)
+ return op;
+
+ std::vector column_ids;
+ for (const tipb::ColumnInfo & col : dag_info.dag.getTS().columns())
+ {
+ ColumnID cid = col.column_id();
+ if (cid == -1)
+ // Column ID -1 means TiDB expects no specific column, mostly it is for cases like `select count(*)`.
+ continue;
+
+ column_ids.emplace_back(cid);
+ }
+
+ const auto & selection = dag_info.dag.getSelection();
+ if (selection.conditions_size() == 1)
+ op = cop::parseTiExpr(selection.conditions(0), column_ids, creator, log);
+ else
+ {
+ /// By default, multiple conditions with operator "and"
+ RSOperators children;
+ for (Int32 i = 0; i < selection.conditions_size(); ++i)
+ {
+ const auto & child = selection.conditions(i);
+ if (isFunctionExpr(child))
+ children.emplace_back(cop::parseTiExpr(child, column_ids, creator, log));
+ else
+ children.emplace_back(createUnsupported(child.DebugString(), "child of logical and is not function", false));
+ }
+ op = createAnd(children);
+ }
+ return op;
+}
+
+std::unordered_map<tipb::ScalarFuncSig, FilterParser::RSFilterType> FilterParser::scalar_func_rs_filter_map{
+ /*
+ {tipb::ScalarFuncSig::CastIntAsInt, "cast"},
+ {tipb::ScalarFuncSig::CastIntAsReal, "cast"},
+ {tipb::ScalarFuncSig::CastIntAsString, "cast"},
+ {tipb::ScalarFuncSig::CastIntAsDecimal, "cast"},
+ {tipb::ScalarFuncSig::CastIntAsTime, "cast"},
+ {tipb::ScalarFuncSig::CastIntAsDuration, "cast"},
+ {tipb::ScalarFuncSig::CastIntAsJson, "cast"},
+
+ {tipb::ScalarFuncSig::CastRealAsInt, "cast"},
+ {tipb::ScalarFuncSig::CastRealAsReal, "cast"},
+ {tipb::ScalarFuncSig::CastRealAsString, "cast"},
+ {tipb::ScalarFuncSig::CastRealAsDecimal, "cast"},
+ {tipb::ScalarFuncSig::CastRealAsTime, "cast"},
+ {tipb::ScalarFuncSig::CastRealAsDuration, "cast"},
+ {tipb::ScalarFuncSig::CastRealAsJson, "cast"},
+
+ {tipb::ScalarFuncSig::CastDecimalAsInt, "cast"},
+ {tipb::ScalarFuncSig::CastDecimalAsReal, "cast"},
+ {tipb::ScalarFuncSig::CastDecimalAsString, "cast"},
+ {tipb::ScalarFuncSig::CastDecimalAsDecimal, "cast"},
+ {tipb::ScalarFuncSig::CastDecimalAsTime, "cast"},
+ {tipb::ScalarFuncSig::CastDecimalAsDuration, "cast"},
+ {tipb::ScalarFuncSig::CastDecimalAsJson, "cast"},
+
+ {tipb::ScalarFuncSig::CastStringAsInt, "cast"},
+ {tipb::ScalarFuncSig::CastStringAsReal, "cast"},
+ {tipb::ScalarFuncSig::CastStringAsString, "cast"},
+ {tipb::ScalarFuncSig::CastStringAsDecimal, "cast"},
+ {tipb::ScalarFuncSig::CastStringAsTime, "cast"},
+ {tipb::ScalarFuncSig::CastStringAsDuration, "cast"},
+ {tipb::ScalarFuncSig::CastStringAsJson, "cast"},
+
+ {tipb::ScalarFuncSig::CastTimeAsInt, "cast"},
+ {tipb::ScalarFuncSig::CastTimeAsReal, "cast"},
+ {tipb::ScalarFuncSig::CastTimeAsString, "cast"},
+ {tipb::ScalarFuncSig::CastTimeAsDecimal, "cast"},
+ {tipb::ScalarFuncSig::CastTimeAsTime, "cast"},
+ {tipb::ScalarFuncSig::CastTimeAsDuration, "cast"},
+ {tipb::ScalarFuncSig::CastTimeAsJson, "cast"},
+
+ {tipb::ScalarFuncSig::CastDurationAsInt, "cast"},
+ {tipb::ScalarFuncSig::CastDurationAsReal, "cast"},
+ {tipb::ScalarFuncSig::CastDurationAsString, "cast"},
+ {tipb::ScalarFuncSig::CastDurationAsDecimal, "cast"},
+ {tipb::ScalarFuncSig::CastDurationAsTime, "cast"},
+ {tipb::ScalarFuncSig::CastDurationAsDuration, "cast"},
+ {tipb::ScalarFuncSig::CastDurationAsJson, "cast"},
+
+ {tipb::ScalarFuncSig::CastJsonAsInt, "cast"},
+ {tipb::ScalarFuncSig::CastJsonAsReal, "cast"},
+ {tipb::ScalarFuncSig::CastJsonAsString, "cast"},
+ {tipb::ScalarFuncSig::CastJsonAsDecimal, "cast"},
+ {tipb::ScalarFuncSig::CastJsonAsTime, "cast"},
+ {tipb::ScalarFuncSig::CastJsonAsDuration, "cast"},
+ {tipb::ScalarFuncSig::CastJsonAsJson, "cast"},
+
+ {tipb::ScalarFuncSig::CoalesceInt, "coalesce"},
+ {tipb::ScalarFuncSig::CoalesceReal, "coalesce"},
+ {tipb::ScalarFuncSig::CoalesceString, "coalesce"},
+ {tipb::ScalarFuncSig::CoalesceDecimal, "coalesce"},
+ {tipb::ScalarFuncSig::CoalesceTime, "coalesce"},
+ {tipb::ScalarFuncSig::CoalesceDuration, "coalesce"},
+ {tipb::ScalarFuncSig::CoalesceJson, "coalesce"},
+ */
+
+ {tipb::ScalarFuncSig::LTInt, FilterParser::RSFilterType::Less},
+ {tipb::ScalarFuncSig::LTReal, FilterParser::RSFilterType::Less},
+ {tipb::ScalarFuncSig::LTString, FilterParser::RSFilterType::Less},
+ {tipb::ScalarFuncSig::LTDecimal, FilterParser::RSFilterType::Less},
+ {tipb::ScalarFuncSig::LTTime, FilterParser::RSFilterType::Less},
+ {tipb::ScalarFuncSig::LTDuration, FilterParser::RSFilterType::Less},
+ {tipb::ScalarFuncSig::LTJson, FilterParser::RSFilterType::Less},
+
+ {tipb::ScalarFuncSig::LEInt, FilterParser::RSFilterType::LessEuqal},
+ {tipb::ScalarFuncSig::LEReal, FilterParser::RSFilterType::LessEuqal},
+ {tipb::ScalarFuncSig::LEString, FilterParser::RSFilterType::LessEuqal},
+ {tipb::ScalarFuncSig::LEDecimal, FilterParser::RSFilterType::LessEuqal},
+ {tipb::ScalarFuncSig::LETime, FilterParser::RSFilterType::LessEuqal},
+ {tipb::ScalarFuncSig::LEDuration, FilterParser::RSFilterType::LessEuqal},
+ {tipb::ScalarFuncSig::LEJson, FilterParser::RSFilterType::LessEuqal},
+
+ {tipb::ScalarFuncSig::GTInt, FilterParser::RSFilterType::Greater},
+ {tipb::ScalarFuncSig::GTReal, FilterParser::RSFilterType::Greater},
+ {tipb::ScalarFuncSig::GTString, FilterParser::RSFilterType::Greater},
+ {tipb::ScalarFuncSig::GTDecimal, FilterParser::RSFilterType::Greater},
+ {tipb::ScalarFuncSig::GTTime, FilterParser::RSFilterType::Greater},
+ {tipb::ScalarFuncSig::GTDuration, FilterParser::RSFilterType::Greater},
+ {tipb::ScalarFuncSig::GTJson, FilterParser::RSFilterType::Greater},
+
+ // {tipb::ScalarFuncSig::GreatestInt, "greatest"},
+ // {tipb::ScalarFuncSig::GreatestReal, "greatest"},
+ // {tipb::ScalarFuncSig::GreatestString, "greatest"},
+ // {tipb::ScalarFuncSig::GreatestDecimal, "greatest"},
+ // {tipb::ScalarFuncSig::GreatestTime, "greatest"},
+
+ // {tipb::ScalarFuncSig::LeastInt, "least"},
+ // {tipb::ScalarFuncSig::LeastReal, "least"},
+ // {tipb::ScalarFuncSig::LeastString, "least"},
+ // {tipb::ScalarFuncSig::LeastDecimal, "least"},
+ // {tipb::ScalarFuncSig::LeastTime, "least"},
+
+ //{tipb::ScalarFuncSig::IntervalInt, "cast"},
+ //{tipb::ScalarFuncSig::IntervalReal, "cast"},
+
+ {tipb::ScalarFuncSig::GEInt, FilterParser::RSFilterType::GreaterEqual},
+ {tipb::ScalarFuncSig::GEReal, FilterParser::RSFilterType::GreaterEqual},
+ {tipb::ScalarFuncSig::GEString, FilterParser::RSFilterType::GreaterEqual},
+ {tipb::ScalarFuncSig::GEDecimal, FilterParser::RSFilterType::GreaterEqual},
+ {tipb::ScalarFuncSig::GETime, FilterParser::RSFilterType::GreaterEqual},
+ {tipb::ScalarFuncSig::GEDuration, FilterParser::RSFilterType::GreaterEqual},
+ {tipb::ScalarFuncSig::GEJson, FilterParser::RSFilterType::GreaterEqual},
+
+ {tipb::ScalarFuncSig::EQInt, FilterParser::RSFilterType::Equal},
+ {tipb::ScalarFuncSig::EQReal, FilterParser::RSFilterType::Equal},
+ {tipb::ScalarFuncSig::EQString, FilterParser::RSFilterType::Equal},
+ {tipb::ScalarFuncSig::EQDecimal, FilterParser::RSFilterType::Equal},
+ {tipb::ScalarFuncSig::EQTime, FilterParser::RSFilterType::Equal},
+ {tipb::ScalarFuncSig::EQDuration, FilterParser::RSFilterType::Equal},
+ {tipb::ScalarFuncSig::EQJson, FilterParser::RSFilterType::Equal},
+
+ {tipb::ScalarFuncSig::NEInt, FilterParser::RSFilterType::NotEqual},
+ {tipb::ScalarFuncSig::NEReal, FilterParser::RSFilterType::NotEqual},
+ {tipb::ScalarFuncSig::NEString, FilterParser::RSFilterType::NotEqual},
+ {tipb::ScalarFuncSig::NEDecimal, FilterParser::RSFilterType::NotEqual},
+ {tipb::ScalarFuncSig::NETime, FilterParser::RSFilterType::NotEqual},
+ {tipb::ScalarFuncSig::NEDuration, FilterParser::RSFilterType::NotEqual},
+ {tipb::ScalarFuncSig::NEJson, FilterParser::RSFilterType::NotEqual},
+
+ //{tipb::ScalarFuncSig::NullEQInt, "cast"},
+ //{tipb::ScalarFuncSig::NullEQReal, "cast"},
+ //{tipb::ScalarFuncSig::NullEQString, "cast"},
+ //{tipb::ScalarFuncSig::NullEQDecimal, "cast"},
+ //{tipb::ScalarFuncSig::NullEQTime, "cast"},
+ //{tipb::ScalarFuncSig::NullEQDuration, "cast"},
+ //{tipb::ScalarFuncSig::NullEQJson, "cast"},
+
+ // {tipb::ScalarFuncSig::PlusReal, "plus"},
+ // {tipb::ScalarFuncSig::PlusDecimal, "plus"},
+ // {tipb::ScalarFuncSig::PlusInt, "plus"},
+
+ // {tipb::ScalarFuncSig::MinusReal, "minus"},
+ // {tipb::ScalarFuncSig::MinusDecimal, "minus"},
+ // {tipb::ScalarFuncSig::MinusInt, "minus"},
+
+ // {tipb::ScalarFuncSig::MultiplyReal, "multiply"},
+ // {tipb::ScalarFuncSig::MultiplyDecimal, "multiply"},
+ // {tipb::ScalarFuncSig::MultiplyInt, "multiply"},
+
+ // {tipb::ScalarFuncSig::DivideReal, "divide"},
+ // {tipb::ScalarFuncSig::DivideDecimal, "divide"},
+ // {tipb::ScalarFuncSig::IntDivideInt, "intDiv"},
+ // {tipb::ScalarFuncSig::IntDivideDecimal, "divide"},
+
+ // {tipb::ScalarFuncSig::ModReal, "modulo"},
+ // {tipb::ScalarFuncSig::ModDecimal, "modulo"},
+ // {tipb::ScalarFuncSig::ModInt, "modulo"},
+
+ // {tipb::ScalarFuncSig::MultiplyIntUnsigned, "multiply"},
+
+ // {tipb::ScalarFuncSig::AbsInt, "abs"},
+ // {tipb::ScalarFuncSig::AbsUInt, "abs"},
+ // {tipb::ScalarFuncSig::AbsReal, "abs"},
+ // {tipb::ScalarFuncSig::AbsDecimal, "abs"},
+
+ // {tipb::ScalarFuncSig::CeilIntToDec, "ceil"},
+ // {tipb::ScalarFuncSig::CeilIntToInt, "ceil"},
+ // {tipb::ScalarFuncSig::CeilDecToInt, "ceil"},
+ // {tipb::ScalarFuncSig::CeilDecToDec, "ceil"},
+ // {tipb::ScalarFuncSig::CeilReal, "ceil"},
+
+ // {tipb::ScalarFuncSig::FloorIntToDec, "floor"},
+ // {tipb::ScalarFuncSig::FloorIntToInt, "floor"},
+ // {tipb::ScalarFuncSig::FloorDecToInt, "floor"},
+ // {tipb::ScalarFuncSig::FloorDecToDec, "floor"},
+ // {tipb::ScalarFuncSig::FloorReal, "floor"},
+
+ //{tipb::ScalarFuncSig::RoundReal, "round"},
+ //{tipb::ScalarFuncSig::RoundInt, "round"},
+ //{tipb::ScalarFuncSig::RoundDec, "round"},
+ //{tipb::ScalarFuncSig::RoundWithFracReal, "cast"},
+ //{tipb::ScalarFuncSig::RoundWithFracInt, "cast"},
+ //{tipb::ScalarFuncSig::RoundWithFracDec, "cast"},
+
+ //{tipb::ScalarFuncSig::Log1Arg, "log"},
+ //{tipb::ScalarFuncSig::Log2Args, "cast"},
+ //{tipb::ScalarFuncSig::Log2, "log2"},
+ //{tipb::ScalarFuncSig::Log10, "log10"},
+
+ //{tipb::ScalarFuncSig::Rand, "rand"},
+ //{tipb::ScalarFuncSig::RandWithSeed, "cast"},
+
+ //{tipb::ScalarFuncSig::Pow, "pow"},
+ //{tipb::ScalarFuncSig::Conv, "cast"},
+ //{tipb::ScalarFuncSig::CRC32, "cast"},
+ //{tipb::ScalarFuncSig::Sign, "cast"},
+
+ //{tipb::ScalarFuncSig::Sqrt, "sqrt"},
+ //{tipb::ScalarFuncSig::Acos, "acos"},
+ //{tipb::ScalarFuncSig::Asin, "asin"},
+ //{tipb::ScalarFuncSig::Atan1Arg, "atan"},
+ //{tipb::ScalarFuncSig::Atan2Args, "cast"},
+ //{tipb::ScalarFuncSig::Cos, "cos"},
+ //{tipb::ScalarFuncSig::Cot, "cast"},
+ //{tipb::ScalarFuncSig::Degrees, "cast"},
+ //{tipb::ScalarFuncSig::Exp, "exp"},
+ //{tipb::ScalarFuncSig::PI, "cast"},
+ //{tipb::ScalarFuncSig::Radians, "cast"},
+ // {tipb::ScalarFuncSig::Sin, "sin"},
+ // {tipb::ScalarFuncSig::Tan, "tan"},
+ // {tipb::ScalarFuncSig::TruncateInt, "trunc"},
+ // {tipb::ScalarFuncSig::TruncateReal, "trunc"},
+ //{tipb::ScalarFuncSig::TruncateDecimal, "cast"},
+
+ {tipb::ScalarFuncSig::LogicalAnd, FilterParser::RSFilterType::And},
+ {tipb::ScalarFuncSig::LogicalOr, FilterParser::RSFilterType::Or},
+ // {tipb::ScalarFuncSig::LogicalXor, "xor"},
+ {tipb::ScalarFuncSig::UnaryNotDecimal, FilterParser::RSFilterType::Not},
+ {tipb::ScalarFuncSig::UnaryNotInt, FilterParser::RSFilterType::Not},
+ {tipb::ScalarFuncSig::UnaryNotReal, FilterParser::RSFilterType::Not},
+
+ // {tipb::ScalarFuncSig::UnaryMinusInt, "negate"},
+ // {tipb::ScalarFuncSig::UnaryMinusReal, "negate"},
+ // {tipb::ScalarFuncSig::UnaryMinusDecimal, "negate"},
+
+ // {tipb::ScalarFuncSig::DecimalIsNull, "isNull"},
+ // {tipb::ScalarFuncSig::DurationIsNull, "isNull"},
+ // {tipb::ScalarFuncSig::RealIsNull, "isNull"},
+ // {tipb::ScalarFuncSig::StringIsNull, "isNull"},
+ // {tipb::ScalarFuncSig::TimeIsNull, "isNull"},
+ // {tipb::ScalarFuncSig::IntIsNull, "isNull"},
+ // {tipb::ScalarFuncSig::JsonIsNull, "isNull"},
+
+ //{tipb::ScalarFuncSig::BitAndSig, "cast"},
+ //{tipb::ScalarFuncSig::BitOrSig, "cast"},
+ //{tipb::ScalarFuncSig::BitXorSig, "cast"},
+ //{tipb::ScalarFuncSig::BitNegSig, "cast"},
+ //{tipb::ScalarFuncSig::IntIsTrue, "cast"},
+ //{tipb::ScalarFuncSig::RealIsTrue, "cast"},
+ //{tipb::ScalarFuncSig::DecimalIsTrue, "cast"},
+ //{tipb::ScalarFuncSig::IntIsFalse, "cast"},
+ //{tipb::ScalarFuncSig::RealIsFalse, "cast"},
+ //{tipb::ScalarFuncSig::DecimalIsFalse, "cast"},
+
+ //{tipb::ScalarFuncSig::LeftShift, "cast"},
+ //{tipb::ScalarFuncSig::RightShift, "cast"},
+
+ //{tipb::ScalarFuncSig::BitCount, "cast"},
+ //{tipb::ScalarFuncSig::GetParamString, "cast"},
+ //{tipb::ScalarFuncSig::GetVar, "cast"},
+ //{tipb::ScalarFuncSig::RowSig, "cast"},
+ //{tipb::ScalarFuncSig::SetVar, "cast"},
+ //{tipb::ScalarFuncSig::ValuesDecimal, "cast"},
+ //{tipb::ScalarFuncSig::ValuesDuration, "cast"},
+ //{tipb::ScalarFuncSig::ValuesInt, "cast"},
+ //{tipb::ScalarFuncSig::ValuesJSON, "cast"},
+ //{tipb::ScalarFuncSig::ValuesReal, "cast"},
+ //{tipb::ScalarFuncSig::ValuesString, "cast"},
+ //{tipb::ScalarFuncSig::ValuesTime, "cast"},
+
+ // {tipb::ScalarFuncSig::InInt, "in"},
+ // {tipb::ScalarFuncSig::InReal, "in"},
+ // {tipb::ScalarFuncSig::InString, "in"},
+ // {tipb::ScalarFuncSig::InDecimal, "in"},
+ // {tipb::ScalarFuncSig::InTime, "in"},
+ // {tipb::ScalarFuncSig::InDuration, "in"},
+ // {tipb::ScalarFuncSig::InJson, "in"},
+
+ // {tipb::ScalarFuncSig::IfNullInt, "ifNull"},
+ // {tipb::ScalarFuncSig::IfNullReal, "ifNull"},
+ // {tipb::ScalarFuncSig::IfNullString, "ifNull"},
+ // {tipb::ScalarFuncSig::IfNullDecimal, "ifNull"},
+ // {tipb::ScalarFuncSig::IfNullTime, "ifNull"},
+ // {tipb::ScalarFuncSig::IfNullDuration, "ifNull"},
+ // {tipb::ScalarFuncSig::IfNullJson, "ifNull"},
+
+ // {tipb::ScalarFuncSig::IfInt, "if"},
+ // {tipb::ScalarFuncSig::IfReal, "if"},
+ // {tipb::ScalarFuncSig::IfString, "if"},
+ // {tipb::ScalarFuncSig::IfDecimal, "if"},
+ // {tipb::ScalarFuncSig::IfTime, "if"},
+ // {tipb::ScalarFuncSig::IfDuration, "if"},
+ // {tipb::ScalarFuncSig::IfJson, "if"},
+
+ //todo need further check for caseWithExpression and multiIf
+ //{tipb::ScalarFuncSig::CaseWhenInt, "caseWithExpression"},
+ //{tipb::ScalarFuncSig::CaseWhenReal, "caseWithExpression"},
+ //{tipb::ScalarFuncSig::CaseWhenString, "caseWithExpression"},
+ //{tipb::ScalarFuncSig::CaseWhenDecimal, "caseWithExpression"},
+ //{tipb::ScalarFuncSig::CaseWhenTime, "caseWithExpression"},
+ //{tipb::ScalarFuncSig::CaseWhenDuration, "caseWithExpression"},
+ //{tipb::ScalarFuncSig::CaseWhenJson, "caseWithExpression"},
+
+ //{tipb::ScalarFuncSig::AesDecrypt, "cast"},
+ //{tipb::ScalarFuncSig::AesEncrypt, "cast"},
+ //{tipb::ScalarFuncSig::Compress, "cast"},
+ //{tipb::ScalarFuncSig::MD5, "cast"},
+ //{tipb::ScalarFuncSig::Password, "cast"},
+ //{tipb::ScalarFuncSig::RandomBytes, "cast"},
+ //{tipb::ScalarFuncSig::SHA1, "cast"},
+ //{tipb::ScalarFuncSig::SHA2, "cast"},
+ //{tipb::ScalarFuncSig::Uncompress, "cast"},
+ //{tipb::ScalarFuncSig::UncompressedLength, "cast"},
+
+ //{tipb::ScalarFuncSig::Database, "cast"},
+ //{tipb::ScalarFuncSig::FoundRows, "cast"},
+ //{tipb::ScalarFuncSig::CurrentUser, "cast"},
+ //{tipb::ScalarFuncSig::User, "cast"},
+ //{tipb::ScalarFuncSig::ConnectionID, "cast"},
+ //{tipb::ScalarFuncSig::LastInsertID, "cast"},
+ //{tipb::ScalarFuncSig::LastInsertIDWithID, "cast"},
+ //{tipb::ScalarFuncSig::Version, "cast"},
+ //{tipb::ScalarFuncSig::TiDBVersion, "cast"},
+ //{tipb::ScalarFuncSig::RowCount, "cast"},
+
+ //{tipb::ScalarFuncSig::Sleep, "cast"},
+ //{tipb::ScalarFuncSig::Lock, "cast"},
+ //{tipb::ScalarFuncSig::ReleaseLock, "cast"},
+ //{tipb::ScalarFuncSig::DecimalAnyValue, "cast"},
+ //{tipb::ScalarFuncSig::DurationAnyValue, "cast"},
+ //{tipb::ScalarFuncSig::IntAnyValue, "cast"},
+ //{tipb::ScalarFuncSig::JSONAnyValue, "cast"},
+ //{tipb::ScalarFuncSig::RealAnyValue, "cast"},
+ //{tipb::ScalarFuncSig::StringAnyValue, "cast"},
+ //{tipb::ScalarFuncSig::TimeAnyValue, "cast"},
+ //{tipb::ScalarFuncSig::InetAton, "cast"},
+ //{tipb::ScalarFuncSig::InetNtoa, "cast"},
+ //{tipb::ScalarFuncSig::Inet6Aton, "cast"},
+ //{tipb::ScalarFuncSig::Inet6Ntoa, "cast"},
+ //{tipb::ScalarFuncSig::IsIPv4, "cast"},
+ //{tipb::ScalarFuncSig::IsIPv4Compat, "cast"},
+ //{tipb::ScalarFuncSig::IsIPv4Mapped, "cast"},
+ //{tipb::ScalarFuncSig::IsIPv6, "cast"},
+ //{tipb::ScalarFuncSig::UUID, "cast"},
+
+ // {tipb::ScalarFuncSig::LikeSig, "like3Args"},
+ //{tipb::ScalarFuncSig::RegexpBinarySig, "cast"},
+ //{tipb::ScalarFuncSig::RegexpSig, "cast"},
+
+ //{tipb::ScalarFuncSig::JsonExtractSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonUnquoteSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonTypeSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonSetSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonInsertSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonReplaceSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonRemoveSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonMergeSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonObjectSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonArraySig, "cast"},
+ //{tipb::ScalarFuncSig::JsonValidJsonSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonContainsSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonArrayAppendSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonArrayInsertSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonMergePatchSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonMergePreserveSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonContainsPathSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonPrettySig, "cast"},
+ //{tipb::ScalarFuncSig::JsonQuoteSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonSearchSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonStorageSizeSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonDepthSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonKeysSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonLengthSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonKeys2ArgsSig, "cast"},
+ //{tipb::ScalarFuncSig::JsonValidStringSig, "cast"},
+
+ //{tipb::ScalarFuncSig::DateFormatSig, "cast"},
+ //{tipb::ScalarFuncSig::DateLiteral, "cast"},
+ //{tipb::ScalarFuncSig::DateDiff, "cast"},
+ //{tipb::ScalarFuncSig::NullTimeDiff, "cast"},
+ //{tipb::ScalarFuncSig::TimeStringTimeDiff, "cast"},
+ //{tipb::ScalarFuncSig::DurationDurationTimeDiff, "cast"},
+ //{tipb::ScalarFuncSig::DurationDurationTimeDiff, "cast"},
+ //{tipb::ScalarFuncSig::StringTimeTimeDiff, "cast"},
+ //{tipb::ScalarFuncSig::StringDurationTimeDiff, "cast"},
+ //{tipb::ScalarFuncSig::StringStringTimeDiff, "cast"},
+ //{tipb::ScalarFuncSig::TimeTimeTimeDiff, "cast"},
+
+ //{tipb::ScalarFuncSig::Date, "cast"},
+ //{tipb::ScalarFuncSig::Hour, "cast"},
+ //{tipb::ScalarFuncSig::Minute, "cast"},
+ //{tipb::ScalarFuncSig::Second, "cast"},
+ //{tipb::ScalarFuncSig::MicroSecond, "cast"},
+ //{tipb::ScalarFuncSig::Month, "cast"},
+ //{tipb::ScalarFuncSig::MonthName, "cast"},
+
+ //{tipb::ScalarFuncSig::NowWithArg, "cast"},
+ //{tipb::ScalarFuncSig::NowWithoutArg, "cast"},
+
+ //{tipb::ScalarFuncSig::DayName, "cast"},
+ //{tipb::ScalarFuncSig::DayOfMonth, "cast"},
+ //{tipb::ScalarFuncSig::DayOfWeek, "cast"},
+ //{tipb::ScalarFuncSig::DayOfYear, "cast"},
+
+ //{tipb::ScalarFuncSig::WeekWithMode, "cast"},
+ //{tipb::ScalarFuncSig::WeekWithoutMode, "cast"},
+ //{tipb::ScalarFuncSig::WeekDay, "cast"},
+ //{tipb::ScalarFuncSig::WeekOfYear, "cast"},
+
+ //{tipb::ScalarFuncSig::Year, "cast"},
+ //{tipb::ScalarFuncSig::YearWeekWithMode, "cast"},
+ //{tipb::ScalarFuncSig::YearWeekWithoutMode, "cast"},
+
+ //{tipb::ScalarFuncSig::GetFormat, "cast"},
+ //{tipb::ScalarFuncSig::SysDateWithFsp, "cast"},
+ //{tipb::ScalarFuncSig::SysDateWithoutFsp, "cast"},
+ //{tipb::ScalarFuncSig::CurrentDate, "cast"},
+ //{tipb::ScalarFuncSig::CurrentTime0Arg, "cast"},
+ //{tipb::ScalarFuncSig::CurrentTime1Arg, "cast"},
+
+ //{tipb::ScalarFuncSig::Time, "cast"},
+ //{tipb::ScalarFuncSig::TimeLiteral, "cast"},
+ //{tipb::ScalarFuncSig::UTCDate, "cast"},
+ //{tipb::ScalarFuncSig::UTCTimestampWithArg, "cast"},
+ //{tipb::ScalarFuncSig::UTCTimestampWithoutArg, "cast"},
+
+ //{tipb::ScalarFuncSig::AddDatetimeAndDuration, "cast"},
+ //{tipb::ScalarFuncSig::AddDatetimeAndString, "cast"},
+ //{tipb::ScalarFuncSig::AddTimeDateTimeNull, "cast"},
+ //{tipb::ScalarFuncSig::AddStringAndDuration, "cast"},
+ //{tipb::ScalarFuncSig::AddStringAndString, "cast"},
+ //{tipb::ScalarFuncSig::AddTimeStringNull, "cast"},
+ //{tipb::ScalarFuncSig::AddDurationAndDuration, "cast"},
+ //{tipb::ScalarFuncSig::AddDurationAndString, "cast"},
+ //{tipb::ScalarFuncSig::AddTimeDurationNull, "cast"},
+ //{tipb::ScalarFuncSig::AddDateAndDuration, "cast"},
+ //{tipb::ScalarFuncSig::AddDateAndString, "cast"},
+
+ //{tipb::ScalarFuncSig::SubDateAndDuration, "cast"},
+ //{tipb::ScalarFuncSig::SubDateAndString, "cast"},
+ //{tipb::ScalarFuncSig::SubTimeDateTimeNull, "cast"},
+ //{tipb::ScalarFuncSig::SubStringAndDuration, "cast"},
+ //{tipb::ScalarFuncSig::SubStringAndString, "cast"},
+ //{tipb::ScalarFuncSig::SubTimeStringNull, "cast"},
+ //{tipb::ScalarFuncSig::SubDurationAndDuration, "cast"},
+ //{tipb::ScalarFuncSig::SubDurationAndString, "cast"},
+ //{tipb::ScalarFuncSig::SubDateAndDuration, "cast"},
+ //{tipb::ScalarFuncSig::SubDateAndString, "cast"},
+
+ //{tipb::ScalarFuncSig::UnixTimestampCurrent, "cast"},
+ //{tipb::ScalarFuncSig::UnixTimestampInt, "cast"},
+ //{tipb::ScalarFuncSig::UnixTimestampDec, "cast"},
+
+ //{tipb::ScalarFuncSig::ConvertTz, "cast"},
+ //{tipb::ScalarFuncSig::MakeDate, "cast"},
+ //{tipb::ScalarFuncSig::MakeTime, "cast"},
+ //{tipb::ScalarFuncSig::PeriodAdd, "cast"},
+ //{tipb::ScalarFuncSig::PeriodDiff, "cast"},
+ //{tipb::ScalarFuncSig::Quarter, "cast"},
+
+ //{tipb::ScalarFuncSig::SecToTime, "cast"},
+ //{tipb::ScalarFuncSig::TimeToSec, "cast"},
+ //{tipb::ScalarFuncSig::TimestampAdd, "cast"},
+ //{tipb::ScalarFuncSig::ToDays, "cast"},
+ //{tipb::ScalarFuncSig::ToSeconds, "cast"},
+ //{tipb::ScalarFuncSig::UTCTimeWithArg, "cast"},
+ //{tipb::ScalarFuncSig::UTCTimestampWithoutArg, "cast"},
+ //{tipb::ScalarFuncSig::Timestamp1Arg, "cast"},
+ //{tipb::ScalarFuncSig::Timestamp2Args, "cast"},
+ //{tipb::ScalarFuncSig::TimestampLiteral, "cast"},
+
+ //{tipb::ScalarFuncSig::LastDay, "cast"},
+ //{tipb::ScalarFuncSig::StrToDateDate, "cast"},
+ //{tipb::ScalarFuncSig::StrToDateDatetime, "cast"},
+ //{tipb::ScalarFuncSig::StrToDateDuration, "cast"},
+ //{tipb::ScalarFuncSig::FromUnixTime1Arg, "cast"},
+ //{tipb::ScalarFuncSig::FromUnixTime2Arg, "cast"},
+ //{tipb::ScalarFuncSig::ExtractDatetime, "cast"},
+ //{tipb::ScalarFuncSig::ExtractDuration, "cast"},
+
+ //{tipb::ScalarFuncSig::AddDateStringString, "cast"},
+ //{tipb::ScalarFuncSig::AddDateStringInt, "cast"},
+ //{tipb::ScalarFuncSig::AddDateStringDecimal, "cast"},
+ //{tipb::ScalarFuncSig::AddDateIntString, "cast"},
+ //{tipb::ScalarFuncSig::AddDateIntInt, "cast"},
+ //{tipb::ScalarFuncSig::AddDateDatetimeString, "cast"},
+ //{tipb::ScalarFuncSig::AddDateDatetimeInt, "cast"},
+
+ //{tipb::ScalarFuncSig::SubDateStringString, "cast"},
+ //{tipb::ScalarFuncSig::SubDateStringInt, "cast"},
+ //{tipb::ScalarFuncSig::SubDateStringDecimal, "cast"},
+ //{tipb::ScalarFuncSig::SubDateIntString, "cast"},
+ //{tipb::ScalarFuncSig::SubDateIntInt, "cast"},
+ //{tipb::ScalarFuncSig::SubDateDatetimeString, "cast"},
+ //{tipb::ScalarFuncSig::SubDateDatetimeInt, "cast"},
+
+ //{tipb::ScalarFuncSig::FromDays, "cast"},
+ //{tipb::ScalarFuncSig::TimeFormat, "cast"},
+ //{tipb::ScalarFuncSig::TimestampDiff, "cast"},
+
+ //{tipb::ScalarFuncSig::BitLength, "cast"},
+ //{tipb::ScalarFuncSig::Bin, "cast"},
+ //{tipb::ScalarFuncSig::ASCII, "cast"},
+ //{tipb::ScalarFuncSig::Char, "cast"},
+ // {tipb::ScalarFuncSig::CharLength, "lengthUTF8"},
+ //{tipb::ScalarFuncSig::Concat, "cast"},
+ //{tipb::ScalarFuncSig::ConcatWS, "cast"},
+ //{tipb::ScalarFuncSig::Convert, "cast"},
+ //{tipb::ScalarFuncSig::Elt, "cast"},
+ //{tipb::ScalarFuncSig::ExportSet3Arg, "cast"},
+ //{tipb::ScalarFuncSig::ExportSet4Arg, "cast"},
+ //{tipb::ScalarFuncSig::ExportSet5Arg, "cast"},
+ //{tipb::ScalarFuncSig::FieldInt, "cast"},
+ //{tipb::ScalarFuncSig::FieldReal, "cast"},
+ //{tipb::ScalarFuncSig::FieldString, "cast"},
+
+ //{tipb::ScalarFuncSig::FindInSet, "cast"},
+ //{tipb::ScalarFuncSig::Format, "cast"},
+ //{tipb::ScalarFuncSig::FormatWithLocale, "cast"},
+ //{tipb::ScalarFuncSig::FromBase64, "cast"},
+ //{tipb::ScalarFuncSig::HexIntArg, "cast"},
+ //{tipb::ScalarFuncSig::HexStrArg, "cast"},
+ //{tipb::ScalarFuncSig::Insert, "cast"},
+ //{tipb::ScalarFuncSig::InsertBinary, "cast"},
+ //{tipb::ScalarFuncSig::Instr, "cast"},
+ //{tipb::ScalarFuncSig::InstrBinary, "cast"},
+
+ // {tipb::ScalarFuncSig::LTrim, "ltrim"},
+ //{tipb::ScalarFuncSig::Left, "cast"},
+ //{tipb::ScalarFuncSig::LeftBinary, "cast"},
+ // {tipb::ScalarFuncSig::Length, "length"},
+ //{tipb::ScalarFuncSig::Locate2Args, "cast"},
+ //{tipb::ScalarFuncSig::Locate3Args, "cast"},
+ //{tipb::ScalarFuncSig::LocateBinary2Args, "cast"},
+ //{tipb::ScalarFuncSig::LocateBinary3Args, "cast"},
+
+ // {tipb::ScalarFuncSig::Lower, "lower"},
+ //{tipb::ScalarFuncSig::Lpad, "cast"},
+ //{tipb::ScalarFuncSig::LpadBinary, "cast"},
+ //{tipb::ScalarFuncSig::MakeSet, "cast"},
+ //{tipb::ScalarFuncSig::OctInt, "cast"},
+ //{tipb::ScalarFuncSig::OctString, "cast"},
+ //{tipb::ScalarFuncSig::Ord, "cast"},
+ //{tipb::ScalarFuncSig::Quote, "cast"},
+ // {tipb::ScalarFuncSig::RTrim, "rtrim"},
+ //{tipb::ScalarFuncSig::Repeat, "cast"},
+ //{tipb::ScalarFuncSig::Replace, "cast"},
+ //{tipb::ScalarFuncSig::Reverse, "cast"},
+ //{tipb::ScalarFuncSig::ReverseBinary, "cast"},
+ //{tipb::ScalarFuncSig::Right, "cast"},
+ //{tipb::ScalarFuncSig::RightBinary, "cast"},
+ //{tipb::ScalarFuncSig::Rpad, "cast"},
+ //{tipb::ScalarFuncSig::RpadBinary, "cast"},
+ //{tipb::ScalarFuncSig::Space, "cast"},
+ //{tipb::ScalarFuncSig::Strcmp, "cast"},
+ //{tipb::ScalarFuncSig::Substring2Args, "cast"},
+ //{tipb::ScalarFuncSig::Substring3Args, "cast"},
+ //{tipb::ScalarFuncSig::SubstringBinary2Args, "cast"},
+ //{tipb::ScalarFuncSig::SubstringBinary3Args, "cast"},
+ //{tipb::ScalarFuncSig::SubstringIndex, "cast"},
+
+ //{tipb::ScalarFuncSig::ToBase64, "cast"},
+ //{tipb::ScalarFuncSig::Trim1Arg, "cast"},
+ //{tipb::ScalarFuncSig::Trim2Args, "cast"},
+ //{tipb::ScalarFuncSig::Trim3Args, "cast"},
+ //{tipb::ScalarFuncSig::UnHex, "cast"},
+ // {tipb::ScalarFuncSig::Upper, "upper"},
+};
+
+} // namespace DM
+
+} // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMax.h b/dbms/src/Storages/DeltaMerge/Index/MinMax.h
index 23db275bf53..4641aa3242d 100644
--- a/dbms/src/Storages/DeltaMerge/Index/MinMax.h
+++ b/dbms/src/Storages/DeltaMerge/Index/MinMax.h
@@ -1,5 +1,7 @@
#pragma once
+#include <sstream>
+
#include
#include
#include
@@ -93,6 +95,8 @@ struct MinMaxValue
virtual RSResult checkEqual(const Field & value, const DataTypePtr & type) = 0;
virtual RSResult checkGreater(const Field & value, const DataTypePtr & type) = 0;
virtual RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) = 0;
+
+ virtual String toString() const = 0;
};
/// Number types.
@@ -168,6 +172,13 @@ struct MinMaxValueFixed : public MinMaxValue
RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); }
RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); }
// clang-format on
+
+ String toString() const override
+ {
+ std::stringstream ss;
+ ss << "{\"type\":\"fixed\",\"min\":\"" << DB::toString(min) << "\",\"max\":\"" << DB::toString(max) << "\"}";
+ return ss.str();
+ }
};
/// String type only.
@@ -244,6 +255,13 @@ struct MinMaxValueString : public MinMaxValue
RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); }
RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); }
// clang-format on
+
+ String toString() const override
+ {
+ std::stringstream ss;
+ ss << "{\"type\":\"string\",\"min\":\"" << min << "\",\"max\":\"" << max << "\"}";
+ return ss.str();
+ }
};
/// Other types.
@@ -314,6 +332,14 @@ struct MinMaxValueDataGeneric : public MinMaxValue
RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); }
RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); }
// clang-format on
+
+ String toString() const override
+ {
+ std::stringstream ss;
+ ss << "{\"type\":\"generic\",\"min\":\"" << applyVisitor(FieldVisitorToString(), min) << "\",\"max\":\""
+ << applyVisitor(FieldVisitorToString(), max) << "\"}";
+ return ss.str();
+ }
};
diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
index 5ad3fe8d8d2..d3c26a2e3eb 100644
--- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
+++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
@@ -114,5 +114,10 @@ RSResult MinMaxIndex::checkGreaterEqual(const Field & value, const DataTypePtr &
return minmax->checkGreaterEqual(value, type);
}
+String MinMaxIndex::toString() const
+{
+ return this->minmax->toString();
+}
+
} // namespace DM
} // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
index 050c294e288..c29d2492a4e 100644
--- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
+++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
@@ -41,6 +41,9 @@ class MinMaxIndex
RSResult checkEqual(const Field & value, const DataTypePtr & type);
RSResult checkGreater(const Field & value, const DataTypePtr & type, int nan_direction);
RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type, int nan_direction);
+
+ String toString() const;
+
};
diff --git a/dbms/src/Storages/DeltaMerge/Index/ValueComparison.h b/dbms/src/Storages/DeltaMerge/Index/ValueComparison.h
index 9220e695b55..1214e06b4e3 100644
--- a/dbms/src/Storages/DeltaMerge/Index/ValueComparison.h
+++ b/dbms/src/Storages/DeltaMerge/Index/ValueComparison.h
@@ -268,27 +268,27 @@ struct ValueComparision
if (is_date)
{
- if constexpr (std::is_same_v)
+ if constexpr (std::is_same_v)
{
DayNum_t date;
ReadBufferFromMemory in(left.data(), left.size());
readDateText(date, in);
if (!in.eof())
throw Exception("String is too long for Date: " + left);
- res = Op::apply(date, right);
+ res = Op::apply(date, right);
return true;
}
}
else if (is_date_time)
{
- if constexpr (std::is_same_v)
+ if constexpr (std::is_same_v)
{
time_t date_time;
ReadBufferFromMemory in(left.data(), left.size());
readDateTimeText(date_time, in);
if (!in.eof())
throw Exception("String is too long for DateTime: " + left);
- res = Op::apply(date_time, right);
+ res = Op::apply(date_time, right);
return true;
}
}
@@ -307,7 +307,7 @@ struct ValueComparision
}
else if (is_enum8)
{
- if constexpr (std::is_same_v)
+ if constexpr (std::is_same_v)
{
auto type = static_cast<const DataTypeEnum8 *>(right_type.get());
auto left_enum_value = type->getValue(left);
@@ -317,7 +317,7 @@ struct ValueComparision
}
else if (is_enum16)
{
- if constexpr (std::is_same_v)
+ if constexpr (std::is_same_v)
{
auto type = static_cast<const DataTypeEnum16 *>(right_type.get());
auto left_enum_value = type->getValue(left);
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp
index 64ae9165c87..4ea5c5660e3 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -139,6 +139,38 @@ SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id)
segment->delta->restore(OpContext::createForLogStorage(context));
segment->stable->restore(OpContext::createForDataStorage(context));
+#if 0
+ /// Dump the min-max index info
+ if (unlikely(segment->log->trace()))
+ {
+ for (const auto & chunk : segment->delta->getChunks())
+ {
+ for (auto && [col_id, meta] : chunk.getMetas())
+ {
+ (void)col_id;
+ if (meta.minmax)
+ {
+ LOG_TRACE(segment->log,
+ "Segment[" << segment->segmentId() << "] VS[delta] " << chunk.info() << " Col[" << meta.col_id << "] MinMax["
+ << meta.minmax->toString() << "]");
+ }
+ }
+ }
+ for (const auto & chunk : segment->stable->getChunks())
+ {
+ for (auto && [col_id, meta] : chunk.getMetas())
+ {
+ (void)col_id;
+ if (meta.minmax)
+ {
+ LOG_TRACE(segment->log,
+ "Segment[" << segment->segmentId() << "] VS[stable] " << chunk.info() << " Col[" << meta.col_id << "] MinMax["
+ << meta.minmax->toString() << "]");
+ }
+ }
+ }
+ }
+#endif
return segment;
}
@@ -578,7 +610,6 @@ Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context,
auto new_read_columns = arrangeReadColumns(dm_context.handle_column, read_columns);
DeltaValueSpacePtr delta_value_space;
- ChunkBlockInputStreamPtr stable_input_stream;
DeltaIndexPtr delta_index;
auto delta_block = segment_snap.delta->read(new_read_columns, storage_snap.log_reader, 0, segment_snap.delta_rows);
diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
index 0c8306895a6..fbf85efe913 100644
--- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
+++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
@@ -114,6 +114,60 @@ class DMTestEnv
return block;
}
+ /**
+ * Create a simple block with 3 columns:
+ * * `pk` - Int64 / `version` / `tag`
+ * @param pk `pk`'s value
+ * @param tso_beg `tso`'s value begin
+ * @param tso_end `tso`'s value end (exclusive)
+ * @return
+ */
+ static Block prepareBlockWithIncreasingTso(Int64 pk, size_t tso_beg, size_t tso_end)
+ {
+ Block block;
+ const size_t num_rows = (tso_end - tso_beg);
+ {
+ ColumnWithTypeAndName col1(std::make_shared(), pk_name);
+ {
+ IColumn::MutablePtr m_col = col1.type->createColumn();
+ // insert the same pk value for every row
+ for (size_t i = 0; i < num_rows; i++)
+ {
+ Field field = Int64(pk);
+ m_col->insert(field);
+ }
+ col1.column = std::move(m_col);
+ }
+ block.insert(col1);
+
+ ColumnWithTypeAndName version_col(VERSION_COLUMN_TYPE, VERSION_COLUMN_NAME);
+ {
+ IColumn::MutablePtr m_col = version_col.type->createColumn();
+ for (size_t i = 0; i < num_rows; ++i)
+ {
+ Field field = Int64(tso_beg + i);
+ m_col->insert(field);
+ }
+ version_col.column = std::move(m_col);
+ }
+ block.insert(version_col);
+
+ ColumnWithTypeAndName tag_col(TAG_COLUMN_TYPE, TAG_COLUMN_NAME);
+ {
+ IColumn::MutablePtr m_col = tag_col.type->createColumn();
+ auto & column_data = typeid_cast &>(*m_col).getData();
+ column_data.resize(num_rows);
+ for (size_t i = 0; i < num_rows; ++i)
+ {
+ column_data[i] = 0;
+ }
+ tag_col.column = std::move(m_col);
+ }
+ block.insert(tag_col);
+ }
+ return block;
+ }
+
/// prepare a row like this:
/// {"pk":pk, "version":tso, "delete_mark":mark, "colname":value}
static Block prepareOneRowBlock(Int64 pk, UInt64 tso, UInt8 mark, const String & colname, const String & value)
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
index e74df034321..1cdeef1d770 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
@@ -191,6 +191,7 @@ TEST_F(DeltaMergeStore_test, SimpleWriteRead)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
@@ -279,6 +280,7 @@ TEST_F(DeltaMergeStore_test, DeleteRead)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
@@ -314,6 +316,7 @@ TEST_F(DeltaMergeStore_test, DeleteRead)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
@@ -359,6 +362,7 @@ TEST_F(DeltaMergeStore_test, WriteMultipleBlock)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
@@ -402,6 +406,7 @@ TEST_F(DeltaMergeStore_test, WriteMultipleBlock)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
@@ -431,6 +436,7 @@ TEST_F(DeltaMergeStore_test, WriteMultipleBlock)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ UInt64(1),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
@@ -482,6 +488,7 @@ TEST_F(DeltaMergeStore_test, DISABLED_WriteLargeBlock)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
while (Block block = in->read())
@@ -519,6 +526,7 @@ TEST_F(DeltaMergeStore_test, DISABLED_WriteLargeBlock)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;
// block_num represents index of current segment
@@ -574,6 +582,7 @@ TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr in = ins[0];
@@ -595,6 +604,7 @@ TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ tso2,
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr in = ins[0];
@@ -616,6 +626,7 @@ TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ tso1,
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr in = ins[0];
@@ -637,6 +648,7 @@ TEST_F(DeltaMergeStore_test, ReadWithSpecifyTso)
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ tso1 - 1,
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr in = ins[0];
@@ -683,6 +695,7 @@ try
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr in = ins[0];
@@ -802,6 +815,7 @@ try
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr & in = ins[0];
@@ -924,6 +938,7 @@ try
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr & in = ins[0];
@@ -1027,6 +1042,7 @@ try
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr & in = ins[0];
@@ -1137,6 +1153,7 @@ try
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
in->readPrefix();
@@ -1222,6 +1239,7 @@ try
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024)[0];
in->readPrefix();
@@ -1326,6 +1344,7 @@ try
{HandleRange::newAll()},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits::max(),
+ EMPTY_FILTER,
/* expected_block_size= */ 1024);
ASSERT_EQ(ins.size(), 1UL);
BlockInputStreamPtr & in = ins[0];
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp
index a151fc29d14..8ef7dcac27a 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp
@@ -36,7 +36,7 @@ class DiskValueSpace_test : public ::testing::Test
storage_pool = std::make_unique("test.t1", path);
Context & context = DMTestEnv::getContext();
- table_handle_define = ColumnDefine(1, "pk", std::make_shared());
+ table_handle_define = ColumnDefine(1, DMTestEnv::pk_name, std::make_shared());
table_columns.clear();
table_columns.emplace_back(table_handle_define);
table_columns.emplace_back(getVersionColumnDefine());
@@ -52,9 +52,9 @@ class DiskValueSpace_test : public ::testing::Test
.handle_column = table_handle_define,
.min_version = 0,
- .not_compress = settings.not_compress_columns,
+ .not_compress = settings.not_compress_columns,
- .segment_limit_rows = context.getSettingsRef().dm_segment_limit_rows,
+ .segment_limit_rows = context.getSettingsRef().dm_segment_limit_rows,
.delta_limit_rows = context.getSettingsRef().dm_segment_delta_limit_rows,
.delta_limit_bytes = context.getSettingsRef().dm_segment_delta_limit_bytes,
@@ -188,6 +188,214 @@ TEST_F(DiskValueSpace_test, LogStorageWriteRead)
}
}
+TEST_F(DiskValueSpace_test, writeChunks_OneBlock)
+{
+ const Int64 pk_min = 20, pk_max = 40;
+ Block block = DMTestEnv::prepareSimpleWriteBlock(pk_min, pk_max, false);
+ auto in = std::make_shared(block);
+
+ WriteBatch wb;
+ auto opc = DiskValueSpace::OpContext::createForDataStorage(*dm_context);
+ auto chunks = DiskValueSpace::writeChunks(opc, in, wb);
+ ASSERT_EQ(chunks.size(), 1UL);
+
+ const Chunk & chunk = chunks[0];
+ ASSERT_EQ(chunk.getRows(), size_t(pk_max - pk_min));
+ ASSERT_EQ(chunk.getHandleFirstLast(), HandlePair(pk_min, pk_max - 1));
+}
+
+TEST_F(DiskValueSpace_test, writeChunks_NonOverlapBlocks)
+{
+ const Int64 pk_min = 20, pk_max = 40;
+ const Int64 pk_span = pk_max - pk_min;
+ BlockInputStreamPtr in = {};
+ {
+ BlocksList blocks;
+ // First block [20, 30)
+ Block block = DMTestEnv::prepareSimpleWriteBlock(pk_min, pk_min + pk_span / 2, false);
+ {
+ auto col = block.getByName(DMTestEnv::pk_name);
+ EXPECT_EQ(col.column->getInt(0), pk_min);
+ EXPECT_EQ(col.column->getInt(col.column->size() - 1), pk_min + pk_span / 2 - 1);
+ EXPECT_EQ(block.rows(), size_t(10));
+ }
+ blocks.emplace_back(block);
+ // Second block [30, 40)
+ block = DMTestEnv::prepareSimpleWriteBlock(pk_min + pk_span / 2, pk_max, false);
+ {
+ auto col = block.getByName(DMTestEnv::pk_name);
+ EXPECT_EQ(col.column->getInt(0), pk_min + pk_span / 2);
+ EXPECT_EQ(col.column->getInt(col.column->size() - 1), pk_max - 1);
+ EXPECT_EQ(block.rows(), size_t(10));
+ }
+ blocks.emplace_back(block);
+ in = std::make_shared(std::move(blocks));
+ }
+
+ WriteBatch wb;
+ auto opc = DiskValueSpace::OpContext::createForDataStorage(*dm_context);
+ auto chunks = DiskValueSpace::writeChunks(opc, in, wb);
+ ASSERT_EQ(chunks.size(), 2UL);
+
+ {
+ const Chunk & chunk = chunks[0];
+ ASSERT_EQ(chunk.getRows(), size_t(pk_span / 2));
+ EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(pk_min, pk_min + pk_span / 2 - 1));
+ }
+ {
+ const Chunk & chunk = chunks[1];
+ ASSERT_EQ(chunk.getRows(), size_t(pk_span / 2));
+ EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(pk_min + pk_span / 2, pk_max - 1));
+ }
+}
+
+TEST_F(DiskValueSpace_test, writeChunks_OverlapBlocks)
+{
+ BlockInputStreamPtr in = {};
+ {
+ BlocksList blocks;
+ // First block [20, 30]
+ Block block = DMTestEnv::prepareSimpleWriteBlock(20, 31, false);
+ {
+ auto col = block.getByName(DMTestEnv::pk_name);
+ EXPECT_EQ(col.column->getInt(0), 20);
+ EXPECT_EQ(col.column->getInt(col.column->size() - 1), 30);
+ EXPECT_EQ(block.rows(), size_t(11));
+ }
+ blocks.emplace_back(block);
+
+ // Second block [30, 40), and pk=30 has multiple versions
+ {
+ // version [100, 110) for pk=30
+ Block block_of_multi_versions = DMTestEnv::prepareBlockWithIncreasingTso(30, 100, 110);
+ // pk [31,40]
+ Block block_2 = DMTestEnv::prepareSimpleWriteBlock(31, 41, false);
+ DM::concat(block_of_multi_versions, block_2);
+ block = block_of_multi_versions;
+ {
+ auto col = block.getByName(DMTestEnv::pk_name);
+ EXPECT_EQ(col.column->getInt(0), 30);
+ EXPECT_EQ(col.column->getInt(col.column->size() - 1), 40);
+ EXPECT_EQ(block.rows(), size_t(10 + 10));
+ }
+ }
+ blocks.emplace_back(block);
+
+ // Third block [40, 50), and pk=40 has multiple versions
+ {
+ // version [300, 305) for pk=40
+ Block block_of_multi_versions = DMTestEnv::prepareBlockWithIncreasingTso(40, 300, 305);
+ // pk [41,50)
+ Block block_2 = DMTestEnv::prepareSimpleWriteBlock(41, 50, false);
+ DM::concat(block_of_multi_versions, block_2);
+ block = block_of_multi_versions;
+ {
+ auto col = block.getByName(DMTestEnv::pk_name);
+ EXPECT_EQ(col.column->getInt(0), 40);
+ EXPECT_EQ(col.column->getInt(col.column->size() - 1), 49);
+ EXPECT_EQ(block.rows(), size_t(5 + 9));
+ }
+ }
+ blocks.emplace_back(block);
+
+ in = std::make_shared(std::move(blocks));
+ }
+
+ WriteBatch wb;
+ auto opc = DiskValueSpace::OpContext::createForDataStorage(*dm_context);
+ auto chunks = DiskValueSpace::writeChunks(opc, in, wb);
+ ASSERT_EQ(chunks.size(), 3UL);
+
+ {
+ const Chunk & chunk = chunks[0];
+ // should be [20, 30], and pk=30 with 11 versions
+ ASSERT_EQ(chunk.getRows(), size_t(11 + 10));
+ EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(20, 30));
+ }
+ {
+ const Chunk & chunk = chunks[1];
+ // should be [31, 40], and pk=40 with 6 versions
+ ASSERT_EQ(chunk.getRows(), size_t(9 + 6));
+ EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(31, 40));
+ }
+ {
+ const Chunk & chunk = chunks[2];
+ // should be [41, 50)
+ ASSERT_EQ(chunk.getRows(), size_t(9));
+ EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(41, 49));
+ }
+}
+
+TEST_F(DiskValueSpace_test, writeChunks_OverlapBlocksMerged)
+{
+ BlockInputStreamPtr in = {};
+ {
+ BlocksList blocks;
+ // First block [20, 30]
+ Block block = DMTestEnv::prepareSimpleWriteBlock(20, 31, false);
+ {
+ auto col = block.getByName(DMTestEnv::pk_name);
+ EXPECT_EQ(col.column->getInt(0), 20);
+ EXPECT_EQ(col.column->getInt(col.column->size() - 1), 30);
+ EXPECT_EQ(block.rows(), size_t(11));
+ }
+ blocks.emplace_back(block);
+
+ // Second block [30, 31), and pk=30 has multiple versions
+ {
+ // version [100, 110) for pk=30
+ Block block_of_multi_versions = DMTestEnv::prepareBlockWithIncreasingTso(30, 100, 110);
+ block = block_of_multi_versions;
+ {
+ auto col = block.getByName(DMTestEnv::pk_name);
+ EXPECT_EQ(col.column->getInt(0), 30);
+ EXPECT_EQ(col.column->getInt(col.column->size() - 1), 30);
+ EXPECT_EQ(block.rows(), size_t(10));
+ }
+ }
+ blocks.emplace_back(block);
+
+ // Third block [30, 50), and pk=30 has multiple versions
+ {
+ // version [300, 305) for pk=30
+ Block block_of_multi_versions = DMTestEnv::prepareBlockWithIncreasingTso(30, 300, 305);
+ // pk [31,50)
+ Block block_2 = DMTestEnv::prepareSimpleWriteBlock(31, 50, false);
+ DM::concat(block_of_multi_versions, block_2);
+ block = block_of_multi_versions;
+ {
+ auto col = block.getByName(DMTestEnv::pk_name);
+ EXPECT_EQ(col.column->getInt(0), 30);
+ EXPECT_EQ(col.column->getInt(col.column->size() - 1), 49);
+ EXPECT_EQ(block.rows(), size_t(5 + 19));
+ }
+ }
+ blocks.emplace_back(block);
+
+ in = std::make_shared(std::move(blocks));
+ }
+
+ WriteBatch wb;
+ auto opc = DiskValueSpace::OpContext::createForDataStorage(*dm_context);
+ auto chunks = DiskValueSpace::writeChunks(opc, in, wb);
+ // The second block is merged into the first chunk
+ ASSERT_EQ(chunks.size(), 2UL);
+
+ {
+ const Chunk & chunk = chunks[0];
+ // should be [20, 30], and pk=30 with 1 + 10 + 5 versions
+ ASSERT_EQ(chunk.getRows(), size_t(11 + 10 + 5));
+ EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(20, 30));
+ }
+ {
+ const Chunk & chunk = chunks[1];
+ // should be [31, 50)
+ ASSERT_EQ(chunk.getRows(), size_t(19));
+ EXPECT_EQ(chunk.getHandleFirstLast(), HandlePair(31, 49));
+ }
+}
+
+
} // namespace tests
} // namespace DM
} // namespace DB
diff --git a/dbms/src/Storages/Page/PageFile.cpp b/dbms/src/Storages/Page/PageFile.cpp
index 95414564309..ce19fca591c 100644
--- a/dbms/src/Storages/Page/PageFile.cpp
+++ b/dbms/src/Storages/Page/PageFile.cpp
@@ -183,9 +183,13 @@ std::pair analyzeMetaFile( //
// check the checksum of WriteBatch
const auto wb_bytes_without_checksum = wb_bytes - sizeof(Checksum);
const auto wb_checksum = PageUtil::get(wb_start_pos + wb_bytes_without_checksum);
- if (wb_checksum != CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum))
+ const auto checksum_calc = CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum);
+ if (wb_checksum != checksum_calc)
{
- throw Exception("Write batch checksum not match, path: " + path + ", offset: " + DB::toString(wb_start_pos - meta_data),
+ std::stringstream ss;
+ ss << "expected: " << std::hex << wb_checksum << ", but: " << checksum_calc;
+ throw Exception("Write batch checksum not match, path: " + path + ", offset: " + DB::toString(wb_start_pos - meta_data)
+ + ", bytes: " + DB::toString(wb_bytes) + ", " + ss.str(),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
diff --git a/dbms/src/Storages/Page/PageFile.h b/dbms/src/Storages/Page/PageFile.h
index 699a37d2e4e..9fa99c743cb 100644
--- a/dbms/src/Storages/Page/PageFile.h
+++ b/dbms/src/Storages/Page/PageFile.h
@@ -124,11 +124,11 @@ class PageFile : public Allocator
bool isExist() const;
void removeDataIfExists() const;
+ String folderPath() const;
private:
/// Create a new page file.
PageFile(PageFileId file_id_, UInt32 level_, const String & parent_path, bool is_tmp_, bool is_create, Poco::Logger * log);
- String folderPath() const;
String dataPath() const { return folderPath() + "/page"; }
String metaPath() const { return folderPath() + "/meta"; }
diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp
index 0f006e0370f..71e0731ce07 100644
--- a/dbms/src/Storages/Page/PageStorage.cpp
+++ b/dbms/src/Storages/Page/PageStorage.cpp
@@ -83,16 +83,26 @@ PageStorage::PageStorage(String name, const String & storage_path_, const Config
#ifdef DELTA_VERSION_SET
for (auto & page_file : page_files)
{
- PageEntriesEdit edit;
- const_cast(page_file).readAndSetPageMetas(edit);
+ try
+ {
+ PageEntriesEdit edit;
+ const_cast(page_file).readAndSetPageMetas(edit);
- // Only level 0 is writable.
- if (page_file.getLevel() == 0)
+ // Only level 0 is writable.
+ if (page_file.getLevel() == 0)
+ {
+ write_file = page_file;
+ }
+
+ // apply edit to new version
+ versioned_page_entries.apply(edit);
+ }
+ catch (Exception & e)
{
- write_file = page_file;
+ /// Better diagnostics.
+ e.addMessage("(while applying edit from " + page_file.folderPath() + " to PageStorage: " + storage_name + ")");
+ throw;
}
- // apply edit to new version
- versioned_page_entries.apply(edit);
}
#else
auto snapshot = versioned_page_entries.getSnapshot();
diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp
index a37089db8f2..694ca48b65e 100644
--- a/dbms/src/Storages/StorageDeltaMerge.cpp
+++ b/dbms/src/Storages/StorageDeltaMerge.cpp
@@ -7,6 +7,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -536,8 +537,49 @@ BlockInputStreams StorageDeltaMerge::read( //
}
}
- return store->read(
- context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, max_block_size);
+ DM::RSOperatorPtr rs_operator = DM::EMPTY_FILTER;
+ const bool enable_rs_filter = context.getSettingsRef().dm_enable_rough_set_filter;
+ if (enable_rs_filter)
+ {
+ if (likely(query_info.dag_query))
+ {
+ /// Query from TiDB / TiSpark
+ auto create_attr_by_column_id = [this](ColumnID column_id) -> Attr {
+ const ColumnDefines & defines = this->store->getTableColumns();
+ auto iter = std::find_if(
+ defines.begin(), defines.end(), [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; });
+ if (iter != defines.end())
+ return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type};
+ else
+ // Maybe throw an exception? Or check if `type` is nullptr before creating filter?
+ return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
+ };
+ rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, std::move(create_attr_by_column_id), log);
+ }
+ else
+ {
+ // Query from ch client
+ auto create_attr_by_column_id = [this](const String &col_name) -> Attr {
+ const ColumnDefines & defines = this->store->getTableColumns();
+ auto iter = std::find_if(
+ defines.begin(), defines.end(), [&col_name](const ColumnDefine & d) -> bool { return d.name == col_name; });
+ if (iter != defines.end())
+ return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type};
+ else
+ // Maybe throw an exception? Or check if `type` is nullptr before creating filter?
+ return Attr{.col_name = col_name, .col_id = 0, .type = DataTypePtr{}};
+ };
+ rs_operator = FilterParser::parseSelectQuery(select_query, std::move(create_attr_by_column_id), log);
+ }
+ if (likely(rs_operator != DM::EMPTY_FILTER))
+ LOG_DEBUG(log, "Rough set filter: " << rs_operator->toString());
+ }
+ else
+ LOG_DEBUG(log, "Rough set filter is disabled.");
+
+
+ return store->read(context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso,
+ rs_operator, max_block_size);
}
}
@@ -549,7 +591,7 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t rows)
{
ColumnDefines to_read{getExtraHandleColumnDefine()};
- auto stream = store->read(context, context.getSettingsRef(), to_read, {DM::HandleRange::newAll()}, 1, MAX_UINT64)[0];
+ auto stream = store->read(context, context.getSettingsRef(), to_read, {DM::HandleRange::newAll()}, 1, MAX_UINT64, EMPTY_FILTER)[0];
stream->readPrefix();
Block block;
while ((block = stream->read()))
@@ -563,7 +605,7 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t rows)
DM::HandleRange range = DM::HandleRange::newAll();
{
ColumnDefines to_read{getExtraHandleColumnDefine()};
- auto stream = store->read(context, context.getSettingsRef(), to_read, {DM::HandleRange::newAll()}, 1, MAX_UINT64)[0];
+ auto stream = store->read(context, context.getSettingsRef(), to_read, {DM::HandleRange::newAll()}, 1, MAX_UINT64, EMPTY_FILTER)[0];
stream->readPrefix();
Block block;
size_t index = 0;