Skip to content

Commit

Permalink
bug fix: concurrent increase by CounterWindow may cause PriorityQueue…
Browse files Browse the repository at this point in the history
… broken (#12505)
  • Loading branch information
kael-aiur authored Aug 4, 2024
1 parent 1111f89 commit 5c33ee0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
* BanyanDB: stream sort-by `time` query, use internal time-series rather than `index` to improve the query performance.
* Bump up graphql-java to 21.5.
* Add Unknown Node when receive Kubernetes peer address is not aware in current cluster.
* Fix CounterWindow concurrent increase cause NPE by PriorityQueue

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,27 @@ public class CounterWindow {
public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) {
ID id = new ID(name, labels);
Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>());
window.offer(Tuple.of(now, value));
long waterLevel = now - windowSize;
Tuple2<Long, Double> peek = window.peek();
if (peek._1 > waterLevel) {
return peek;
}
synchronized (window) {
window.offer(Tuple.of(now, value));
long waterLevel = now - windowSize;
Tuple2<Long, Double> peek = window.peek();
if (peek._1 > waterLevel) {
return peek;
}

Tuple2<Long, Double> result = peek;
while (peek._1 < waterLevel) {
result = window.poll();
peek = window.element();
}
Tuple2<Long, Double> result = peek;
while (peek._1 < waterLevel) {
result = window.poll();
peek = window.element();
}

// Choose the closed slot to the expected timestamp
if (waterLevel - result._1 <= peek._1 - waterLevel) {
return result;
}
// Choose the closed slot to the expected timestamp
if (waterLevel - result._1 <= peek._1 - waterLevel) {
return result;
}

return peek;
return peek;
}
}

public Tuple2<Long, Double> pop(String name, ImmutableMap<String, String> labels, Double value, long now) {
Expand Down

0 comments on commit 5c33ee0

Please sign in to comment.