Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ETL: CSV to JSON #761

Merged
merged 7 commits into from
Oct 11, 2022
Merged

ETL: CSV to JSON #761

merged 7 commits into from
Oct 11, 2022

Conversation

wycccccc
Copy link
Collaborator

該pr的目的是,實現處理資料讓資料變成kafka能夠吃到key-value。
從CSV中讀取到的DataFrame的所有col會做tojson處理,作為value。
key則是將col name按照keyA_keyB_....的形式進行結合。

這裡還想向學長確認一下key應該是col name的結合還是col 值的結合。還有該種處理是否符合預期。是否還需要timestamp之類的額外訊息。

@wycccccc wycccccc requested a review from chia7712 September 25, 2022 19:38
@chia7712
Copy link
Contributor

從CSV中讀取到的DataFrame的所有col會做tojson處理,作為value。

請問一下,record key 和 record value 都是用 json 來處理嗎?以及 key 的部分能保證同一筆資料轉換後的 json 會是一模一樣嗎?(也就是會不會有順序調換,導致 kafka 認爲這兩筆值並不相同)

@wycccccc
Copy link
Collaborator Author

請問一下,record key 和 record value 都是用 json 來處理嗎?

目前record key並不是,record value 是,需要都改成json嘛。

以及 key 的部分能保證同一筆資料轉換後的 json 會是一模一樣嗎?(也就是會不會有順序調換,導致 kafka 認爲這兩筆值並不相同)

我認為能,因為整個操作都是在一張table上進行,不會有這個record key對到另一個record value的情況。

@chia7712
Copy link
Contributor

我認為能,因為整個操作都是在一張table上進行,不會有這個record key對到另一個record value的情況。

這部分我沒驗證過,因為對於 json 物件而言,元素的順序並不是重點,但是當元素順序不一樣後,產生的 byte arrya 會跟著不一樣,這會導致 kafka 認為這兩筆 reocrd 是不一樣,這樣當我們啟動 compacted topic 的時候會出現問題,也就是該 de-duplicate 的資料並沒有正確被刪除

@wycccccc
Copy link
Collaborator Author

那我去跑個測試例如寫1W筆相同的JSON資料到kafka然後消費它們,對比它們的binary是不是一樣,這樣能夠驗證上述可能造成的問題嗎。

@chia7712
Copy link
Contributor

那我去跑個測試例如寫1W筆相同的JSON資料到kafka然後消費它們,對比它們的binary是不是一樣,這樣能夠驗證上述可能造成的問題嗎。

我主要的擔憂是在實作 astraea json 物件轉換邏輯時,因為安插的順序不同而導致最後結果不一致認同 (我會希望能製作一個 astraea json 轉換器,然後可以用在 etl 和 app (web apis)裡面),例如以最常見的 Map結構來說,就程式碼來說先塞 k0 再塞 k1,應該是等同於先塞 "k1" 再塞 "k0",產生的 json string 來說兩者也是等價,但是轉換成 byte array 後,對於 kafka 來說就是不同的東西了

上面的擔憂可以用 LinkedHashMap 這種物件作為接口來處理或是其他方法也行,只要能避免這種狀況就好

@wycccccc
Copy link
Collaborator Author

wycccccc commented Sep 29, 2022

我去看了一下spark tojson的實現,他會按照schema讀到的順序來將需要的資料從InternalRow寫入Seq[ValueWriter],因此生成的json會保持跟schema順序一致。

 private def writeFields(
      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
    var i = 0
    while (i < row.numFields) {
      val field = schema(i)
      if (!row.isNullAt(i)) {
        gen.writeFieldName(field.name)
        fieldWriters(i).apply(row, i)
      } else if (!options.ignoreNullFields) {
        gen.writeFieldName(field.name)
        gen.writeNull()
      }
      i += 1
    }
  }

所以我認為spark的tojson是在順序問題上是可以信賴的,我只要保證我所創建的schema順序是一致(塞 k0 再塞 k1,應該是等同於先塞 "k1" 再塞 "k0")的那麼我拿到的json也就是順序一致的。但是schema順序是由config檔決定的並且於csv欄位綁定,所以也不會有先塞 "k1" 再塞 "k0",不知道這個想法對不對。

@chia7712
Copy link
Contributor

@wycccccc 感謝說明,有機會寫個測試來驗證嗎?

@wycccccc
Copy link
Collaborator Author

wycccccc commented Sep 29, 2022

好 我再寫個測試驗證一下我有沒有在胡說

@wycccccc
Copy link
Collaborator Author

wycccccc commented Sep 29, 2022

已添加測試 這樣應該可以證明在toJson後同一schema的排序是一樣的,因為他們轉成byte也是一樣的。
#755 meger 我再將該method移動至CsvReader

@wycccccc
Copy link
Collaborator Author

wycccccc commented Oct 6, 2022

已解決conflict,麻煩學長繼續review,看看還需要改什麼。

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc 感謝更新,剩下幾個小建議,請看一下

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wycccccc 感謝修正,看起來很不錯了,剩下一個小建議請看一下

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wycccccc wycccccc merged commit b4d05ec into opensource4you:main Oct 11, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants