Skip to content

Commit

Permalink
[doc](ecosystem) improve spark connector doc (#1292)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Nov 6, 2024
1 parent 0a86f07 commit 87f39fb
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 139 deletions.
132 changes: 67 additions & 65 deletions common_docs_zh/ecosystem/spark-doris-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,30 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据
| 1.1.0 | 3.2, 3.1, 2.3 | 1.0 + | 8 | 2.12, 2.11 |
| 1.0.1 | 3.1, 2.3 | 0.12 - 0.15 | 8 | 2.12, 2.11 |

## 编译与安装
## 使用

准备工作
### Maven
```
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.4_2.12</artifactId>
<version>1.3.2</version>
</dependency>
```

**备注**

1. 请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。

2. 也可从[这里](https://repo.maven.apache.org/maven2/org/apache/doris/)下载相关版本 jar 包。

### 编译

编译时,可直接运行 `sh build.sh`,具体可参考这里。

编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 `Spark``ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。
也可以

1. 修改`custom_env.sh.tpl`文件,重命名为`custom_env.sh`

2. 在源码目录下执行:
`sh build.sh`
Expand All @@ -59,36 +78,18 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据

例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。

例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar
包路径

例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径
```shell
1. 上传 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 到 hdfs。

```
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/
```

2. 在集群中添加 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 依赖。

```
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
```

## 使用 Maven 管理

```
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.4_2.12</artifactId>
<version>1.3.0</version>
</dependency>
```

**注意**

请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。

## 使用示例

### 读取
Expand All @@ -112,7 +113,7 @@ FROM spark_doris;

#### DataFrame

```scala
```java
val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
Expand All @@ -125,7 +126,7 @@ dorisSparkDF.show(5)

#### RDD

```scala
```java
import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
Expand All @@ -142,7 +143,7 @@ dorisSparkRDD.collect()

#### pySpark

```scala
```java
dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
Expand Down Expand Up @@ -182,8 +183,8 @@ FROM YOUR_TABLE

#### DataFrame(batch/stream)

```scala
## batch sink
```java
// batch sink
val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
Expand All @@ -203,9 +204,9 @@ mockDataDF.write.format("doris")
// .option("save_mode", SaveMode.Overwrite)
.save()

## stream sink(StructuredStreaming)
// stream sink(StructuredStreaming)

### 结果 DataFrame 和 doris 表相同的结构化数据, 配置方式和批量模式一致。
// 结果 DataFrame 和 doris 表相同的结构化数据, 配置方式和批量模式一致。
val sourceDf = spark.readStream.
.format("your_own_stream_source")
.load()
Expand All @@ -222,7 +223,7 @@ resultDf.writeStream
.start()
.awaitTermination()

### 结果 DataFrame 中存在某一列的数据可以直接写入的,比如符合导入规范的 Kafka 消息中的 value 值
// 结果 DataFrame 中存在某一列的数据可以直接写入的,比如符合导入规范的 Kafka 消息中的 value 值

val kafkaSource = spark.readStream
.format("kafka")
Expand Down Expand Up @@ -265,14 +266,14 @@ kafkaSource.selectExpr("CAST(value as STRING)")
| doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 |
| doris.request.tablet.size | Integer.MAX_VALUE | 一个 RDD Partition 对应的 Doris Tablet 个数。<br />此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 |
| doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 |
| doris.batch.size | 1024 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。<br />从而减轻网络延迟所带来的额外时间开销。 |
| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。<br />从而减轻网络延迟所带来的额外时间开销。 |
| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 |
| doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch |
| doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 |
| doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。<br />默认写入时要按照 Doris 表字段顺序写入全部字段。 |
| doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 |
| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数 |
| doris.sink.properties.format | csv | Stream Load 的数据格式。<br/>共支持 3 种格式:csv,json,arrow(1.4.0 版本开始支持)<br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) |
| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 |
| doris.sink.properties.format | csv | Stream Load 的数据格式。<br/>共支持 3 种格式:csv,json,arrow <br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) |
| doris.sink.properties.* | -- | Stream Load 的导入参数。<br/>例如:<br/>指定列分隔符:`'doris.sink.properties.column_separator' = ','`等<br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) |
| doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。<br/>此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 |
| doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。<br/>如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 |
Expand Down Expand Up @@ -303,9 +304,36 @@ kafkaSource.selectExpr("CAST(value as STRING)")
| doris.request.auth.password | -- | 访问 Doris 的密码 |
| doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 |

:::tip

1. 在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 `BITMAP``HLL` 类型的数据时,需要设置参数 `doris.ignore-type` 为对应类型,并通过 `doris.write.fields` 对列进行映射转换,使用方式如下:
## Doris 和 Spark 列类型映射关系

| Doris Type | Spark Type |
|------------|----------------------------------|
| NULL_TYPE | DataTypes.NullType |
| BOOLEAN | DataTypes.BooleanType |
| TINYINT | DataTypes.ByteType |
| SMALLINT | DataTypes.ShortType |
| INT | DataTypes.IntegerType |
| BIGINT | DataTypes.LongType |
| FLOAT | DataTypes.FloatType |
| DOUBLE | DataTypes.DoubleType |
| DATE | DataTypes.DateType |
| DATETIME | DataTypes.StringType<sup>1</sup> |
| DECIMAL | DecimalType |
| CHAR | DataTypes.StringType |
| LARGEINT | DecimalType |
| VARCHAR | DataTypes.StringType |
| TIME | DataTypes.DoubleType |
| HLL | Unsupported datatype |
| Bitmap | Unsupported datatype |

* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。


## 常见问题
1. **如何写入 Bitmap 类型?**

在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 `BITMAP``HLL` 类型的数据时,需要设置参数 `doris.ignore-type` 为对应类型,并通过 `doris.write.fields` 对列进行映射转换,使用方式如下:
> BITMAP
> ```sql
> CREATE TEMPORARY VIEW spark_doris
Expand Down Expand Up @@ -333,13 +361,11 @@ kafkaSource.selectExpr("CAST(value as STRING)")
> );
> ```
2. **如何使用overwrite写入?**
2. 从 1.3.0 版本开始, `doris.sink.max-retries` 配置项的默认值为 0,即默认不进行重试。
当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。
3. 从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下
从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下
> DataFrame
> ```scala
> ```java
> resultDf.format("doris")
> .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
> // your own options
Expand All @@ -353,28 +379,4 @@ kafkaSource.selectExpr("CAST(value as STRING)")
> SELECT * FROM your_source_table
> ```
:::
## Doris 和 Spark 列类型映射关系
| Doris Type | Spark Type |
|------------|----------------------------------|
| NULL_TYPE | DataTypes.NullType |
| BOOLEAN | DataTypes.BooleanType |
| TINYINT | DataTypes.ByteType |
| SMALLINT | DataTypes.ShortType |
| INT | DataTypes.IntegerType |
| BIGINT | DataTypes.LongType |
| FLOAT | DataTypes.FloatType |
| DOUBLE | DataTypes.DoubleType |
| DATE | DataTypes.DateType |
| DATETIME | DataTypes.StringType<sup>1</sup> |
| DECIMAL | DecimalType |
| CHAR | DataTypes.StringType |
| LARGEINT | DecimalType |
| VARCHAR | DataTypes.StringType |
| TIME | DataTypes.DoubleType |
| HLL | Unsupported datatype |
| Bitmap | Unsupported datatype |
* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。
Loading

0 comments on commit 87f39fb

Please sign in to comment.