Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support transactionId and multi records in POST /records #464

Conversation

brandboat
Copy link
Member

resolve #432

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandboat 感謝實作此好用的APIs,幾個意見請看一下,謝謝

Producer.builder()
.transactionId(transactionId)
.bootstrapServers(bootstrapServers)
.build())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

麻煩請明確使用buildTransactional

| 名稱 | 說明 | 預設值 |
|---------------|-------------------------------------------|-------|
| records | (必填) 本次寫入資料,需填寫至少一比資料 | 無 |
| transactionId | (選填) 填入 transaction id 以執行 transaction 寫入 | 無 |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

請補充說這個值是字串

// this shouldn't happen, but still add error 404 here
results.add(Response.for404("missing result"));
}
latch.countDown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

請使用try-finally以免出現意外導致永遠等不到latch被完成

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops

static final String PARTITION = "partition";
static final String ASYNC = "async";
static final String DISTANCE_FROM_LATEST = "distanceFromLatest";
static final String DISTANCE_FROM_BEGINNING = "distanceFromBeginning";
static final String SEEK_TO = "seekTo";
static final String KEY_DESERIALIZER = "keyDeserializer";
static final String VALUE_DESERIALIZER = "valueDeserializer";
static final String RECORDS = "records";
static final String LIMIT = "limit";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個名稱更換是為了避免跟post batch裡出現的records重複嗎?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,另外一個原因是我覺得 limit 的命名會更清楚一些

.get(TRANSACTION_ID)
.map(
transactionId ->
Producer.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果使用者在不斷送出交易行為時,這邊重複建立的成本會很高,可否開個議題來討論如何實作快取transactional producer? 並且在這裡留個TODO

}

@Override
public boolean equals(Object obj) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個在production code有用到嗎?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

沒有,僅在 test code 裡頭用以確認資料一致

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

抱歉,這是我之前加測試時用到的程式碼,但後來把該測試移除了,便不再需要,我會在下一個 commit 移除,感謝

request
.get(RECORDS)
.map(toPostRecordList)
.filter(list -> list.size() > 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!list.isEmpty()

(data) ->
new GsonBuilder()
.create()
.fromJson(data, new TypeToken<ArrayList<PostRecord>>() {}.getType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個邏輯可否整合到PostRequest裡面?讓其他handler也可以享用到

.collect(toList()));
} finally {
if (producer.transactional()) {
producer.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊會破壞async的邏輯,因為關閉producer的時候都會等到資料被處理完成

|---------------|-------------------------------------------|-------|
| records | (必填) 本次寫入資料,需填寫至少一比資料 | 無 |
| transactionId | (選填) 填入 transaction id 以執行 transaction 寫入 | 無 |
| async | (選填) 是否要等到資料寫入 topic 再回傳 | false |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

或許在寫入資料時應該要重複使用timeout這個概念,否則在sync+batch同時使用下,使用者很難知道是否要“繼續等下去",你覺得呢?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好主意,我會再加上 timeout parameter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個可以再開一隻議題處理

@brandboat brandboat force-pushed the feature/implement-transaction-post-records branch 2 times, most recently from 042f263 to 74722a6 Compare July 10, 2022 15:44
@brandboat brandboat force-pushed the feature/implement-transaction-post-records branch from 74722a6 to 210744f Compare July 11, 2022 15:57
Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandboat 感謝patch,功能大致上沒問題,不過有一些code style的地方需要討論,請看一下,謝謝

CompletableFuture.supplyAsync(
() -> {
try {
return producer.send(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個方法有些太長了,可否把這段放到獨立的method裡面去?

})
.thenApply(
senderFutures -> {
var latch = new CountDownLatch(senderFutures.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這一段又是async又是latch,這樣的風格不太好、閱讀起來也有點痛苦,可否考慮以下寫法:

  1. 新增一個方法來處理多的 CompletableFuture (仿照scala),例如
  static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())
            );
  }
  1. 用上述方法來重寫這一段,例如:
            .thenCompose(
                senderFutures -> sequence(senderFutures.stream().map(s -> s.handle((metadata, exception) -> {
                    if (metadata != null) return new Metadata(metadata);
                    if (exception != null) return Response.for500(exception.getMessage());
                    // this shouldn't happen, but still add error 404 here
                    return Response.for404("missing result");
                })).map(CompletionStage::toCompletableFuture).collect(Collectors.toUnmodifiableList())));
  1. 最後的輸出就可以改成:
    if (async) return Response.ACCEPT;
    return Utils.packException(() -> new PostResponse(result.get(timeout.toNanos(), TimeUnit.NANOSECONDS)));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, sequence這個方法麻煩放到Utils裡面,並且補上測試

request.get(TIMESTAMP).ifPresent(t -> sender.timestamp(Long.parseLong(t)));
request.get(PARTITION).ifPresent(p -> sender.partition(Integer.parseInt(p)));
Optional<List<PostRecord>> recordsArg =
request.get(RECORDS, new TypeToken<ArrayList<PostRecord>>() {}.getType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我原先的意思比較是說能夠做到 只給class就好,每次都要寫出完整的轉換 (new TypeToken<ArrayList<PostRecord>>)有點囉唆,例如:

  default <T> List<T> values(String key, Class<T> clz) {
    return new Gson().fromJson(value(key), TypeToken.getParameterized(ArrayList.class, clz).getType());
  }

  default <T> T value(String key, Class<T> clz) {
    return new Gson().fromJson(value(key), TypeToken.get(clz).getType());
  }

Copy link
Member Author

@brandboat brandboat Jul 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感謝,這樣更為簡潔。不過如果不使用回傳 Optional<...> 的形式,user 填寫的 json value 為 null 或者 "",gson 在進行 fromJson 時會直接噴出 null pointer exception。不過我想這部分我們可以從文件中去強調某些 parameter 絕對不可為 空來提醒使用者。 (update: 透過文件來規範,或者如果有詳細的檢核需求應在各自 handler 處理)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不過如果不使用回傳 Optional<...> 的形式,user 填寫的 json value 為 null 或者 "",gson 在進行 fromJson 時會直接噴出 null pointer exception。不過我想這部分我們可以從文件中去強調某些 parameter 絕對不可為 空來提醒使用者。

這個概念不錯,可以開一個議題來追蹤,理想上要能捕捉到是缺少哪個參數,並且將其包裝再回傳的錯誤訊息裡面,就像是value(String)會拋出NoSuchElementException這個例外,而這個例外會被包裝成404這個錯誤,並且在錯誤訊息裡會描述"the value for " + key + " is nonexistent"

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@chia7712 chia7712 merged commit 342934f into opensource4you:main Jul 12, 2022
@brandboat brandboat deleted the feature/implement-transaction-post-records branch July 13, 2022 03:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Transaction寫入Web APIs
2 participants