Skip to content

Commit

Permalink
Polish windowUntilChanged predicate cleanup on cancel (#1901)
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle authored Sep 20, 2019
1 parent 87d636d commit f705674
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ public void cancel() {
if (CANCELLED.compareAndSet(this, 0, 1)) {
if (WINDOW_COUNT.decrementAndGet(this) == 0) {
s.cancel();
cleanup();
}
else if (!outputFused) {
if (WIP.getAndIncrement(this) == 0) {
Expand All @@ -364,6 +365,7 @@ else if (!outputFused) {
if (WIP.decrementAndGet(this) == 0) {
if (!done && WINDOW_COUNT.get(this) == 0) {
s.cancel();
cleanup();
}
else {
CANCELLED.set(this, 2);
Expand All @@ -381,6 +383,7 @@ else if (CANCELLED.get(this) == 2) {
//no new window should have been created
if (WINDOW_COUNT.get(this) == 0) {
s.cancel();
cleanup();
}
//next one to call cancel in state 2 that decrements to 0 will cancel outer
}
Expand All @@ -393,6 +396,7 @@ void groupTerminated() {
window = null;
if (WINDOW_COUNT.decrementAndGet(this) == 0) {
s.cancel();
cleanup();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void untilChangedDisposesStateOnCancel() {
Flux.range(1, 100)
.map(AtomicInteger::new) // wrap integer with object to test gc
.map(retainedDetector::tracked)
.concatWith(Mono.error(new Throwable("expected")))
.concatWith(Mono.error(new Throwable("unexpected")))
.bufferUntilChanged()
.take(50);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public void untilChangedDisposesStateOnCancel() {
Flux.range(1, 100)
.map(AtomicInteger::new) // wrap integer with object to test gc
.map(retainedDetector::tracked)
.concatWith(Mono.error(new Throwable("expected")))
.concatWith(Mono.error(new Throwable("unexpected")))
.windowUntilChanged()
.take(50);

Expand Down

0 comments on commit f705674

Please sign in to comment.