[ETL] add ETL test report (#1360)
wycccccc authored Dec 31, 2022
1 parent 5ed4636 commit 9bb0515
Showing 15 changed files with 205 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -30,6 +30,7 @@
10. [GUI](docs/gui/README.md): a simple, easy-to-use tool for visualizing cluster information
11. [Connector](./docs/connector/README.md): efficient, parallelized tools built on `kafka connector`, including performance testing and data migration
12. [Build](docs/build_project.md): how to build and test each module of this project
+13. [etl](./docs/etl/README.md): builds a spark-kafka data pipeline

# Tech Talks

8 changes: 1 addition & 7 deletions config/spark2kafka.properties
@@ -3,11 +3,8 @@
#The data source path should be a directory.
source.path =

-#The data sink path should be a directory.
-sink.path =
-
#The CSV Column Name.For example:sA=string,sB=integer,sC=boolean...
-column.name =
+column.names =

#Primary keys.For example:sA=string,sB=integer,sC=boolean...
primary.keys =
@@ -18,9 +15,6 @@
kafka.bootstrap.servers =
#Set your topic name.
topic.name =

-#Set deploy model, which will be used in SparkSession.builder().master(deployment.model).Two settings are currently supported spark://HOST:PORT and local[*].
-deploy.model =

#Spark checkpoint path
checkpoint =

51 changes: 51 additions & 0 deletions docs/etl/README.md
@@ -0,0 +1,51 @@
Astraea etl Documentation
===

The goal of Astraea etl is to build a kafka-spark-delta data pipeline. The feature currently supported is reading csv files and feeding them into a kafka topic via spark streaming.

### System architecture

spark -> kafka: csv files are fed into a kafka topic via spark streaming; the work covers reading, cleaning, and writing data. The module breaks into three parts (a code sketch follows the diagram below):
1. Read: spark streaming reads every csv file under the source path and turns it into a dataframe.
2. Clean: row by row, the dataframe is converted into a collection of JSON strings by wrapping a json convertor in a udf.
3. Write: each JSON row is sent to the designated kafka topic as one record, with the table's primary key as the record key and the JSON-encoded csv row as the value.

![framework diagram](../pictures/etl_README_1.png)
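
The three steps map naturally onto Spark Structured Streaming. The sketch below is a minimal illustration of that mapping, not the actual Astraea etl source: the schema mirrors the `column.names` example above, and the addresses, paths, and topic name are placeholder assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json}

object Csv2KafkaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("spark://192.168.103.189:8080") // same master as passed to start_etl
      .appName("csv2kafka")
      .getOrCreate()

    // 1. Read: stream every csv file under source.path into a dataframe.
    //    Schema follows the column.names example: sA=string,sB=integer,sC=boolean.
    val df = spark.readStream
      .schema("sA STRING, sB INT, sC BOOLEAN")
      .csv("/home/kafka/spark2kafkaTest/ImportcsvTest/source")

    // 2. Clean: serialize each row to a JSON string (the real etl wraps a json
    //    convertor in a udf; to_json over a struct is the stock equivalent).
    // 3. Write: key = primary key column, value = JSON-encoded csv row.
    df.select(
        col("sA").cast("string").as("key"),
        to_json(struct(df.columns.map(col): _*)).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.103.177:9092") // placeholder broker
      .option("topic", "testTopic")
      .option("checkpointLocation", "/home/kafka/spark2kafkaTest/checkpoint")
      .start()
      .awaitTermination()
  }
}
```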

### Using Astraea etl

Astraea etl reads a [property file](../../config/spark2kafka.properties) to obtain the information it needs at runtime.
The parameters in the property file are summarized below, followed by a filled-in example:

| Parameter | Description | Default |
|:-----------------------:|:-----------------------------------------------------------------------------------------------|:-----:|
| source.path | (required) path to the data source | |
| column.names | (required) column names of the csv table and the type of each column, e.g. sA=string,sB=integer,sC=boolean... | |
| primary.keys | (required) primary keys of the csv table, e.g. sA=string,sB=integer,sC=boolean... | |
| kafka.bootstrap.servers | (required) address of the Kafka servers to connect to | |
| topic.name | (required) name of the topic to send to | |
| checkpoint | (required) path where spark checkpoints are stored | |
| topic.partitions | (optional) number of partitions of the target topic | 15 |
| topic.replicas | (optional) number of replicas of the target topic | 1 |
| topic.configs | (optional) other kafka configs, e.g. compression.type\=lz4 | |
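
For reference, a filled-in property file might look like the sketch below; every value is a placeholder assumption, not a setting used in the experiments that follow.

```properties
# example spark2kafka.properties (all values are placeholders)
source.path = /home/kafka/spark2kafkaTest/ImportcsvTest/source
column.names = sA=string,sB=integer,sC=boolean
primary.keys = sA=string
kafka.bootstrap.servers = 192.168.103.177:9092
topic.name = testTopic
checkpoint = /home/kafka/spark2kafkaTest/checkpoint
topic.partitions = 15
topic.replicas = 1
topic.configs = compression.type=lz4
```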

#### Usage example

The tools in this project are all integrated into the `container`, so users can run them with docker, which keeps them easy to manage. Before use, please check two things:

1. Confirm the ip of your Kafka server and your spark master, and make sure Kafka and spark are running. See [run_kafka_broker](run_kafka_broker.md) for starting Kafka.
2. [Astraea GUI](../gui/README.md) can be used to create `topics` for testing.

Run `start_etl` with `docker`, for example:

```bash
# Run Spark-submit
./docker/start_etl.sh master=spark://192.168.103.189:8080 \
property.file=/home/kafka/spark2kafkaTest/spark2kafka.properties
```

### Astraea etl experiments

The experiments folder collects the experiment records of different revisions, mostly measured and recorded with the [performance tool](../performance_benchmark.md).

* [2022 Dec26](experiments/etl_1.md): tests the spark->kafka module and the impact of swapping in the [Strict Cost Dispatcher](./strict_cost_dispatcher.md) (Astraea revision: [be8c3ffdf35ab0651dfc1a33b5552fd7e3381069](https://github.com/skiptests/astraea/tree/be8c3ffdf35ab0651dfc1a33b5552fd7e3381069))
138 changes: 138 additions & 0 deletions docs/etl/experiments/etl_1.md
@@ -0,0 +1,138 @@
# etl test

This experiment tests three aspects of Astraea etl:
1. how long it takes to process 10GB of data
2. input/output consistency, e.g. record counts and spot checks that the data matches
3. performance with the kafka partitioner in spark replaced, to see whether it improves

## Test environment

### Hardware

The experiment uses six physical machines, referred to below by the codes B1, B2, B3, S1, S2, and C1; all six machines share the same spec.

| Hardware    | Model                                                                          |
| ----------- | ------------------------------------------------------------------------------ |
| CPU         | Intel i9-12900K 3.2G(5.2G)/30M/UHD770/125W                                      |
| Motherboard | MSI Z690 CARBON WIFI (ATX/1H1P/Intel 2.5G+Wi-Fi 6E)                             |
| Memory      | TEAMGROUP T-Force Vulcan 32G(16G*2) DDR5-5200 (CL40)                            |
| Disk        | ADATA XPG SX8200Pro 2TB/M.2 2280/read:3500M/write:3000M/TLC/SMI controller * 2  |
| Cooler      | NZXT Kraken Z53 24cm radiator/2.4" LCD pump head/6 years/thickness:5.6cm        |
| PSU         | Seasonic FOCUS GX-850(850W) dual-8pin/Gold/fully modular                        |
| NIC         | Marvell AQtion 10Gbit Network Adapter                                           |

### Network topology

```
switch(10G)
┌─────┬─────┬─────┬─────┬─────┐
B1 B2 B3 S1 S2 C1
```

### Software versions

| Software               | Version (/image ID)                       |
| ---------------------- |------------------------------------------|
| OS                     | ubuntu-20.04.3-live-server-amd64          |
| Astraea revision | 75bcc3faa39864d5ec5f5ed530346184e79fc0c9 |
| Zookeeper version | 3.8.0 |
| Apache Kafka version | 3.3.1 |
| Java version | OpenJDK 11 |
| Docker version | 20.10.17, build 100c701 |
| grafana image ID | b6ea013786be |
| prometheus version | v2.32.1 |
| node-exporter image ID | 1dbe0e931976 |

The software deployed on each machine:

| Software                 | B1   | B2   | B3   | S1   | S2   | C1   |
| ------------------------ | :--: | :--: | :--: | :--: | :--: | :--: |
| Spark master | | | | V | | |
| Spark worker | | | | V | V | |
| Zookeeper | V | | | | | |
| Kafka Broker | V | V | V | | | |
| Node Exporter | V | V | V | | | |
| Prometheus | | | V | | | |
| Grafana | | | V | | | |
| Astraea Performance tool | | | | | | V |

## Test scenarios

The experiment covers two scenarios: a normal scenario, and an imbalanced scenario in which one broker of the kafka cluster has higher network latency and lower network bandwidth.

Test flow: both scenarios start by bringing up the spark cluster; see [start_spark](../../run_spark.md).


### Normal scenario

The normal scenario uses only five of the machines above: {B1, B2, B3, S1, S2}. It measures how long 10GB of data takes to process and checks input/output consistency.

#### How long does 10GB of data take to process?

```bash
# Run Spark-submit
./docker/start_etl.sh master=spark://192.168.103.189:8080 \
property.file=/home/kafka/spark2kafkaTest/spark2kafka.properties
```

As the figure shows, processing the 10GB of data took 2 minutes 55 seconds.
![processing time](../../pictures/etl_experiment_1_1.png)

#### Checking input/output consistency

The 10GB dataset holds 98,090,266 records in total.
![number of records](../../pictures/etl_experiment_1_2.png)

After the etl finished processing the data and sent it to the designated topic, we subscribed to that topic and consumed all of its records to check whether anything was missing.
As shown below, the record counts match (a sketch for reproducing the count follows the figure).
![number of consumer records](../../pictures/etl_experiment_1_3.png)
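
As a cross-check, the total can be reproduced by batch-reading the topic end to end with Spark's Kafka source. This is a minimal sketch, not the exact tooling used in the experiment; the broker address and topic name are placeholder assumptions.

```scala
import org.apache.spark.sql.SparkSession

object CountTopicRecords {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("count-topic-records")
      .getOrCreate()

    // Batch-read the whole topic, from earliest to latest offsets.
    val records = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.103.177:9092") // placeholder broker
      .option("subscribe", "testTopic")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    // Should print 98090266 if no record was lost.
    println(records.count())
    spark.stop()
  }
}
```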

Spot checks comparing the consumed records against the source show that every column matches the original data, so no consistency problems were found.
![record comparison](../../pictures/etl_experiment_1_8.png)

### Imbalanced scenario
#### Problems the imbalanced scenario induces
Take the figure below as an example.

Setup: testTopic receives the data produced by the etl and is spread across B1, B2, B3;
costTopic is used to put load on a single node of the kafka cluster and lives only on B1.

The left side of the figure is the imbalanced scenario and the right side the normal one, to make the difference easy to see.
costTopic: receives the data that keeps one node busy; it is placed only on B1.
testTopic: the etl output is sent to this topic; it is placed on B1, B2, B3.
testTopic appears three times in the figure because its traffic is shown per node,
while costTopic appears only once because only B1 receives its data.

In the left-side run, data is first sent to costTopic until it reaches the node's bandwidth cap. After a while the etl is started; because the etl's traffic takes part of the bandwidth costTopic previously occupied, costTopic's throughput drops, and it recovers to its initial level once the etl finishes.
The left-side run processed all the data in 3 minutes 40 seconds.

On the right side, throughput in the normal scenario, i.e. the amount of data processed per second, is clearly higher than on the left, and the run finishes sooner, taking 3 minutes in total (so the left run took roughly 22% longer). This shows that an imbalanced kafka cluster degrades etl performance.
![imbalance kafka cluster](../../pictures/etl_experiment_1_9.png)

#### 實驗過程
在該情景下會用到上述的全部六臺機器,同時B1, B2, B3的網路頻寬將被設置爲2.5G,確保etl效能的變化在叢集高負載的情況下會有較明顯的體現。
其中C1將被用來向B1發送資料,以確保B1處在高負載的狀態。

使用default partitioner進行測試
不替換時總共花費了4min處理完畢數據

![processing time of default partitioner](../../pictures/etl_experiment_1_4.png)

Test with the kafka partitioner in spark replaced:
with the partitioner replaced, processing the data took 5 minutes in total.

![processing time of strict partitioner](../../pictures/etl_experiment_1_5.png)

So replacing the partitioner turned out to be a pessimization. Looking further at the offsets of the partitions on each broker, the partitions on B1, the most heavily loaded node, actually received the most data. This contradicts the expectation, and the cause still needs to be investigated (the figure shows the per-partition offsets; a sketch for listing them follows it).

![partition offset of strict partitioner](../../pictures/etl_experiment_1_6.png)
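
For reference, the end offset of every partition, grouped by leader broker, can be listed with the Kafka admin client. The sketch below assumes a placeholder broker address and the testTopic name; it is not the tooling that produced the figure above.

```scala
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, OffsetSpec}
import org.apache.kafka.common.TopicPartition

object ListPartitionOffsets {
  def main(args: Array[String]): Unit = {
    val props = new java.util.Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.103.177:9092") // placeholder
    val admin = Admin.create(props)
    try {
      // Find the partitions of testTopic and the broker leading each of them.
      val partitions = admin.describeTopics(java.util.List.of("testTopic"))
        .allTopicNames().get().get("testTopic")
        .partitions().asScala

      val leaders = partitions
        .map(p => new TopicPartition("testTopic", p.partition()) -> p.leader().idString())
        .toMap

      // End offset of each partition = number of records it received (no deletions).
      val offsets = admin
        .listOffsets(leaders.keys.map(tp => tp -> OffsetSpec.latest()).toMap.asJava)
        .all().get().asScala

      offsets.toSeq.sortBy(_._1.partition()).foreach { case (tp, info) =>
        println(s"partition ${tp.partition()} (leader broker ${leaders(tp)}): ${info.offset()} records")
      }
    } finally admin.close()
  }
}
```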

## Conclusion

In the normal scenario, with astraea etl launched in standalone mode on a spark cluster that has two workers, the average processing rate is 58.5MB/s (10GB in 2 minutes 55 seconds).

In the imbalanced scenario, etl performance suffers, so the same data takes more time to process. The table below compares performance before and after replacing the partitioner in this scenario.

| Throughput comparison | default partitioner | strict partitioner | Change                                                |
| --------------------- |---------------------|--------------------| ----------------------------------------------------- |
| Throughput            | 42.7 MB/second      | 34.1 MiB/second    | average throughput ratio: about 0.80× (a regression)  |
Binary file added docs/pictures/etl_README_1.png
Binary file added docs/pictures/etl_experiment_1_1.png
Binary file added docs/pictures/etl_experiment_1_2.png
Binary file added docs/pictures/etl_experiment_1_3.png
Binary file added docs/pictures/etl_experiment_1_4.png
Binary file added docs/pictures/etl_experiment_1_5.png
Binary file added docs/pictures/etl_experiment_1_6.png
Binary file added docs/pictures/etl_experiment_1_7.png
Binary file added docs/pictures/etl_experiment_1_8.png
Binary file added docs/pictures/etl_experiment_1_9.png
14 changes: 14 additions & 0 deletions docs/run_spark.md
@@ -0,0 +1,14 @@
# TODO astraea#1365 write the start_spark.sh document

```bash
# Run Spark Master
SPARK_PORT=8080 SPARK_UI_PORT=7077 docker/start_spark.sh \
folder=/home/kafka/spark2kafkaTest/ImportcsvTest/source:/home/kafka/spark2kafkaTest/ImportcsvTest/source
```

```bash
# Run Spark Worker
SPARK_PORT=8081 SPARK_UI_PORT=7078 docker/start_spark.sh \
master=spark://192.168.103.189:8080 \
folder=/home/kafka/spark2kafkaTest/ImportcsvTest/source:/home/kafka/spark2kafkaTest/ImportcsvTest/source
```
