
[improve] [broker] replace HashMap with inner implementation ConcurrentLongLongPairHashMap in Negative Ack Tracker. #23582

Merged
merged 6 commits into apache:master on Nov 13, 2024

Conversation

thetumbled
Member

@thetumbled thetumbled commented Nov 11, 2024

Motivation

The negative ack feature needs to retain the message ID and timestamp of each unacknowledged message in the consumer client's memory, which leads to significant memory consumption.
This PR aims to replace the HashMap with the internal map implementation ConcurrentLongLongPairHashMap to reduce that memory consumption. Although HashMap is faster than ConcurrentLongLongPairHashMap in some cases, the dominant concern here is memory consumption, not speed.

Some test data are listed below:

Experiment 1


    import java.util.HashMap;

    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.impl.MessageIdImpl;
    import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;

    public class MemoryComparison {
        public static void main(String[] args) {
            ConcurrentLongLongPairHashMap map1 = ConcurrentLongLongPairHashMap.newBuilder()
                    .autoShrink(true)
                    .concurrencyLevel(16)
                    .build();
            HashMap<MessageId, Long> map2 = new HashMap<>();
            long numMessages = 5000000;
            long ledgerId, entryId, partitionIndex, timestamp;
            for (long i = 0; i < numMessages; i++) {
                ledgerId = 10000 + i;
                entryId = i;
                partitionIndex = 0;
                timestamp = System.currentTimeMillis();
                // Store the same (ledgerId, entryId) -> timestamp entry in both maps.
                map1.put(ledgerId, entryId, partitionIndex, timestamp);
                map2.put(new MessageIdImpl(ledgerId, entryId, (int) partitionIndex), timestamp);
            }
            System.out.println("map1 size: " + map1.size());
            System.out.println("map2 size: " + map2.size());
            // Keep the process alive so heap usage can be inspected with a profiler.
            try {
                Thread.sleep(10000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • 1,000,000 entries:
    HashMap: 178 MB
    ConcurrentLongLongPairHashMap: 64 MB

  • 5,000,000 entries:
    HashMap: 566 MB
    ConcurrentLongLongPairHashMap: 256 MB

  • 10,000,000 entries:
    HashMap: 1132 MB (approximately 1132 MB / 10,000,000 ≈ 118 bytes per entry)
    ConcurrentLongLongPairHashMap: 512 MB (approximately 512 MB / 10,000,000 ≈ 53 bytes per entry, close to the raw 32 bytes of the four stored longs plus the empty-slot overhead of the open-addressing table)

With this improvement, we can reduce memory consumption by more than 50%!

Experiment 2

This experiment compares three candidate data structures:

  • HashMap<LongPair, Long>
    org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair
  • HashMap<LongLongPair, Long>
    it.unimi.dsi.fastutil.longs.LongLongPair
  • ConcurrentLongLongPairHashMap

Test code:

    import java.util.HashMap;

    import it.unimi.dsi.fastutil.longs.LongLongPair;
    import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
    import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair;

    public class MemoryComparison2 {
        public static void main(String[] args) {
            ConcurrentLongLongPairHashMap map1 = ConcurrentLongLongPairHashMap.newBuilder()
                    .autoShrink(true)
                    .concurrencyLevel(16)
                    .build();
            HashMap<LongPair, Long> map4 = new HashMap<>();
            HashMap<LongLongPair, Long> map5 = new HashMap<>();
            long numMessages = 5000000, numLedgers = 100;
            long numEntries = numMessages / numLedgers;
            long ledgerId, entryId, partitionIndex, timestamp;
            for (long i = 0; i < numLedgers; i++) {
                ledgerId = 10000 + i;
                for (long j = 0; j < numEntries; j++) {
                    entryId = 10000 + j;
                    partitionIndex = 0;
                    timestamp = System.currentTimeMillis();
                    // Store the same (ledgerId, entryId) -> timestamp entry in all three maps.
                    map1.put(ledgerId, entryId, partitionIndex, timestamp);
                    map4.put(new LongPair(ledgerId, entryId), timestamp);
                    map5.put(LongLongPair.of(ledgerId, entryId), timestamp);
                }
            }

            System.out.println("map1 size: " + map1.size());
            System.out.println("map4 size: " + map4.size());
            System.out.println("map5 size: " + map5.size());
            // Keep the process alive so heap usage can be inspected with a profiler.
            try {
                Thread.sleep(10000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

The results are as follows:

  • 1,000,000 entries:
    HashMap<LongPair, Long>: 91 MB
    HashMap<LongLongPair, Long>: 114 MB
    ConcurrentLongLongPairHashMap: 64 MB

  • 5,000,000 entries:
    HashMap<LongPair, Long>: 451 MB
    HashMap<LongLongPair, Long>: 566 MB
    ConcurrentLongLongPairHashMap: 256 MB

These results show that ConcurrentLongLongPairHashMap is still the best option for storing an enormous number of entries.

Modifications

Replace HashMap with ConcurrentLongLongPairHashMap in Negative Ack Tracker.
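
For context, here is a minimal sketch of what the replacement can look like inside the tracker; the class and method names below are illustrative, not the exact PR diff. The map is keyed by (ledgerId, entryId), and the two value slots carry the partition index and the redelivery timestamp as raw longs.

    import org.apache.pulsar.client.impl.MessageIdImpl;
    import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;

    class NackStorageSketch {
        // Before: HashMap<MessageId, Long>, which boxes every key and value.
        // After: a single map keyed by (ledgerId, entryId) whose two value
        // slots hold the partition index and the redelivery timestamp.
        private final ConcurrentLongLongPairHashMap nackedMessages =
                ConcurrentLongLongPairHashMap.newBuilder()
                        .autoShrink(true) // release memory after bulk removals
                        .build();

        void track(MessageIdImpl msgId, long redeliveryTimeMs) {
            nackedMessages.put(msgId.getLedgerId(), msgId.getEntryId(),
                    msgId.getPartitionIndex(), redeliveryTimeMs);
        }

        boolean untrack(MessageIdImpl msgId) {
            return nackedMessages.remove(msgId.getLedgerId(), msgId.getEntryId());
        }
    }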

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: thetumbled#63


@lhotari
Member

lhotari commented Nov 12, 2024

HashMap<LongLongPair, Long>

btw. In Fastutil, there's also the Object2LongMap interface, which would be applicable in this case since the value is a long, for example using the Object2LongOpenHashMap implementation. Object2LongOpenHashMap has a trim method to reduce its size. I guess the benefit of ConcurrentLongLongPairHashMap is that it has the auto-shrink feature.
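
For reference, a minimal sketch of that alternative; the String key below is only an illustrative stand-in for whatever key type the tracker would use:

    import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;

    public class TrimSketch {
        public static void main(String[] args) {
            // Values are stored as primitive longs, so there is no Long boxing.
            Object2LongOpenHashMap<String> timestamps = new Object2LongOpenHashMap<>();
            timestamps.put("10000:0", System.currentTimeMillis());
            timestamps.removeLong("10000:0");
            // No auto-shrink: after removals, the backing arrays must be
            // trimmed manually to give memory back.
            timestamps.trim();
        }
    }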

@thetumbled
Member Author

In Fastutil, there's also the Object2LongMap interface which would be applicable in this case when the value is a long, for example using the Object2LongOpenHashMap implementation. In Object2LongOpenHashMap, there's a trim method to reduce the size.

No, no shrink logic is triggered in the test code: I only add new items to the maps, without any deletion, and shrinking is only triggered by item deletion.
The reason ConcurrentLongLongPairHashMap is space efficient is that it uses open addressing with linear probing, which requires less space to implement, while HashMap needs extra space for its node-based structure; also, there are no wrapper objects in ConcurrentLongLongPairHashMap.
As for Object2LongOpenHashMap, I guess it takes up more space than ConcurrentLongLongPairHashMap too, since it wraps its keys in objects, whereas ConcurrentLongLongPairHashMap stores everything as raw longs.
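
To illustrate the layout argument, here is an illustrative-only sketch of open addressing with linear probing over a flat primitive array; it mirrors the idea behind ConcurrentLongLongPairHashMap, not its actual code:

    public class OpenAddressingSketch {
        // Each logical entry occupies four consecutive longs in one flat
        // array: [key1, key2, value1, value2]. There are no Entry nodes,
        // boxed Longs, or per-entry references at all.
        private static final int CAPACITY = 1024; // power of two
        private final long[] table = new long[CAPACITY * 4];

        // Linear probing: start at the hashed bucket and walk forward until
        // the matching key or an empty slot is found (key1 == 0 serves as
        // the empty marker in this sketch).
        int findSlot(long key1, long key2) {
            int bucket = (int) ((key1 ^ key2) & (CAPACITY - 1));
            while (table[bucket * 4] != 0
                    && (table[bucket * 4] != key1 || table[bucket * 4 + 1] != key2)) {
                bucket = (bucket + 1) & (CAPACITY - 1);
            }
            return bucket * 4; // index of key1 within the flat array
        }
    }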

@thetumbled thetumbled merged commit 9d65a85 into apache:master Nov 13, 2024
49 of 52 checks passed
lhotari pushed a commit that referenced this pull request Nov 13, 2024
…ntLongLongPairHashMap in Negative Ack Tracker. (#23582)

(cherry picked from commit 9d65a85)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 20, 2024
…ntLongLongPairHashMap in Negative Ack Tracker. (apache#23582)

(cherry picked from commit 9d65a85)
(cherry picked from commit 431c232)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 21, 2024
…ntLongLongPairHashMap in Negative Ack Tracker. (apache#23582)

(cherry picked from commit 9d65a85)
(cherry picked from commit 431c232)