Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 131 additions & 22 deletions docs/lakehouse/catalogs/doris-catalog.mdx

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ PROPERTIES
"s3.endpoint" = "http://s3-REGION.amazonaws.com",
"s3.region" = "s3-REGION",
"s3.access_key" = "AWS_ACCESS_KEY",
"s3.secret_key"="AWS_SECRET_KEY",
"s3.region" = "REGION"
"s3.secret_key"="AWS_SECRET_KEY"
);
```
**Note: **
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

Doris Catalog 允许用户通过 HTTP 协议和 Arrow Flight 协议进行跨多个 Doris 集群的数据访问。

本文档介绍如何配置远程 Doris 集群连接并进行查询。

:::note
该功能自 4.0.2 版本支持。

Expand All @@ -22,7 +18,7 @@ Doris Catalog 允许用户通过 HTTP 协议和 Arrow Flight 协议进行跨多

| 场景 | 说明 |
|---------|-----------------------------------------------------------------------|
| 联邦查询 | Doris 通过谓词下推与 Arrow Flight 协议,实现对多个独立 Doris 集群的关联查询 |
| 联邦查询 | 对多个独立 Doris 集群的关联查询 |

## 配置 Catalog

Expand All @@ -33,6 +29,8 @@ CREATE CATALOG [IF NOT EXISTS] catalog_name PROPERTIES (
'type' = 'doris', -- required
'fe_http_hosts' = 'http://<fe-host1>:<fe-http-port>,<fe-host2>:<fe-http-port>', -- required
'fe_arrow_hosts' = '<fe-host1>:<fe-arrow-flight-port>,<fe-host2>:<fe-arrow-flight-port>', -- required
'fe_thrift_hosts' = '<fe-host1>:<fe-thrift-port>,<fe-host2>:<fe-thrift-port>', -- required
'use_arrow_flight' = 'true/false', -- required
'user' = '', -- required
'password' = '', -- required
{QueryProperties},
Expand All @@ -49,13 +47,21 @@ CREATE CATALOG [IF NOT EXISTS] catalog_name PROPERTIES (

远端 Doris 集群 FE Arrow Flight 服务端点列表。

* `fe_thrift_hosts`

远端 Doris 集群 FE Thrift 服务端点列表。

* `use_arrow_flight`

采用 Arrow Flight 方式访问远端 Doris 集群还是将远端表当做内表执行计划发送给远端 Doris 集群执行

* `{QueryProperties}`

可选属性

| 参数名称 | 说明 | 默认值 |
|-----------------------------|------------------------------------------------------------------------------------------|-------|
| `enable_parallel_result_sink` | 开启后,本地 Doris BE 节点将并行地从远端 Doris 集群各 BE 节点拉取数据。 | true |
| `enable_parallel_result_sink` | 开启后,本地 Doris BE 节点将并行地从远端 Doris 集群各 BE 节点拉取数据。(针对 Arrow Flight 方式) | true |
| `query_retry_count` | 向远端 Doris 发送查询请求失败的最大重试次数。(不包含请求被接受后,远端 Doris 异步执行过程中可能发生的失败) | 3 |
| `query_timeout_sec` | 向远端 Doris 发送查询的超时时间。(不包含请求被接受后,远端 Doris 异步执行时间) | 15 |
| `compatible` | 用于在访问版本低于本集群的远端 Doris 时,尝试兼容其元数据格式。集群版本一致时无需开启。 | false |
Expand All @@ -80,14 +86,40 @@ CREATE CATALOG [IF NOT EXISTS] catalog_name PROPERTIES (

CommonProperties 部分用于填写通用属性。请参阅 数据目录概述 中【通用属性】部分。

## 访问模式

### Arrow Flight 模式

当 `use_arrow_flight` 属性为 `true` 时,则为 Arrow Flight 模式。

![arrow-flight-mode](/images/Lakehouse/doris-catalog/arrow-flight-mode.png)

在该模式下进行跨集群查询时,FE 之间通过 HTTP 协议同步 Schema 等元信息,然后本地集群的 BE 节点,通过 Arrow Flight 接口访问 Remote Doris 集群。

**优点**:对于 FE 基本没开销,执行计划仅生成查询 SQL 发往远端集群

**缺点**:可能无法利用 Doris 内表的各种优化特性,如聚合下推、有限的谓词下推等。

### 虚拟集群模式

当 `use_arrow_flight` 属性为 `false` 时,则为虚拟集群模式。

![virtual-cluster-mode](/images/Lakehouse/doris-catalog/virtual-cluster-mode.png)

在该模式下进行跨集群查询时,会将 Remote Doris 集群中的 Backend 节点当做虚拟节点进行查询规划。

FE 之间通过 HTTP 协议同步 Schema 等元信息。BE 直接通过内部通信协议进行数据传输。

**优点**:基本可以利用 Doris 内表查询的所有优化特性。查询执行流程和单集群内部流程一致。

**缺点**:对于较大的远端表来说,会获取远端表的所有信息 (分区信息,副本信息)。FE 的内存开销会上升,需要扩大 FE 内存。在各集群版本不一致时,比如高版本查询低版本,可能会出现查询失败。

## 列类型映射

Doris 外表类型与本地 Doris 类型完全相同。

## 查询操作

### 基础查询

配置好 Catalog 后,可以通过以下方式查询 Catalog 中的表数据:

```sql
Expand All @@ -104,10 +136,15 @@ SELECT * FROM doris_tbl LIMIT 10;
SELECT * FROM doris_ctl.doris_db.doris_tbl LIMIT 10;
```

### 查询优化
## 查询优化

Doris Catalog 访问数据源时,Doris 会尽量将谓词或函数条件下推并拼接到生成的 SQL 中。可以通过 EXPLAIN SQL 查看到生成的 SQL 语句。
```
### Arrow Flight 模式

