-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ETL : create topic #797
ETL : create topic #797
Conversation
在解決衝突的過程中我發現admin被替換成了asyncAdmin,並且topic創建的流程也被整合到了asyncAdmin中。如果我要調用asyncAdmin,還需要額外用scala判斷topic的規格嘛。是不是可以直接變成調用asyncAdmin的api就好,還是要用scala寫這一段的邏輯。 |
未來應該會逐漸改成用 |
修改完畢,不過似乎CompletableFuture在scala 2.12中無法輕鬆的轉換成Future(缺少FutureConverters),如果直接寫成 |
scala 2.12 的話可以用scala-java8-compat,不過我會建議用一個utils把這端包裝起來,這樣之後我們如果升級到2.13才不用改太多東西 又或者簡單一點,我們自己寫一個小程式來轉換 def asScala[T](f: CompletionStage[T]): Future[T] = {
val promise = Promise[T]()
f.whenComplete {(r, e) => if (e != null) promise.failure(e) else promise.success(r)}
promise.future
} |
按照該方法使用promise修改完畢,再麻煩學長review。 |
etl/src/main/scala/org/astraea/etl/KafkaAdmin/TopicCreator.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
該支pr是為了實做創建topic的邏輯,會使用我們專案的admin物件,雖然我感覺create topic的錯誤檢測和common里的差不多,
但我還是感覺需要寫一個scala版本,為以後的修改留有彈性。
會按照下述情況建立topic:
topic 的建立有三種狀況,
因為需要Kafka進行測試,先在此完成了scala2.12的降級,我會在之前的pr meger後發一支單獨的pr完成降級,防止太多conflict。