forked from schananas/practical-reactor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathc4_LifecycleHooks.java
243 lines (199 loc) · 8.52 KB
/
c4_LifecycleHooks.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import org.junit.jupiter.api.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Lifecycle hooks are used to add additional behavior (side-effects) and to peek into sequence without modifying it. In
* this chapter we will explore most common lifecycle hooks.
*
* Read first:
*
* https://projectreactor.io/docs/core/release/reference/#which.peeking
*
* Useful documentation:
*
* https://projectreactor.io/docs/core/release/reference/#which-operator
* https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
* https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
*
* @author Stefan Dragisic
*/
public class c4_LifecycleHooks extends LifecycleHooksBase {

    /**
     * Add a hook that will execute when Flux `temperatureFlux` is subscribed to.
     * As a side effect hook should add string "subscribe" to `hooksTriggered` list.
     */
    @Test
    public void no_subscription_no_gains() {
        List<String> hooksTriggered = new CopyOnWriteArrayList<>();

        Flux<Integer> temperatureFlux = room_temperature_service()
                .doOnSubscribe(subscription -> hooksTriggered.add("subscribe"));

        StepVerifier.create(temperatureFlux.take(5))
                    .expectNextCount(5)
                    .verifyComplete();

        // JUnit's contract is assertEquals(expected, actual).
        Assertions.assertEquals(List.of("subscribe"), hooksTriggered);
    }

    /**
     * Add a hook that will execute before Flux `temperatureFlux` is subscribed to. As a side effect hook should add
     * string "before subscribe" to `hooksTriggered` list.
     */
    @Test
    public void be_there_early() {
        List<String> hooksTriggered = new CopyOnWriteArrayList<>();

        Flux<Integer> temperatureFlux = room_temperature_service()
                .doFirst(() -> hooksTriggered.add("before subscribe"));

        StepVerifier.create(temperatureFlux.take(5).doOnSubscribe(s -> hooksTriggered.add("subscribe")))
                    .expectNextCount(5)
                    .verifyComplete();

        Assertions.assertEquals(Arrays.asList("before subscribe", "subscribe"), hooksTriggered);
    }

    /**
     * Add a hook that will execute for each element emitted by `temperatureFlux`. As a side effect print out the value
     * using `System.out` and increment `counter` value.
     */
    @Test
    public void atomic_counter() {
        AtomicInteger counter = new AtomicInteger(0);

        Flux<Integer> temperatureFlux = room_temperature_service()
                .doOnNext(temperature -> {
                    // Print the emitted element itself (not the running count), as the task describes.
                    System.out.println(temperature);
                    counter.incrementAndGet();
                });

        StepVerifier.create(temperatureFlux)
                    .expectNextCount(20)
                    .verifyComplete();

        Assertions.assertEquals(20, counter.get());
    }

    /**
     * Add a hook that will execute when `temperatureFlux` has completed without errors. As a side effect set
     * `completed` flag to true.
     */
    @Test
    public void successfully_executed() {
        AtomicBoolean completed = new AtomicBoolean(false);

        // doOnComplete reacts to successful completion only; doFinally would also
        // set the flag when the sequence errors or is cancelled.
        Flux<Integer> temperatureFlux = room_temperature_service()
                .doOnComplete(() -> completed.set(true));

        StepVerifier.create(temperatureFlux.skip(20))
                    .expectNextCount(0)
                    .verifyComplete();

        Assertions.assertTrue(completed.get());
    }

    /**
     * Add a hook that will execute when `temperatureFlux` is canceled by the subscriber. As a side effect set
     * `canceled` flag to true.
     */
    @Test
    public void need_to_cancel() {
        AtomicBoolean canceled = new AtomicBoolean(false);

        Flux<Integer> temperatureFlux = room_temperature_service()
                .doOnCancel(() -> canceled.set(true));

        StepVerifier.create(temperatureFlux.take(0))
                    .expectNextCount(0)
                    .verifyComplete();

        Assertions.assertTrue(canceled.get());
    }

