etl experiment report #1360

Merged: 9 commits merged on Dec 31, 2022
1 change: 1 addition & 0 deletions README.md
@@ -30,6 +30,7 @@
10. [GUI](docs/gui/README.md): a simple, easy-to-use graphical tool for viewing cluster information
11. [Connector](./docs/connector/README.md): efficient parallelized tools built on the `kafka connector`, including performance testing and data migration tools
12. [Build](docs/build_project.md): how to build and test each module of this project
13. [etl](./docs/etl/README.md): builds a spark-kafka data pipeline

# Technical presentations

8 changes: 1 addition & 7 deletions config/spark2kafka.properties
@@ -3,11 +3,8 @@
#The data source path should be a directory.
source.path =

#The data sink path should be a directory.
sink.path =

#The CSV Column Name.For example:sA=string,sB=integer,sC=boolean...
column.name =
column.names =

#Primary keys.For example:sA=string,sB=integer,sC=boolean...
primary.keys =
@@ -18,9 +15,6 @@ kafka.bootstrap.servers =
#Set your topic name.
topic.name =

#Set deploy model, which will be used in SparkSession.builder().master(deployment.model).Two settings are currently supported spark://HOST:PORT and local[*].
deploy.model =

#Spark checkpoint path
checkpoint =

51 changes: 51 additions & 0 deletions docs/etl/README.md
@@ -0,0 +1,51 @@
Astraea etl documentation
===

The goal of Astraea etl is to build a kafka-spark-delta data pipeline. The feature currently supported is reading CSV files and loading them into a Kafka topic through Spark Streaming.

### System architecture

spark -> kafka: CSV files are loaded into a Kafka topic through Spark Streaming, covering reading, cleansing, and writing the data. The module is divided into three parts (a sketch of the flow follows the diagram below):
1. Read: Spark Streaming reads every CSV file under the source path and converts it into a DataFrame.
2. Cleanse: row by row, the DataFrame is turned into a collection of JSON strings by wrapping a JSON converter in a UDF.
3. Write: the JSON data is sent to the specified Kafka topic, one record per row; the record key is the table's primary key and the value is the CSV row serialized as a JSON string.

![framework diagram](../pictures/etl_README_1.png)
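
To make the flow above concrete, here is a minimal Scala sketch of the same idea built directly on Spark Structured Streaming. It is not the module's actual code: the column names, paths, and broker address are hypothetical placeholders, and Spark's built-in `to_json` stands in for the project's own JSON converter UDF.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object Csv2KafkaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-to-kafka-sketch")
      .master("local[*]") // or spark://HOST:PORT for a standalone cluster
      .getOrCreate()

    // Schema corresponding to column.names, e.g. sA=string,sB=integer (hypothetical columns)
    val schema = new StructType()
      .add("sA", StringType)
      .add("sB", IntegerType)

    // 1. Read: stream every CSV file under source.path into a DataFrame
    val csv = spark.readStream
      .option("header", "true")
      .schema(schema)
      .csv("/path/to/source") // source.path

    // 2. Cleanse: serialize each row into a JSON string
    // 3. Write: key = primary key, value = JSON row, one record per CSV row
    val records = csv.select(
      col("sA").cast("string").as("key"), // primary.keys
      to_json(struct(csv.columns.map(col): _*)).as("value"))

    records.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.103.26:9092") // hypothetical broker
      .option("topic", "testTopic")
      .option("checkpointLocation", "/path/to/checkpoint") // checkpoint
      .start()
      .awaitTermination()
  }
}
```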

### Using Astraea etl

Astraea etl reads a [property file](../../config/spark2kafka.properties) to obtain the information it needs at runtime.
The parameters in the property file are summarized below; a filled-in example follows the table.

| Parameter | Description | Default |
|:-----------------:|:----------------------------------------------------------------------------|:-----:|
| source.path | (required) path of the data source directory | none |
| column.names | (required) column names of the CSV table and the type of each column, e.g. sA=string,sB=integer,sC=boolean... | none |
| primary.keys | (required) primary key(s) of the CSV table, e.g. sA=string,sB=integer,sC=boolean... | none |
| bootstrap.servers | (required) addresses of the Kafka servers to connect to | none |
| topic.name | (required) name of the topic the data is sent to | none |
| checkpoint | (required) path where Spark checkpoints are stored | none |
| topic.partitions | (optional) number of partitions of the target topic | 15 |
| topic.replicas | (optional) number of replicas of the target topic | 1 |
| topic.configs | (optional) other Kafka configs, e.g. compression.type\=lz4 | none |
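
A filled-in example of [spark2kafka.properties](../../config/spark2kafka.properties) might look like the following; all paths, addresses, and column definitions are hypothetical placeholders.

```properties
# Directory that contains the source CSV files (hypothetical path)
source.path = /home/kafka/spark2kafkaTest/ImportcsvTest/source

# CSV column names and their types (hypothetical schema)
column.names = sA=string,sB=integer,sC=boolean

# Primary key column(s)
primary.keys = sA=string

# Kafka cluster to write to (hypothetical address)
kafka.bootstrap.servers = 192.168.103.26:9092

# Target topic
topic.name = testTopic

# Spark checkpoint directory (hypothetical path)
checkpoint = /home/kafka/spark2kafkaTest/checkpoint

# Optional topic settings
topic.partitions = 15
topic.replicas = 1
topic.configs = compression.type=lz4
```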

#### Usage example

The tools in this project are integrated into a `container` and run with docker, which makes them easier to manage. Before using them, note two things:

1. Confirm your Kafka server IP and Spark master IP, and make sure both Kafka and Spark are running properly; see [run_kafka_broker](run_kafka_broker.md) for starting Kafka.
2. You can use the [Astraea GUI](../gui/README.md) to create `topics` for testing (a programmatic alternative is sketched below).
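
For reference, here is a minimal sketch of creating such a test topic with the Kafka `Admin` client; the broker address is a hypothetical placeholder and the partition/replica counts mirror the defaults in the parameter table above.

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}

object CreateTestTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // hypothetical broker address; replace with your own bootstrap servers
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.103.26:9092")
    val admin = Admin.create(props)

    // 15 partitions and 1 replica mirror the defaults of topic.partitions / topic.replicas
    val topic = new NewTopic("testTopic", 15, 1.toShort)
    admin.createTopics(List(topic).asJava).all().get()
    admin.close()
  }
}
```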

Run `start_etl` with `docker`; an example follows:

```bash
# Run Spark-submit
./docker/start_etl.sh master=spark://192.168.103.189:8080 \
property.file=/home/kafka/spark2kafkaTest/spark2kafka.properties
```

### Astraea etl experiments

The experiments folder collects the experiment records of different revisions, mainly measured and recorded with the [performance tool](../performance_benchmark.md).

* [2022 Dec26](experiments/etl_1.md): tests the spark->kafka module and the impact of switching to the [Strict Cost Dispatcher](./strict_cost_dispatcher.md) (Astraea revision: [be8c3ffdf35ab0651dfc1a33b5552fd7e3381069](https://github.com/skiptests/astraea/tree/be8c3ffdf35ab0651dfc1a33b5552fd7e3381069))
138 changes: 138 additions & 0 deletions docs/etl/experiments/etl_1.md
@@ -0,0 +1,138 @@
# etl test

The main purpose of this experiment is to test three parts of Astraea etl:
1. how long it takes to process 10 GB of data
2. input/output consistency, e.g. the record counts and spot checks that the data matches
3. performance after replacing the Kafka partitioner used by Spark, to see whether it improves

## Test environment

### Hardware

The experiment uses six physical machines, referred to below by the codes B1, B2, B3, S1, S2, and C1; all six machines have the same specs.

| Hardware | Product |
| ---------- | ------------------------------------------------------------ |
| CPU | Intel i9-12900K 3.2G(5.2G)/30M/UHD770/125W |
| Motherboard | MSI Z690 CARBON WIFI (ATX/1H1P/Intel 2.5G+Wi-Fi 6E) |
| Memory | TEAMGROUP T-Force Vulcan 32G(16G*2) DDR5-5200 (CL40) |
| Disk | ADATA XPG SX8200 Pro 2TB / M.2 2280 / read 3500MB/s / write 3000MB/s / TLC / SMI controller, x2 |
| Cooler | NZXT Kraken Z53 24cm AIO radiator / 2.4" LCD pump head / 6-year warranty / 5.6cm thick |
| PSU | Seasonic FOCUS GX-850 (850W) / dual 8-pin / 80+ Gold / fully modular |
| NIC | Marvell AQtion 10Gbit Network Adapter |

### Network topology

```
          switch(10G)
┌─────┬─────┬─────┬─────┬─────┐
B1    B2    B3    S1    S2    C1
```

### Software versions

| Software | Version (/image ID) |
| ---------------------- |------------------------------------------|
| OS | ubuntu-20.04.3-live-server-amd64 |
| Astraea revision | 75bcc3faa39864d5ec5f5ed530346184e79fc0c9 |
| Zookeeper version | 3.8.0 |
| Apache Kafka version | 3.3.1 |
| Java version | OpenJDK 11 |
| Docker version | 20.10.17, build 100c701 |
| grafana image ID | b6ea013786be |
| prometheus version | v2.32.1 |
| node-exporter image ID | 1dbe0e931976 |

Software deployed on each machine during the experiment:

| Software | B1 | B2 | B3 | S1 | S2 | C1 |
| ------------------------ | :--: | :--: | :--: | :--: | :--: | :--: |
| Spark master | | | | V | | |
| Spark worker | | | | V | V | |
| Zookeeper | V | | | | | |
| Kafka Broker | V | V | V | | | |
| Node Exporter | V | V | V | | | |
| Prometheus | | | V | | | |
| Grafana | | | V | | | |
| Astraea Performance tool | | | | | | V |

## Test scenarios

The experiment covers two scenarios: a normal scenario, and an imbalanced scenario in which one broker in the Kafka cluster has higher network latency and lower network bandwidth.

Test procedure: in both scenarios the Spark cluster must be started first; see [start_spark](../../run_spark.md) for how to start it.


### Normal scenario

The normal scenario uses only five of the machines listed above: {B1, B2, B3, S1, S2}. This scenario measures how long it takes to process 10 GB of data and checks input/output consistency.

#### How long it takes to process 10 GB of data

```bash
# Run Spark-submit
./docker/start_etl.sh master=spark://192.168.103.189:8080 \
property.file=/home/kafka/spark2kafkaTest/spark2kafka.properties
```

As the figure shows, processing the 10 GB of data took 2 minutes and 55 seconds.
![processing time](../../pictures/etl_experiment_1_1.png)

#### Checking input/output consistency

The 10 GB of data contains 98,090,266 records in total.
![number of records](../../pictures/etl_experiment_1_2.png)

After the etl finished processing the data and sent it to the target topic, all records under that topic were consumed by subscribing to it, to check whether any data was missing.
As the figure and the count sketch below show, the number of records is the same.
![number of consumer records](../../pictures/etl_experiment_1_3.png)
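
As a reference for this kind of count check, the sketch below batch-reads the whole topic with Spark and counts the records; the broker address is a hypothetical placeholder.

```scala
import org.apache.spark.sql.SparkSession

object CountTopicRecords {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("count-topic-records")
      .master("local[*]")
      .getOrCreate()

    // Batch-read the whole topic and count the records; the total should equal
    // the number of CSV rows fed into the etl (98,090,266 in this experiment).
    val total = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.103.26:9092") // hypothetical broker
      .option("subscribe", "testTopic")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .count()

    println(s"records in topic: $total")
    spark.stop()
  }
}
```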

Spot checks of the consumed records show that every field also matches the original data, so no consistency problems were found.
![record comparison](../../pictures/etl_experiment_1_8.png)

### Imbalanced scenario
#### Problems triggered in the imbalanced scenario
Take the figure below as an example.

Experiment setup: costTopic is used to put load on a single node of the Kafka cluster; it receives the data that keeps that node busy and is placed only on B1. testTopic receives the data produced by the etl and is spread across B1, B2, and B3.

In the figure, the left side is the imbalanced scenario and the right side is the normal scenario, so the difference can be seen at a glance. There are three testTopic lines because the figure plots that topic's traffic on each of the three nodes, while costTopic has only one line because only B1 receives its data.

In the experiment on the left, data is first sent to costTopic until the node's bandwidth limit is reached. After a while the etl is started; because the etl's traffic takes away bandwidth previously occupied by costTopic, costTopic's performance drops. Once the etl finishes, costTopic's performance returns to its initial level.
> **Contributor:** I want to confirm the scenario: the point of this passage is that when one of the target brokers is busy, the default partitioner does not skip that node and may still push data to it, so overall throughput and latency suffer. How are costTopic and testTopic each distributed, and what do the three differently colored testTopic lines below the chart mean?
>
> **Collaborator (author):** costTopic receives the data that keeps one node busy and is placed only on B1. testTopic receives the data produced by the etl and is spread across B1, B2, and B3. There are three testTopic lines because the chart shows that topic's traffic on each of the three nodes, and costTopic has only one line because only B1 receives its data.
>
> **Contributor:** @wycccccc Thanks for the reply. Could you also add this explanation to the document?
>
> **Collaborator (author):** Done, it has been added.

On the left, processing the data took a total of 3 minutes and 40 seconds.

On the right, under the normal scenario, the performance, that is, the amount of data processed per second, is clearly higher than on the left, and processing finishes sooner, taking 3 minutes in total. This shows that an imbalanced Kafka cluster degrades the etl's performance.
![imbalance kafka cluster](../../pictures/etl_experiment_1_9.png)

#### Experiment procedure
This scenario uses all six machines listed above, and the network bandwidth of B1, B2, and B3 is limited to 2.5G so that changes in etl performance show up more clearly while the cluster is under heavy load.
C1 is used to send data to B1 to keep B1 in a high-load state.

Testing with the default partitioner
Without replacing the partitioner, processing the data took 4 minutes in total.

![processing time of default partitioner](../../pictures/etl_experiment_1_4.png)

Testing with Spark's Kafka partitioner replaced
After replacing the partitioner, processing the data took 5 minutes.

![processing time of strict partitioner](../../pictures/etl_experiment_1_5.png)

The conclusion is that replacing the partitioner turned out to be a regression. Looking further at the offsets of the partitions on each broker (a way to inspect them is sketched after the figure below), the partitions on B1, the most heavily loaded node, actually received the most data. This contradicts expectations, and the cause still needs to be investigated.

![partition offset of strict partitioner](../../pictures/etl_experiment_1_6.png)
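
A minimal sketch of how the per-partition end offsets can be inspected with the Kafka consumer API is shown below; the broker address is a hypothetical placeholder.

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object ShowEndOffsets {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.103.26:9092") // hypothetical broker
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    val consumer = new KafkaConsumer[String, String](props)

    // End offset of every partition of testTopic; summing the offsets of the
    // partitions hosted on each broker shows how much data each broker received.
    val partitions = consumer.partitionsFor("testTopic").asScala
      .map(p => new TopicPartition(p.topic(), p.partition()))
    consumer.endOffsets(partitions.asJava).asScala.toSeq
      .sortBy { case (tp, _) => tp.partition() }
      .foreach { case (tp, offset) => println(s"partition ${tp.partition()} -> $offset") }

    consumer.close()
  }
}
```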

## Conclusion

In the normal scenario, with a Spark cluster of two workers and astraea etl launched in standalone mode, the average processing rate was 58.5 MB/s (roughly 10 x 1024 MB over the 175 seconds measured above).

In the imbalanced Kafka cluster scenario, the etl's performance is affected, so processing the same data takes more time. Below is a comparison of performance before and after replacing the partitioner in that scenario.

| Throughput/latency comparison | default partitioner | strict partitioner | Change |
| ------------- |---------------------|--------------------| ---------------------------- |
| Throughput | 42.7 MB/second | 34.1 MiB/second | average throughput about 0.80x of the default (a regression) |
Binary file added docs/pictures/etl_README_1.png
Binary file added docs/pictures/etl_experiment_1_1.png
Binary file added docs/pictures/etl_experiment_1_2.png
Binary file added docs/pictures/etl_experiment_1_3.png
Binary file added docs/pictures/etl_experiment_1_4.png
Binary file added docs/pictures/etl_experiment_1_5.png
Binary file added docs/pictures/etl_experiment_1_6.png
Binary file added docs/pictures/etl_experiment_1_7.png
Binary file added docs/pictures/etl_experiment_1_8.png
Binary file added docs/pictures/etl_experiment_1_9.png
14 changes: 14 additions & 0 deletions docs/run_spark.md
@@ -0,0 +1,14 @@
# TODO astraea#1365: write the Chinese documentation for start_spark.sh

```bash
# Run Spark Master
SPARK_PORT=8080 SPARK_UI_PORT=7077 docker/start_spark.sh \
folder=/home/kafka/spark2kafkaTest/ImportcsvTest/source:/home/kafka/spark2kafkaTest/ImportcsvTest/source
```

```bash
# Run Spark Worker
SPARK_PORT=8081 SPARK_UI_PORT=7078 docker/start_spark.sh \
master=spark://192.168.103.189:8080 \
folder=/home/kafka/spark2kafkaTest/ImportcsvTest/source:/home/kafka/spark2kafkaTest/ImportcsvTest/source
```