forked from schananas/practical-reactor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
c1_Introduction.java
167 lines (139 loc) · 6.07 KB
/
c1_Introduction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import org.junit.jupiter.api.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.*;
/**
* This chapter will introduce you to the basics of Reactor.
* You will learn how to retrieve result from Mono and Flux
* in different ways.
*
* Read first:
*
* https://projectreactor.io/docs/core/release/reference/#intro-reactive
* https://projectreactor.io/docs/core/release/reference/#reactive.subscribe
* https://projectreactor.io/docs/core/release/reference/#_subscribe_method_examples
*
* Useful documentation:
*
* https://projectreactor.io/docs/core/release/reference/#which-operator
* https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
* https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
*
* @author Stefan Dragisic
*/
public class c1_Introduction extends IntroductionBase {
/**
* Every journey starts with Hello World!
* As you may know, Mono represents asynchronous result of 0-1 element.
* Retrieve result from this Mono by blocking indefinitely or until a next signal is received.
*/
@Test
public void hello_world() {
Mono<String> serviceResult = hello_world_service();
String result = serviceResult.block(); //todo: change this line only
assertEquals("Hello World!", result);
}
/**
* Retrieving result should last for a limited time amount of time, or you might get in trouble.
* Try retrieving result from service by blocking for maximum of 1 second or until a next signal is received.
*/
@Test
public void unresponsive_service() {
Exception exception = assertThrows(IllegalStateException.class, () -> {
Mono<String> serviceResult = unresponsiveService();
String result = serviceResult
.timeout(Duration.ofMillis(1000))
.onErrorResume(e -> Mono.error(new IllegalStateException("Timeout on blocking read for 1")))
.block();
});
String expectedMessage = "Timeout on blocking read for 1";
String actualMessage = exception.getMessage();
assertTrue(actualMessage.contains(expectedMessage));
}
/**
* Services are unpredictable, they might and might not return a result and no one likes nasty NPE's.
* Retrieve result from the service as optional object.
*/
@Test
public void empty_service() {
Mono<String> serviceResult = emptyService();
Optional<String> optionalServiceResult = serviceResult.blockOptional();
assertTrue(optionalServiceResult.isEmpty());
assertTrue(emptyServiceIsCalled.get());
}
/**
* Many services return more than one result and best services supports streaming!
* It's time to introduce Flux, an Asynchronous Sequence of 0-N Items.
*
* Service we are calling returns multiple items, but we are interested only in the first one.
* Retrieve first item from this Flux by blocking indefinitely until a first item is received.
*/
@Test
public void multi_result_service() {
Flux<String> serviceResult = multiResultService();
String result = serviceResult.blockFirst(); //todo: change this line only
assertEquals("valid result", result);
}
/**
* We have the service that returns list of fortune top five companies.
* Collect companies emitted by this service into a list.
* Retrieve results by blocking.
*/
@Test
public void fortune_top_five() {
Flux<String> serviceResult = fortuneTop5();
List<String> results = serviceResult.take(5).collectList().block(); //todo: change this line only
assertEquals(Arrays.asList("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group"), results);
assertTrue(fortuneTop5ServiceIsCalled.get());
}
/***
* "I Used an Operator on my Flux, but it Doesn’t Seem to Apply. What Gives?"
*
* Previously we retrieved result by blocking on a Mono/Flux.
* That really beats whole purpose of non-blocking and asynchronous library like Reactor.
* Blocking operators are usually used for testing or when there is no way around, and
* you need to go back to synchronous world.
*
* Fix this test without using any blocking operator.
* Change only marked line!
*/
@Test
public void nothing_happens_until_you_() throws InterruptedException {
CopyOnWriteArrayList<String> companyList = new CopyOnWriteArrayList<>();
Flux<String> serviceResult = fortuneTop5();
serviceResult
.subscribe(companyList::add);
//todo: add an operator here, don't use any blocking operator!
;
Thread.sleep(1000); //bonus: can you explain why this line is needed?
assertEquals(Arrays.asList("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group"), companyList);
}
/***
* If you finished previous task, this one should be a breeze.
*
* Upgrade previously used solution, so that it:
* - adds each emitted item to `companyList`
* - does nothing if error occurs
* - sets `serviceCallCompleted` to `true` once service call is completed.
*
* Don't use doOnNext, doOnError, doOnComplete hooks.
*/
@Test
public void leaving_blocking_world_behind() throws InterruptedException {
AtomicReference<Boolean> serviceCallCompleted = new AtomicReference<>(false);
CopyOnWriteArrayList<String> companyList = new CopyOnWriteArrayList<>();
fortuneTop5()
.map(companyList::add)
.subscribe(serviceCallCompleted::set);
Thread.sleep(1000);
assertTrue(serviceCallCompleted.get());
assertEquals(Arrays.asList("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group"), companyList);
}
}