    /**
     * Add a side-effect that increments `hooksTriggeredCounter` counter when the `temperatureFlux` terminates, either
     * by completing successfully or failing with an error.
     * Use only one operator.
     */
    @Test
    public void terminator() {
        AtomicInteger hooksTriggeredCounter = new AtomicInteger(0);

        // doOnTerminate fires on completion or error but NOT on cancellation.
        // The take(0) subscription below is expected to cancel the source (see need_to_cancel),
        // so only the two subscriptions that run the source to completion are counted.
        // doFinally would count the cancellation as well and yield 3 instead of 2.
        Flux<Integer> temperatureFlux = room_temperature_service()
                .doOnTerminate(() -> hooksTriggeredCounter.incrementAndGet());

        StepVerifier.create(temperatureFlux.take(0))
                    .expectNextCount(0)
                    .verifyComplete();

        StepVerifier.create(temperatureFlux.skip(20))
                    .expectNextCount(0)
                    .verifyComplete();

        StepVerifier.create(temperatureFlux.skip(20).concatWith(Flux.error(new RuntimeException("oops"))))
                    .expectError()
                    .verify();

        Assertions.assertEquals(2, hooksTriggeredCounter.get());
    }

    /**
     * Add a side effect that increments `hooksTriggeredCounter` when the `temperatureFlux` terminates, either when
     * completing successfully, gets canceled or failing with an error.
     * Use only one operator!
     */
    @Test
    public void one_to_catch_them_all() {
        AtomicInteger hooksTriggeredCounter = new AtomicInteger(0);

        // doFinally covers completion, error and cancellation, so each of the
        // three subscriptions below is expected to bump the counter once.
        Flux<Integer> temperatureFlux = room_temperature_service()
                .doFinally(signalType -> hooksTriggeredCounter.incrementAndGet());

        StepVerifier.create(temperatureFlux.take(0))
                    .expectNextCount(0)
                    .verifyComplete();

        StepVerifier.create(temperatureFlux.skip(20))
                    .expectNextCount(0)
                    .verifyComplete();

        StepVerifier.create(temperatureFlux.skip(20)
                                           .concatWith(Flux.error(new RuntimeException("oops"))))
                    .expectError()
                    .verify();

        Assertions.assertEquals(3, hooksTriggeredCounter.get());
    }

    /**
     * Replace `to do` strings with "one" || "two" || "three" depending on order of `doFirst()` hook execution.
     */
    @Test
    public void ordering_is_important() {
        List<String> sideEffects = new CopyOnWriteArrayList<>();

        // Multiple doFirst hooks run in reverse declaration order:
        // the one declared last (closest to the subscriber) runs first.
        Mono<Boolean> just = Mono.just(true)
                                 .doFirst(() -> sideEffects.add("three"))
                                 .doFirst(() -> sideEffects.add("two"))
                                 .doFirst(() -> sideEffects.add("one"));

        List<String> orderOfExecution = Arrays.asList("one", "two", "three");

        StepVerifier.create(just)
                    .expectNext(true)
                    .verifyComplete();

        Assertions.assertEquals(orderOfExecution, sideEffects);
    }

    /**
     * There is advanced operator, typically used for monitoring of a Flux. This operator will add behavior
     * (side-effects) triggered for each signal that happens on Flux. It also has access to the context, which might be
     * useful later.
     *
     * In this exercise, Flux will emit three elements and then complete. Add signal names to `signal` list dynamically,
     * once these signals occur.
     *
     * Bonus: Explore this operator's documentation, as it may be useful in the future.
     */
    @Test
    public void one_to_rule_them_all() {
        List<String> signals = new CopyOnWriteArrayList<>();

        // Use the enum constant name ("ON_NEXT", "ON_COMPLETE"), which is what the assertion compares against.
        // SignalType.toString() is understood to return camelCase names such as "onNext" -
        // NOTE(review): confirm against the SignalType Javadoc.
        Flux<Integer> flux = Flux.just(1, 2, 3)
                                 .doOnEach(signal -> signals.add(signal.getType().name()));

        StepVerifier.create(flux)
                    .expectNextCount(3)
                    .verifyComplete();

        Assertions.assertEquals(Arrays.asList("ON_NEXT", "ON_NEXT", "ON_NEXT", "ON_COMPLETE"), signals);
    }
}