etl experiment report #1360

Merged 9 commits on Dec 31, 2022
52 changes: 52 additions & 0 deletions docs/etl/README.md
@@ -0,0 +1,52 @@
Astraea etl documentation
===

The goal of Astraea etl is to build a kafka-spark-delta data pipeline. The currently supported functionality is reading csv files and feeding them into a kafka topic via spark streaming.

### System architecture

spark -> kafka: csv files are fed into a kafka topic through spark streaming, covering reading, cleaning, and writing the data. The module is divided into three parts.
1. Reading data: spark streaming reads every csv file under the source path and converts it into a dataframe.
2. Cleaning data: row by row, the dataframe is turned into a collection of JSON-formatted strings by wrapping a json convertor inside a udf.
3. Writing data: the JSON-formatted data is sent to the designated kafka topic, one row per record; the record key is the table's primary key and the value is the csv row converted to a JSON string (a sketch of this conversion follows).
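
As a hedged illustration of steps 2 and 3, the sketch below turns one csv row into such a key/value pair. The column names `sA`, `sB`, `sC` and the choice of `sA` as the primary key are assumptions borrowed from the parameter examples later in this document, and `jq` merely stands in for the Spark-side json convertor.

```bash
# A minimal sketch of the row-to-record conversion (not the actual Spark UDF).
# Assumed schema: sA=string,sB=integer,sC=boolean, with sA as the primary key.
row='hello,2,true'
key=$(echo "$row" | cut -d',' -f1)   # record key = the primary-key column (sA)
value=$(echo "$row" | jq -Rc 'split(",") | {sA: .[0], sB: (.[1] | tonumber), sC: (.[2] | test("true"))}')
echo "key=$key value=$value"         # key=hello value={"sA":"hello","sB":2,"sC":true}
```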

![framework diagram](../pictures/etl_README_1.png)

### Using Astraea etl

Astraea etl reads a [property file](../../config/spark2kafka.properties) to obtain the information it may need at runtime.
The parameters in the property file are described below, and a sample file follows the table.

| Parameter | Description | Default |
|:-----------------:|:---------------------------------------------------------------------------------------------------------------|:-------:|
| source.path | (required) path to the source data | none |
| sink.path | (required) path to which the processed data is written | none |
| column.name | (required) column names of the csv table and the type of each column, e.g. sA=string,sB=integer,sC=boolean... | none |
| producers | (required) primary key of the csv table, e.g. sA=string,sB=integer,sC=boolean... | none |
| bootstrap.servers | (required) address of the Kafka server(s) to connect to | none |
| topic.name | (required) name of the topic to send to | none |
| deploy.model | (required) the spark deploy mode; local mode and standalone mode are supported, e.g. spark://HOST:PORT or local[*] | none |
| topic.partitions | (optional) number of partitions of the target topic | 15 |
| topic.replicas | (optional) number of replicas of the target topic | 1 |
| topic.config | (optional) other kafka configs for the topic, e.g. compression.type=lz4 | none |
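
A minimal sample property file is sketched below, assuming the parameters above; every path, address, and name in it is an illustrative placeholder, and the exact value format of `producers` is an assumption as well.

```bash
# Write a hypothetical spark2kafka.properties; all values are placeholders.
cat > spark2kafka.properties <<'EOF'
source.path=/home/kafka/spark2kafkaTest/ImportcsvTest/source
sink.path=/home/kafka/spark2kafkaTest/ImportcsvTest/sink
column.name=sA=string,sB=integer,sC=boolean
producers=sA=string
bootstrap.servers=192.168.103.177:9092
topic.name=spark2kafkaTest
deploy.model=spark://192.168.103.189:8080
topic.partitions=15
topic.replicas=1
topic.config=compression.type=lz4
EOF
```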

#### Usage example

The tools in this project are all integrated into `container`s, so running them through docker keeps them easy to manage. Before using the tool, please check two things:

1. Confirm the ip of your Kafka server and of the spark master, and that both Kafka and spark are running properly; see [run_kafka_broker](run_kafka_broker.md) for starting Kafka.
2. [Astraea GUI](../gui/README.md) can be used to create `topics` for testing (a CLI alternative is sketched right after this list).
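
If a GUI is not at hand, Kafka's bundled CLI can create the test topic as well. This is only a sketch: the topic name and broker address are placeholders, and the partition/replica counts mirror the defaults in the table above.

```bash
# Create a test topic with the default partition/replica counts listed above;
# the topic name and broker address are placeholders.
bin/kafka-topics.sh --create \
    --topic spark2kafkaTest \
    --partitions 15 \
    --replication-factor 1 \
    --bootstrap-server 192.168.103.177:9092
```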

Run `start_etl` with `docker`; an example follows:

```bash
# Run Spark-submit
./docker/start_etl.sh master=spark://192.168.103.189:8080 \
property.file=/home/kafka/spark2kafkaTest/spark2kafka.properties
```

### Astraea etl experiments

The experiments folder collects the experiment records of the different versions, mainly measured and recorded with the [performance tool](../performance_benchmark.md).

* [2022 Dec26](experiments/etl_1.md): tests the functionality of the spark->kafka module and the impact of swapping in the [Strict Cost Dispatcher](./strict_cost_dispatcher.md) (Astraea revision: [be8c3ffdf35ab0651dfc1a33b5552fd7e3381069](https://github.com/skiptests/astraea/tree/be8c3ffdf35ab0651dfc1a33b5552fd7e3381069))
120 changes: 120 additions & 0 deletions docs/etl/experiments/etl_1.md
@@ -0,0 +1,120 @@
# etl test

This experiment tests three aspects of Astraea etl:
1. how long it takes to process 10GB of data
2. input/output data consistency, e.g. that the record counts match and that spot-checked records are identical
3. performance after replacing the kafka partitioner used by spark, to see whether it improves

## Test environment

### Hardware specs

The experiment uses six physical machines, referred to below by the aliases B1, B2, B3, S1, S2, C1. All six machines share the same specs.

| Hardware | Product |
| ----------- | ------------------------------------------------------------ |
| CPU | Intel i9-12900K 3.2G(5.2G)/30M/UHD770/125W |
| Motherboard | MSI Z690 CARBON WIFI (ATX/1H1P/Intel 2.5G+Wi-Fi 6E) |
| Memory | TEAMGROUP T-Force Vulcan 32G(16G*2) DDR5-5200 (CL40) |
| Disk | ADATA XPG SX8200Pro 2TB/M.2 2280/read:3500M/write:3000M/TLC/SMI controller * 2 |
| Cooler | NZXT Kraken Z53 24cm radiator/2.4-inch LCD pump head/6-year warranty/thickness: 5.6cm |
| PSU | Seasonic FOCUS GX-850 (850W) dual 8-pin/Gold/fully modular |
| NIC | Marvell AQtion 10Gbit Network Adapter |

### Network topology

```
switch(10G)
┌─────┬─────┬─────┬─────┬─────┐
B1 B2 B3 S1 S2 C1
```

### Software versions

| Software | Version (/image ID) |
| ---------------------- |------------------------------------------|
| OS | ubuntu-20.04.3-live-server-amd64 |
| Astraea revision | 75bcc3faa39864d5ec5f5ed530346184e79fc0c9 |
| Zookeeper version | 3.8.0 |
| Apache Kafka version | 3.3.1 |
| Java version | OpenJDK 11 |
| Docker version | 20.10.17, build 100c701 |
| grafana image ID | b6ea013786be |
| prometheus version | v2.32.1 |
| node-exporter image ID | 1dbe0e931976 |

Software running in the experiment

| Software                 | B1   | B2   | B3   | S1   | S2   | C1   |
| ------------------------ | :--: | :--: | :--: | :--: | :--: | :--: |
| Spark master             |      |      |      | V    |      |      |
| Spark worker             |      |      |      | V    | V    |      |
| Zookeeper                | V    |      |      |      |      |      |
| Kafka Broker             | V    | V    | V    |      |      |      |
| Node Exporter            | V    | V    | V    |      |      |      |
| Prometheus               |      |      | V    |      |      |      |
| Grafana                  |      |      | V    |      |      |      |
| Astraea Performance tool |      |      |      |      |      | V    |

## Test scenarios

The experiment covers two scenarios: a normal scenario, and an unbalanced scenario in which one broker in the kafka cluster suffers higher network latency and lower network bandwidth.

Test procedure: both scenarios start by bringing up the spark cluster; see [start_spark](../../run_spark.md) for how to do so.


### Normal scenario

The normal scenario uses only five of the machines above: {B1, B2, B3, S1, S2}. It measures how long 10GB of data takes to process and checks input/output data consistency.

#### How long 10GB of data takes to process

```bash
# Run Spark-submit
./docker/start_etl.sh master=spark://192.168.103.189:8080 \
property.file=/home/kafka/spark2kafkaTest/spark2kafka.properties
```

As the figure shows, processing the 10GB of data took 2 minutes 55 seconds.
![processing time](../../pictures/etl_experiment_1_1.png)

#### Checking input/output data consistency

The 10GB dataset contains 98,090,266 records.
![number of records](../../pictures/etl_experiment_1_2.png)

After etl finishes processing the data and sends it to the designated topic, we subscribe to that topic and consume everything under it to determine whether any data went missing.
As shown below, the record counts are identical.
![number of consumer records](../../pictures/etl_experiment_1_3.png)
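
One hedged way to arrive at such a count from the broker side is to sum the end offsets of the topic's partitions; `GetOffsetShell` ships with Kafka, while the broker address and topic name below are assumptions.

```bash
# Sum the end offsets of all partitions of the target topic (name assumed);
# GetOffsetShell prints lines of the form topic:partition:offset.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list 192.168.103.177:9092 \
    --topic spark2kafkaTest \
  | awk -F':' '{sum += $3} END {print sum, "records"}'
```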

Spot checks comparing the consumed records show that every field also matches the original data, so no consistency problems were found.
![record comparison](../../pictures/etl_experiment_1_8.png)

### Unbalanced scenario

This scenario uses all six machines above, and the network bandwidth of B1, B2, B3 is capped at 2.5G so that changes in etl performance show up clearly while the cluster is under heavy load.
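
The report does not say which tool applies the 2.5G cap; as a hedged sketch, one common way is Linux `tc` with a token-bucket filter, where the interface name is an assumption.

```bash
# Cap egress bandwidth at 2.5Gbit/s on a broker's NIC; the interface name
# (enp5s0) is a placeholder for the 10G adapter on B1/B2/B3.
sudo tc qdisc add dev enp5s0 root tbf rate 2.5gbit burst 2mb latency 50ms
# Remove the cap after the experiment.
sudo tc qdisc del dev enp5s0 root
```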
> **Contributor:** We probably need to present the "problem" itself, i.e. the throughput observed when one node is unstable or busy. For example, we could show the amount of data and the bandwidth each node receives, to demonstrate that even when a node is already very busy and unstable, the default partitioner still tries to push that much data to it.
>
> **Collaborator (author):** OK, a comparison experiment has been added to illustrate this problem.

C1 is used to send data to B1, keeping B1 under high load.

Testing with the default partitioner:
without the replacement, processing the data took 4 min in total.

![processing time of default partitioner](../../pictures/etl_experiment_1_4.png)

Testing with spark's kafka partitioner replaced:
with the replacement, processing the data took 5 min.

![processing time of strict partitioner](../../pictures/etl_experiment_1_5.png)

The conclusion, then, is that replacing the partitioner actually degraded performance. Looking further at the offsets of the partitions on each Broker, the partitions on B1, the most heavily loaded broker, received the most data. This contradicts expectations, and the cause of the problem still needs to be investigated.

![partition offset of strict partitioner](../../pictures/etl_experiment_1_6.png)
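
For reference, one hedged way to reproduce this per-broker observation is to map partitions to their leader brokers and then read each partition's end offset; the broker address and topic name are again assumptions.

```bash
# Show which broker leads each partition, then print per-partition end offsets
# (topic name and broker address are placeholders).
bin/kafka-topics.sh --describe --topic spark2kafkaTest \
    --bootstrap-server 192.168.103.177:9092
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list 192.168.103.177:9092 --topic spark2kafkaTest
```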

## Conclusion

In the normal scenario, a spark cluster with two workers running astraea etl in standalone mode processed the data at an average rate of 58.5MB/s (10GB in 2 min 55 s, i.e. roughly 10240MB / 175s ≈ 58.5MB/s).

In the unbalanced scenario, the unbalanced cluster degraded performance, and the effect of replacing the partitioner compares as follows.
> **Contributor:** The performance degradation caused by the unbalanced cluster is mentioned above; please mention it in the conclusion as well.
>
> **Collaborator (author):** Thanks for the suggestion; this has been revised.


| Throughput/latency comparison | default partitioner | strict partitioner | Change                                      |
| ----------------------------- |---------------------|--------------------| ------------------------------------------- |
| Throughput                    | 42.7 MB/second      | 34.1 MiB/second    | average throughput: about 0.80× of default  |
Binary file added docs/pictures/etl_README_1.png
Binary file added docs/pictures/etl_experiment_1_1.png
Binary file added docs/pictures/etl_experiment_1_2.png
Binary file added docs/pictures/etl_experiment_1_3.png
Binary file added docs/pictures/etl_experiment_1_4.png
Binary file added docs/pictures/etl_experiment_1_5.png
Binary file added docs/pictures/etl_experiment_1_6.png
Binary file added docs/pictures/etl_experiment_1_7.png
Binary file added docs/pictures/etl_experiment_1_8.png
14 changes: 14 additions & 0 deletions docs/run_spark.md
@@ -0,0 +1,14 @@
#TODO astraea#1365 write the Chinese documentation for start_spark.sh

```bash
# Run Spark Master
SPARK_PORT=8080 SPARK_UI_PORT=7077 docker/start_spark.sh \
folder=/home/kafka/spark2kafkaTest/ImportcsvTest/source:/home/kafka/spark2kafkaTest/ImportcsvTest/source
```

```bash
# Run Spark Worker
SPARK_PORT=8081 SPARK_UI_PORT=7078 docker/start_spark.sh \
master=spark://192.168.103.189:8080 \
folder=/home/kafka/spark2kafkaTest/ImportcsvTest/source:/home/kafka/spark2kafkaTest/ImportcsvTest/source
```