该模式下,Doris 会尽量将谓词或函数条件下推并拼接到生成的 SQL 中。

可以通过 EXPLAIN SQL 查看到生成的 SQL 语句。

```sql
...
| 0:VREMOTE_DORIS_SCAN_NODE(68) |
| TABLE: test.test_time |
Expand All @@ -116,3 +153,75 @@ Doris Catalog 访问数据源时,Doris 会尽量将谓词或函数条件下推
...
```

### 虚拟集群模式

该模式下,执行计划看到的依然是 `VOlapScanNode`。

Doris 针对内表查询的各种优化都可以继续利用,比如 Join Runtime Filter。

```sql
MySQL [(none)]> explain select * from demo.inner_table a join edoris.external.example_tbl_duplicate b on (a.log_type = b.log_type) where error_code=2;
+-------------------------------------------------------------------------------------------------------------------------------------------+
| Explain String(Nereids Planner) |
+-------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS: |
| log_type[#16] |
| reason[#17] |
| log_time[#18] |
| log_type[#19] |
| error_code[#20] |
| error_msg[#21] |
| op_id[#22] |
| op_time[#23] |
| PARTITION: HASH_PARTITIONED: log_type[#6] |
| |
| HAS_COLO_PLAN_NODE: false |
| |
| VRESULT SINK |
| MYSQL_PROTOCOL |
| |
| 3:VHASH JOIN(200) |
| | join op: INNER JOIN(BROADCAST)[] |
| | equal join conjunct: (log_type[#6] = log_type[#1]) |
| | cardinality=3 |
| | vec output tuple id: 3 |
| | output tuple id: 3 |
| | vIntermediate tuple ids: 2 |
| | hash output slot ids: 0 1 2 3 4 5 6 7 |
| | runtime filters: RF000[min_max] <- log_type[#1](1/1/1048576), RF001[in_or_bloom] <- log_type[#1](1/1/1048576) |
| | final projections: log_type[#8], reason[#9], log_time[#10], log_type[#11], error_code[#12], error_msg[#13], op_id[#14], op_time[#15] |
| | final project output tuple id: 3 |
| | distribute expr lists: log_type[#6] |
| | distribute expr lists: |
| | |
| |----1:VEXCHANGE |
| | offset: 0 |
| | distribute expr lists: log_type[#1] |
| | |
| 2:VOlapScanNode(187) |
| TABLE: demo.inner_table(inner_table), PREAGGREGATION: ON |
| partitions=1/1 (inner_table) |
| tablets=1/1, tabletList=1762832514491 |
| cardinality=3, avgRowSize=901.6666, numNodes=1 |
| pushAggOp=NONE |
| runtime filters: RF000[min_max] -> log_type[#6], RF001[in_or_bloom] -> log_type[#6] |
| |
| PLAN FRAGMENT 1 |
| |
| PARTITION: HASH_PARTITIONED: log_type[#1] |
| |
| HAS_COLO_PLAN_NODE: false |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 01 |
| UNPARTITIONED |
| |
| 0:VOlapScanNode(188) |
| TABLE: external.example_tbl_duplicate(example_tbl_duplicate), PREAGGREGATION: ON |
| PREDICATES: (error_code[#2] = 2) |
| partitions=1/1 (example_tbl_duplicate) |
| tablets=1/1, tabletList=1762481736238 |
| cardinality=1, avgRowSize=7425.0, numNodes=1 |
| pushAggOp=NONE
```
Loading