Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache for transactional producer #498

Merged

Conversation

brandboat
Copy link
Member

fix #473

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandboat thanks for implementing this useful cache. a couple of comments below. PTAL


public class Cache<K, V> {

public interface CacheLoader<K, V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add @FunctionalInterface

V load(K key);
}

public interface RemovalListener<K, V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add @FunctionalInterface

private RemovalListener<? super A, ? super B> removalListener;
private int maxCapacity = -1;

public static CacheBuilder<Object, Object> builder() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me this method is duplicate to line#36

}

public interface RemovalListener<K, V> {
void onRemove(Map.Entry<K, V> entry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using Entry? (K k, V v) is more straightforward , right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, many thanks

public <A1 extends A, B1 extends B> CacheBuilder<A1, B1> cacheLoader(CacheLoader<? super A1,
B1> cacheLoader) {
@SuppressWarnings("unchecked")
CacheBuilder<A1, B1> me = (CacheBuilder<A1, B1>) this;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This casting is dangerous since the type may be incompatible to type injected by removalListener. For example, the following test can't pass.

  @Test
  void test() {
    var cache = Cache.builder()
            .cacheLoader((Base k) -> new Base())
            .removalListener((Map.Entry<Sub, Sub> e) -> {})
            .build();
    var a = cache.get(new Sub());
  }

  class Base {
  }

  class Sub extends Base {
  }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the catch, removalListener should always happen before cacheLoader, to force user follow this order, I'll move cacheLoader inside build().

@brandboat brandboat force-pushed the feature/impl-cache-4-producer branch from 5d3c3d6 to f1a7a36 Compare July 22, 2022 16:35
@brandboat brandboat changed the title [WIP] Add cache for transactional producer Add cache for transactional producer Jul 22, 2022
Comment on lines 78 to 79
.maxCapacity(100)
.expireAfterAccess(Duration.ofMinutes(10))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose these two configs to user ?

}

public <A1 extends A, B1 extends B> CacheBuilder<A1, B1> removalListener(
RemovalListener<? super A1, ? super B1> removalListener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我有點不確定這邊的轉型是否真的有用途,或者只是徒增複雜度而已。當使用者在設定removeListener的時候真的會需要把型態再做一次調整嗎?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感謝建議,主要是 Cache#builder() 會回傳 CacheBuilder<Object, Object>,那麼 user 就會需要在 removalListener 去將型態作轉換

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

主要是 Cache#builder() 會回傳 CacheBuilder<Object, Object>,那麼 user 就會需要在 removalListener 去將型態作轉換

這裡可以做一點取捨,例如保留cacheLoader的設置方法並用其來“綁定”正確的型別,然後removalListener這裡就可以沿用已經綁定的型別。這樣的好處是可以簡化介面上的泛型定義,但壞處是使用者必須先設定cacheLoader後才可以設定removalListener

簡單來說,同時使用萬用字元和上下界和介面宣告上是很難閱讀的事情,我會希望能盡量避免 @@

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同時使用萬用字元和上下界和介面宣告上是很難閱讀的事情

收到,確實這樣混合不好理解,我會照著建議修改,感謝

@brandboat brandboat force-pushed the feature/impl-cache-4-producer branch 2 times, most recently from ca03309 to 8605b6d Compare July 23, 2022 17:18
@brandboat
Copy link
Member Author

testTransactionIds passed in my macbook. This seems to be a flaky test, already filed #500

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandboat 感謝更新,一些小建議請看一下

return value;
}

boolean isExpired(long now) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expired這樣就好了

private void cleanup() {
map.entrySet().stream()
.filter(e -> e.getValue().isExpired(System.nanoTime()))
.collect(Collectors.toList())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊要事先收集成一個陣列是為了避免邊走訪邊刪除造成衝突吧?如果是的話麻煩加上註解

.transactionId(transactionId)
.bootstrapServers(bootstrapServers)
.buildTransactional())
.maxCapacity(100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

麻煩用一個常數變數來取代100

.bootstrapServers(bootstrapServers)
.buildTransactional())
.maxCapacity(100)
.expireAfterAccess(Duration.ofMinutes(10))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

private class Wrapper<T> {
private final T value;

private long timestamp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lastAccess

return map.size();
}

private void cleanup() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可否加上測試確定只要在資料過時之前,無論怎麼呼叫cleanup都不會有資料真的被清除

@brandboat brandboat force-pushed the feature/impl-cache-4-producer branch 3 times, most recently from 1060c71 to ff2f585 Compare July 24, 2022 16:36
@brandboat brandboat force-pushed the feature/impl-cache-4-producer branch from ff2f585 to 814f658 Compare July 24, 2022 16:44
Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for this great patch. @brandboat

@chia7712 chia7712 merged commit 194c147 into opensource4you:main Jul 24, 2022
@brandboat brandboat deleted the feature/impl-cache-4-producer branch July 25, 2022 01:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Find a way to cache transactional producer in RecordsHandler
2 participants