feat: iot table (#3944)
* feat: iot table

* fix

* fix

* fix delete key entry

* fix comment

* ut

* ut test

* fix ut

* sleep more for truncate

* sleep 16

* tool pytest fix and swig fix

* fix

* clean

* move to base

* fix

* fix coverage ut

* fix

---------

Co-authored-by: Huang Wei <huangwei@4paradigm.com>
vagetablechicken and Huang Wei authored Jul 1, 2024
1 parent 25bd745 commit 1c1e213
Showing 50 changed files with 3,082 additions and 309 deletions.
6 changes: 6 additions & 0 deletions docs/zh/openmldb_sql/ddl/CREATE_INDEX_STATEMENT.md
@@ -55,6 +55,12 @@ CREATE INDEX index3 ON t5 (col3) OPTIONS (ts=ts1, ttl_type=absolute, ttl=30d);
```
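For more information about `TTL` and `TTL_TYPE`, see [here](./CREATE_TABLE_STATEMENT.md).

On an IOT table, `CREATE INDEX` can create different types of indexes: omitting `type` creates a Covering index, while `type=secondary` creates a Secondary index: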
```SQL
CREATE INDEX index_s ON t5 (col3) OPTIONS (ts=ts1, ttl_type=absolute, ttl=30d, type=secondary);
```
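Indexes with the same keys and ts column are treated as the same index, so do not try to create the same index again with a different type.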
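
The mapping from the `type` option to an index kind can be sketched as below. This is a minimal, standard-library-only illustration modeled on the `ConvertCreateIndexStatement` hunk in this commit; `EqualsIgnoreCase` and `ResolveIndexType` are hypothetical helper names, not OpenMLDB APIs, and an empty string stands in for "option not given". Note that, per that hunk, `type=covering` is also accepted explicitly, and any other value is rejected.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>

// Hypothetical helper: case-insensitive ASCII comparison
// (a stand-in for absl::EqualsIgnoreCase used in the real code).
inline bool EqualsIgnoreCase(const std::string& a, const std::string& b) {
    return a.size() == b.size() &&
           std::equal(a.begin(), a.end(), b.begin(), [](unsigned char x, unsigned char y) {
               return std::tolower(x) == std::tolower(y);
           });
}

// Hypothetical helper: maps the CREATE INDEX `type` option to the internal
// index type string. An empty option (not given) or "covering" yields "key",
// "secondary" yields "skey", and anything else is reported as invalid.
inline bool ResolveIndexType(const std::string& type_option, std::string* index_type) {
    if (type_option.empty() || EqualsIgnoreCase(type_option, "covering")) {
        *index_type = "key";
        return true;
    }
    if (EqualsIgnoreCase(type_option, "secondary")) {
        *index_type = "skey";
        return true;
    }
    return false;  // e.g. type=clustered cannot be created through CREATE INDEX
}
```

For example, `ResolveIndexType("secondary", &t)` succeeds and sets `t` to `"skey"`, while `ResolveIndexType("clustered", &t)` fails, which matches the rule that the Clustered index can only be declared when the table is created.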

## Related SQL

[DROP INDEX](./DROP_INDEX_STATEMENT.md)
23 changes: 22 additions & 1 deletion docs/zh/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md
@@ -223,7 +223,7 @@ IndexOption ::=

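| Option | Description | expr | Usage example |
|------------|------------|------------|------------|
| `KEY` | Index column(s) (required). OpenMLDB supports both single-column and composite indexes. When `KEY` is followed by a single column, the index is built on that column only. When `KEY` is followed by multiple columns, a composite index is built on them: the columns are concatenated in order into one string that serves as the index key. | Single-column index: `ColumnName`<br/>or composite index:<br/>`(ColumnName (, ColumnName)* ) ` | Single-column index: `INDEX(KEY=col1)`<br />Composite index: `INDEX(KEY=(col1, col2))` |
| `KEY/CKEY/SKEY` | Index column(s) (required). OpenMLDB supports both single-column and composite indexes. When `KEY` is followed by a single column, the index is built on that column only. When `KEY` is followed by multiple columns, a composite index is built on them: the columns are concatenated in order into one string that serves as the index key. For the use of the different KEY kinds, see [Index-Organized Table(IOT)](#index-organized-tableiot). | Single-column index: `ColumnName`<br/>or composite index:<br/>`(ColumnName (, ColumnName)* ) ` | Single-column index: `INDEX(KEY=col1)`<br />Composite index: `INDEX(KEY=(col1, col2))` |
| `TS` | Index time column (optional). Data under the same index key is sorted by the index time column. When `TS` is not configured explicitly, the insertion timestamp is used as the index time. The time column must be of type BigInt or Timestamp. | `ColumnName` | `INDEX(KEY=col1, TS=std_time)`. The index column is col1, and rows with the same col1 are sorted by std_time. |
| `TTL_TYPE` | Eviction rule (optional). There are four types. When `TTL_TYPE` is not configured explicitly, `ABSOLUTE` expiration is used by default. | Supported values: `ABSOLUTE` <br/> `LATEST`<br/>`ABSORLAT`<br/> `ABSANDLAT`| See "Configuration details of TTL and TTL_TYPE" below |
| `TTL` | Maximum time to live / maximum number of records (optional). It depends on `TTL_TYPE`: different `TTL_TYPE` values take different `TTL` settings. When `TTL` is not configured explicitly, `TTL=0`, which means no eviction rule is set and OpenMLDB will not evict records. | A number: `int_literal`<br/> or a number with a time unit (`S,M,H,D`): `interval_literal`<br/>or a tuple: `( interval_literal , int_literal )` | See "Configuration details of TTL and TTL_TYPE" below |
@@ -240,6 +240,27 @@ IndexOption ::=
```{note}
The limits on the maximum expiration time and the maximum number of retained records exist for performance reasons. If you must configure a larger TTL, you can create the table with a compliant TTL first and then use the nameserver's UpdateTTL interface to adjust it to the desired value (the max limits are ignored there); the change takes effect after one gc interval. Alternatively, adjust the nameserver settings `absolute_ttl_max` and `latest_ttl_max`, restart so that they take effect, and then create the table.
```
#### Index-Organized Table(IOT)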

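An index declared with KEY is a Covering index. In OpenMLDB a Covering index stores complete data rows and therefore uses more memory. If you want lower memory usage and can accept some performance loss, use an IOT table. An IOT table supports three types of indexes:
- `CKEY`: Clustered index, stores complete data rows. The configured CKEY+TS uniquely identifies a row; inserting a duplicate primary key updates the row (this deletes the old data on all indexes and then INSERTs the new data, which costs performance). You can also use CKEY alone without TS, in which case CKEY uniquely identifies a row. Queries that hit this index incur no performance loss.
- `SKEY`: Secondary index, stores primary keys. When TS is not configured, entries under the same SKEY are ordered by insertion time. A query first finds the matching primary key values in the Secondary index and then fetches the data by primary key, so query performance is reduced.
- `KEY`: Covering index, stores complete data rows. When TS is not configured, entries under the same KEY are ordered by insertion time. Queries that hit this index incur no performance loss.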
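
To make the difference between a Clustered and a Secondary index concrete, here is a toy in-memory model. It is an illustration under simplifying assumptions, not OpenMLDB's storage engine: `Row` and `ToyIotTable` are hypothetical names, the table shape is fixed to `(c1, c2, c3)` with `INDEX(ckey=c1), INDEX(skey=c2)`, and TS ordering is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <vector>

// Toy row for a table (c1, c2, c3) with INDEX(ckey=c1), INDEX(skey=c2).
struct Row {
    int64_t c1;
    int64_t c2;
    int64_t c3;
};

// Hypothetical toy table: the Clustered index holds full rows,
// the Secondary index holds only primary keys.
class ToyIotTable {
 public:
    // Upsert: a duplicate clustered key first removes the old entry
    // from every index, then inserts the new row.
    void Put(const Row& row) {
        auto it = clustered_.find(row.c1);
        if (it != clustered_.end()) {
            const int64_t old_c2 = it->second.c2;
            secondary_[old_c2].erase(row.c1);
            if (secondary_[old_c2].empty()) secondary_.erase(old_c2);
            clustered_.erase(it);
        }
        clustered_.emplace(row.c1, row);
        secondary_[row.c2].insert(row.c1);
    }

    // Clustered lookup: one step, because the index holds the full row.
    const Row* GetByCKey(int64_t c1) const {
        auto it = clustered_.find(c1);
        return it == clustered_.end() ? nullptr : &it->second;
    }

    // Secondary lookup: two steps, primary keys first, then the rows behind them.
    std::vector<Row> GetBySKey(int64_t c2) const {
        std::vector<Row> rows;
        auto it = secondary_.find(c2);
        if (it == secondary_.end()) return rows;
        for (int64_t pk : it->second) {
            rows.push_back(clustered_.at(pk));
        }
        return rows;
    }

    std::size_t Size() const { return clustered_.size(); }

 private:
    std::map<int64_t, Row> clustered_;                // ckey -> full row
    std::map<int64_t, std::set<int64_t>> secondary_;  // skey -> primary keys only
};
```

Putting `{1, 10, 100}` and then `{1, 20, 300}` leaves a single row for primary key 1, and the Secondary entry under 10 no longer points at it. The extra hop in `GetBySKey` is the source of the query cost described for `SKEY`.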

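When creating an IOT table, the first index must be the table's one and only Clustered index; the other indexes are optional. Changing the position of the Clustered index is not supported yet.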

```sql
CREATE TABLE iot (c1 int64, c2 int64, c3 int64, INDEX(ckey=c1, ts=c2)); -- one Clustered index
CREATE TABLE iot (c1 int64, c2 int64, c3 int64, INDEX(ckey=c1), INDEX(skey=c2)); -- one Clustered index and one Secondary index
CREATE TABLE iot (c1 int64, c2 int64, c3 int64, INDEX(ckey=c1), INDEX(skey=c2), INDEX(key=c3)); -- one Clustered, one Secondary and one Covering index
```
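
The ordering rule stated before the SQL examples can be expressed as a small check. This is a sketch only: `IsValidIotIndexList` is a hypothetical function, not part of OpenMLDB, and it assumes each `INDEX(...)` clause has already been reduced to one of the type strings `"ckey"`, `"skey"` or `"key"` in declaration order.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical validator for an IOT table's index list.
// Returns true only if the first index is the single Clustered index
// and every other index is a Secondary ("skey") or Covering ("key") index.
inline bool IsValidIotIndexList(const std::vector<std::string>& index_types) {
    if (index_types.empty() || index_types.front() != "ckey") {
        return false;  // the first index must be the Clustered index
    }
    for (std::size_t i = 1; i < index_types.size(); ++i) {
        if (index_types[i] == "ckey") {
            return false;  // only one Clustered index is allowed
        }
        if (index_types[i] != "skey" && index_types[i] != "key") {
            return false;  // unknown index type
        }
    }
    return true;
}
```

All three `CREATE TABLE` statements above pass this check, whereas a list such as `{"skey", "ckey"}` does not.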

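The TTL of IOT indexes differs from that of ordinary tables: TTL eviction on the IOT Clustered index triggers deletion on the other indexes, whereas TTL eviction on a Secondary or Covering index only deletes data from that index itself and does not trigger deletion on other indexes. In general, unless you need the Secondary and Covering indexes to save more memory, you can set a TTL on the Clustered index only and leave the Secondary and Covering indexes without one.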
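
The asymmetry in TTL eviction can be illustrated with a toy model. Everything here is an assumption made for illustration (`ToyIndexes`, `GcClustered` and `GcNonClustered` are hypothetical names, and each index is reduced to a map from primary key to timestamp); it does not reflect how OpenMLDB's gc is implemented.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Toy model: every index maps primary key -> ts of the row it refers to.
struct ToyIndexes {
    std::map<int64_t, int64_t> clustered;
    std::map<int64_t, int64_t> secondary;
    std::map<int64_t, int64_t> covering;
};

// Evicts entries with ts < expire_before from the Clustered index and
// cascades the deletion to the Secondary and Covering indexes.
inline void GcClustered(ToyIndexes* idx, int64_t expire_before) {
    for (auto it = idx->clustered.begin(); it != idx->clustered.end();) {
        if (it->second < expire_before) {
            idx->secondary.erase(it->first);
            idx->covering.erase(it->first);
            it = idx->clustered.erase(it);
        } else {
            ++it;
        }
    }
}

// Evicts entries with ts < expire_before from one Secondary or Covering
// index only; no other index is touched.
inline void GcNonClustered(std::map<int64_t, int64_t>* index, int64_t expire_before) {
    for (auto it = index->begin(); it != index->end();) {
        if (it->second < expire_before) {
            it = index->erase(it);
        } else {
            ++it;
        }
    }
}
```

Running `GcNonClustered` on the Secondary index shrinks only that index, while `GcClustered` removes the expired row everywhere, which is why setting a TTL on the Clustered index alone is usually sufficient.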

##### Notes

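- Do not write multiple rows with the same primary key to an IOT table concurrently: the writes may conflict, and at least one of them will fail. Rows that already exist in the IOT table with the same primary key need no extra handling; they will be overwritten. To avoid having to repair an import, clean the data beforehand and deduplicate rows that share a primary key. (An overwrite triggers deletion in all indexes, and single-threaded writes are also very inefficient, so single-threaded import is not recommended.)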
-

#### Example
**Example 1: Create a table with a single-column index**
4 changes: 2 additions & 2 deletions hybridse/include/node/node_manager.h
@@ -173,8 +173,8 @@ class NodeManager {
SqlNode *MakeColumnIndexNode(SqlNodeList *keys, SqlNode *ts, SqlNode *ttl,
SqlNode *version);
SqlNode *MakeColumnIndexNode(SqlNodeList *index_item_list);
SqlNode *MakeIndexKeyNode(const std::string &key);
SqlNode *MakeIndexKeyNode(const std::vector<std::string> &keys);
SqlNode *MakeIndexKeyNode(const std::string &key, const std::string &type);
SqlNode *MakeIndexKeyNode(const std::vector<std::string> &keys, const std::string &type);
SqlNode *MakeIndexTsNode(const std::string &ts);
SqlNode *MakeIndexTTLNode(ExprListNode *ttl_expr);
SqlNode *MakeIndexTTLTypeNode(const std::string &ttl_type);
13 changes: 11 additions & 2 deletions hybridse/include/node/sql_node.h
@@ -2084,14 +2084,19 @@ class CreateStmt : public SqlNode {
class IndexKeyNode : public SqlNode {
public:
IndexKeyNode() : SqlNode(kIndexKey, 0, 0) {}
explicit IndexKeyNode(const std::string &key) : SqlNode(kIndexKey, 0, 0), key_({key}) {}
explicit IndexKeyNode(const std::vector<std::string> &keys) : SqlNode(kIndexKey, 0, 0), key_(keys) {}
explicit IndexKeyNode(const std::string &key, const std::string &type)
: SqlNode(kIndexKey, 0, 0), key_({key}), index_type_(type) {}
explicit IndexKeyNode(const std::vector<std::string> &keys, const std::string &type)
: SqlNode(kIndexKey, 0, 0), key_(keys), index_type_(type) {}
~IndexKeyNode() {}
void AddKey(const std::string &key) { key_.push_back(key); }
void SetIndexType(const std::string &type) { index_type_ = type; }
std::vector<std::string> &GetKey() { return key_; }
std::string &GetIndexType() { return index_type_; }

private:
std::vector<std::string> key_;
std::string index_type_ = "key";
};
class IndexVersionNode : public SqlNode {
public:
@@ -2145,6 +2150,7 @@ class ColumnIndexNode : public SqlNode {
public:
ColumnIndexNode()
: SqlNode(kColumnIndex, 0, 0),
index_type_("key"),
ts_(""),
version_(""),
version_count_(0),
@@ -2155,6 +2161,8 @@ class ColumnIndexNode : public SqlNode {

std::vector<std::string> &GetKey() { return key_; }
void SetKey(const std::vector<std::string> &key) { key_ = key; }
void SetIndexType(const std::string &type) { index_type_ = type; }
std::string &GetIndexType() { return index_type_; }

std::string GetTs() const { return ts_; }

@@ -2183,6 +2191,7 @@ class ColumnIndexNode : public SqlNode {

private:
std::vector<std::string> key_;
std::string index_type_;
std::string ts_;
std::string version_;
int version_count_;
9 changes: 5 additions & 4 deletions hybridse/src/node/node_manager.cc
@@ -451,6 +451,7 @@ SqlNode *NodeManager::MakeColumnIndexNode(SqlNodeList *index_item_list) {
switch (node_ptr->GetType()) {
case kIndexKey:
index_ptr->SetKey(dynamic_cast<IndexKeyNode *>(node_ptr)->GetKey());
index_ptr->SetIndexType(dynamic_cast<IndexKeyNode *>(node_ptr)->GetIndexType());
break;
case kIndexTs:
index_ptr->SetTs(dynamic_cast<IndexTsNode *>(node_ptr)->GetColumnName());
@@ -649,12 +650,12 @@ FnParaNode *NodeManager::MakeFnParaNode(const std::string &name, const TypeNode
::hybridse::node::FnParaNode *para_node = new ::hybridse::node::FnParaNode(expr_id);
return RegisterNode(para_node);
}
SqlNode *NodeManager::MakeIndexKeyNode(const std::string &key) {
SqlNode *node_ptr = new IndexKeyNode(key);
SqlNode *NodeManager::MakeIndexKeyNode(const std::string &key, const std::string &type) {
SqlNode *node_ptr = new IndexKeyNode(key, type);
return RegisterNode(node_ptr);
}
SqlNode *NodeManager::MakeIndexKeyNode(const std::vector<std::string> &keys) {
SqlNode *node_ptr = new IndexKeyNode(keys);
SqlNode *NodeManager::MakeIndexKeyNode(const std::vector<std::string> &keys, const std::string &type) {
SqlNode *node_ptr = new IndexKeyNode(keys, type);
return RegisterNode(node_ptr);
}
SqlNode *NodeManager::MakeIndexTsNode(const std::string &ts) {
2 changes: 1 addition & 1 deletion hybridse/src/node/plan_node_test.cc
@@ -228,7 +228,7 @@ TEST_F(PlanNodeTest, MultiPlanNodeTest) {

TEST_F(PlanNodeTest, ExtractColumnsAndIndexsTest) {
SqlNodeList *index_items = manager_->MakeNodeList();
index_items->PushBack(manager_->MakeIndexKeyNode("col4"));
index_items->PushBack(manager_->MakeIndexKeyNode("col4", "key"));
index_items->PushBack(manager_->MakeIndexTsNode("col5"));
ColumnIndexNode *index_node = dynamic_cast<ColumnIndexNode *>(manager_->MakeColumnIndexNode(index_items));
index_node->SetName("index1");
2 changes: 2 additions & 0 deletions hybridse/src/node/sql_node.cc
@@ -1188,6 +1188,8 @@ static absl::flat_hash_map<SqlNodeType, absl::string_view> CreateSqlNodeTypeToNa
{kCreateFunctionStmt, "kCreateFunctionStmt"},
{kCreateUserStmt, "kCreateUserStmt"},
{kAlterUserStmt, "kAlterUserStmt"},
{kRevokeStmt, "kRevokeStmt"},
{kGrantStmt, "kGrantStmt"},
{kDynamicUdfFnDef, "kDynamicUdfFnDef"},
{kDynamicUdafFnDef, "kDynamicUdafFnDef"},
{kWithClauseEntry, "kWithClauseEntry"},
2 changes: 1 addition & 1 deletion hybridse/src/node/sql_node_test.cc
@@ -666,7 +666,7 @@ TEST_F(SqlNodeTest, IndexVersionNodeTest) {

TEST_F(SqlNodeTest, CreateIndexNodeTest) {
SqlNodeList *index_items = node_manager_->MakeNodeList();
index_items->PushBack(node_manager_->MakeIndexKeyNode("col4"));
index_items->PushBack(node_manager_->MakeIndexKeyNode("col4", "key"));
index_items->PushBack(node_manager_->MakeIndexTsNode("col5"));
ColumnIndexNode *index_node = dynamic_cast<ColumnIndexNode *>(node_manager_->MakeColumnIndexNode(index_items));
CreatePlanNode *node = node_manager_->MakeCreateTablePlanNode(
3 changes: 1 addition & 2 deletions hybridse/src/plan/planner.cc
@@ -1139,7 +1139,7 @@ bool Planner::ExpandCurrentHistoryWindow(std::vector<const node::WindowDefNode *
}
return has_window_expand;
}

// TODO(hw): unused
base::Status Planner::TransformTableDef(const std::string &table_name, const NodePointVector &column_desc_list,
type::TableDef *table) {
std::set<std::string> index_names;
@@ -1199,7 +1199,6 @@ base::Status Planner::TransformTableDef(const std::string &table_name, const Nod

case node::kColumnIndex: {
node::ColumnIndexNode *column_index = static_cast<node::ColumnIndexNode *>(column_desc);

if (column_index->GetName().empty()) {
column_index->SetName(PlanAPI::GenerateName("INDEX", table->indexes_size()));
}
37 changes: 28 additions & 9 deletions hybridse/src/planv2/ast_node_converter.cc
@@ -1598,7 +1598,7 @@ base::Status ConvertColumnIndexNode(const zetasql::ASTIndexDefinition* ast_def_n
}

// case entry->name()
// "key" -> IndexKeyNode
// "key"/"ckey"/"skey" -> IndexKeyNode
// "ts" -> IndexTsNode
// "ttl" -> IndexTTLNode
// "ttl_type" -> IndexTTLTypeNode
@@ -1607,14 +1607,13 @@ base::Status ConvertIndexOption(const zetasql::ASTOptionsEntry* entry, node::Nod
node::SqlNode** output) {
auto name = entry->name()->GetAsString();
absl::string_view name_v(name);
if (absl::EqualsIgnoreCase("key", name_v)) {
if (absl::EqualsIgnoreCase("key", name_v) || absl::EqualsIgnoreCase("ckey", name_v) || absl::EqualsIgnoreCase("skey", name_v)) {
switch (entry->value()->node_kind()) {
case zetasql::AST_PATH_EXPRESSION: {
std::string column_name;
CHECK_STATUS(
AstPathExpressionToString(entry->value()->GetAsOrNull<zetasql::ASTPathExpression>(), &column_name));
*output = node_manager->MakeIndexKeyNode(column_name);

*output = node_manager->MakeIndexKeyNode(column_name, absl::AsciiStrToLower(name_v));
return base::Status::OK();
}
case zetasql::AST_STRUCT_CONSTRUCTOR_WITH_PARENS: {
@@ -1632,7 +1631,7 @@ base::Status ConvertIndexOption(const zetasql::ASTOptionsEntry* entry, node::Nod
ast_struct_expr->field_expression(0)->GetAsOrNull<zetasql::ASTPathExpression>(), &key_str));

node::IndexKeyNode* index_keys =
dynamic_cast<node::IndexKeyNode*>(node_manager->MakeIndexKeyNode(key_str));
dynamic_cast<node::IndexKeyNode*>(node_manager->MakeIndexKeyNode(key_str, absl::AsciiStrToLower(name_v)));

for (int i = 1; i < field_expr_len; ++i) {
std::string key;
@@ -1643,7 +1642,6 @@ base::Status ConvertIndexOption(const zetasql::ASTOptionsEntry* entry, node::Nod
index_keys->AddKey(key);
}
*output = index_keys;

return base::Status::OK();
}
default: {
@@ -2256,13 +2254,34 @@ base::Status ConvertCreateIndexStatement(const zetasql::ASTCreateIndexStatement*
keys.push_back(path.back());
}
node::SqlNodeList* index_node_list = node_manager->MakeNodeList();

node::SqlNode* index_key_node = node_manager->MakeIndexKeyNode(keys);
// extract index type from options
std::string index_type{"key"};
if (root->options_list() != nullptr) {
for (const auto option : root->options_list()->options_entries()) {
if (auto name = option->name()->GetAsString(); absl::EqualsIgnoreCase(name, "type")) {
CHECK_TRUE(option->value()->node_kind() == zetasql::AST_PATH_EXPRESSION, common::kSqlAstError,
"Invalid index type, should be path expression");
std::string type_name;
CHECK_STATUS(
AstPathExpressionToString(option->value()->GetAsOrNull<zetasql::ASTPathExpression>(), &type_name));
if (absl::EqualsIgnoreCase(type_name, "secondary")) {
index_type = "skey";
} else if (!absl::EqualsIgnoreCase(type_name, "covering")) {
FAIL_STATUS(common::kSqlAstError, "Invalid index type: ", type_name);
}
}
}
}
node::SqlNode* index_key_node = node_manager->MakeIndexKeyNode(keys, index_type);
index_node_list->PushBack(index_key_node);
if (root->options_list() != nullptr) {
for (const auto option : root->options_list()->options_entries()) {
// ignore type
if (auto name = option->name()->GetAsString(); absl::EqualsIgnoreCase(name, "type")) {
continue;
}
node::SqlNode* node = nullptr;
CHECK_STATUS(ConvertIndexOption(option, node_manager, &node));
CHECK_STATUS(ConvertIndexOption(option, node_manager, &node)); // option set secondary index type
if (node != nullptr) {
// NOTE: unhandled option will return OK, but node is not set
index_node_list->PushBack(node);
2 changes: 1 addition & 1 deletion hybridse/src/sdk/codec_sdk.cc
@@ -73,7 +73,7 @@ bool RowIOBufView::Reset(const butil::IOBuf& buf) {
return false;
}
str_addr_length_ = codec::GetAddrLength(size_);
DLOG(INFO) << "size " << size_ << " addr length " << str_addr_length_;
DLOG(INFO) << "size " << size_ << " addr length " << (unsigned int)str_addr_length_;
return true;
}
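
The cast added to the `DLOG` line matters if `str_addr_length_` is an 8-bit integer type such as `uint8_t` (an assumption here, since its declaration is not part of this diff). Stream insertion treats such a value as a character, so a small length would be logged as a control character rather than as digits. A minimal sketch with two hypothetical helpers, using `static_cast` where the commit uses a C-style cast:

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Formats an 8-bit value the way `stream << value` does by default:
// as a single character, not as a number.
inline std::string FormatRaw(uint8_t value) {
    std::ostringstream oss;
    oss << value;
    return oss.str();
}

// Formats the same value after widening it to unsigned int,
// so the stream prints its decimal digits.
inline std::string FormatAsNumber(uint8_t value) {
    std::ostringstream oss;
    oss << static_cast<unsigned int>(value);
    return oss.str();
}
```

With these helpers, `FormatAsNumber(2)` yields the text `2`, while `FormatRaw(2)` yields a one-character string holding the byte `0x02`.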
