-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadPoolExample.java
368 lines (312 loc) · 10.4 KB
/
ThreadPoolExample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
/**
*
*/
package org.ankit.threads.samples;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* @author yz09
*
*/
public class ThreadPoolExample {
/**
* @param args
*/
public static void main(String[] args) {
// spawnSingleThread();
// spawnMultipleFixedThreads();
// spawnMultipleCachedThreads();
// spawnSingleScheduledThread();
// checkDiffInScheduleExecutorAndThreadPool();
checkDiffAtFixedRateAndWithFixedDelay();
}
private static void spawnSingleThread() {
/*
* Creates an Executor that uses a single worker thread operating off an
* unbounded queue. (Note however that if this single thread terminates
* due to a failure during execution prior to shutdown, a new one will
* take its place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task will be
* active at any given time. Unlike the otherwise equivalent
* newFixedThreadPool(1) the returned executor is guaranteed not to be
* reconfigurable to use additional threads.
*/
Executor myExecutor = Executors.newSingleThreadExecutor();
myExecutor.execute(() -> {
System.out.println(
"My Thread Running" + Thread.currentThread().getName());
for (int i = 100; i < 200; i++) {
System.out.println("In My own Thread.." + i);
try {
Thread.sleep(9000);
} catch (InterruptedException e) {
System.out.println("Exception..."
+ Thread.currentThread().isInterrupted());
}
}
});
for (int i = 0; i < 100; i++) {
System.out.println("In Main Thread.." + i);
if (i == 50) {
ExecutorService service = (ExecutorService) myExecutor;
service.shutdownNow();
}
}
}
private static void spawnMultipleFixedThreads() {
/*
* Creates a thread pool that reuses a fixed number of threads operating
* off a shared unbounded queue. At any point, at most nThreads threads
* will be active processing tasks. If additional tasks are submitted
* when all threads are active, they will wait in the queue until a
* thread is available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if needed
* to execute subsequent tasks. The threads in the pool will exist until
* it is explicitly shutdown.
*/
Executor myExecutor = Executors.newFixedThreadPool(2);
MyLock lock = new MyLock();
myExecutor.execute(new EvenPrintThread(100, lock));
myExecutor.execute(new OddPrintThread(100, lock));
ExecutorService service = (ExecutorService) myExecutor;
service.shutdown();
}
private static void spawnMultipleCachedThreads() {
/*
* Creates a thread pool that creates new threads as needed, but will
* reuse previously constructed threads when they are available. These
* pools will typically improve the performance of programs that execute
* many short-lived asynchronous tasks. Calls to execute will reuse
* previously constructed threads if available. If no existing thread is
* available, a new thread will be created and added to the pool.
* Threads that have not been used for sixty seconds are terminated and
* removed from the cache. Thus, a pool that remains idle for long
* enough will not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using ThreadPoolExecutor constructors.
*/
ExecutorService myExecutor = Executors.newCachedThreadPool();
List<Future<String>> returnList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
System.out.println("submitting task.." + i);
Future<String> result = myExecutor.submit(() -> {
try {
System.out.println("Inter status..for task"
+ Thread.currentThread().getId()
+ Thread.currentThread().isInterrupted());
System.out
.println("Thread.." + Thread.currentThread().getId()
+ "/....going to sleep");
MILLISECONDS.sleep(2000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
System.out.println(
"Inter caught" + Thread.currentThread().getId()
+ "...status.." + Thread.interrupted());
}
return "I am callable for" + Thread.currentThread().getId();
});
returnList.add(result);
}
System.out.println("going to shutdown pool..");
myExecutor.shutdownNow();
System.out.println("pool shutdown....");
try {
for (int i = 3; i < 5; i++) {
System.out.println("submitting task.." + i);
Future<String> result = myExecutor.submit(() -> {
try {
System.out.println("Inter status..for task.."
+ Thread.currentThread().getId()
+ Thread.currentThread().isInterrupted());
System.out.println(
"Thread.." + Thread.currentThread().getId()
+ "/....going to sleep");
MILLISECONDS.sleep(2000);
} catch (InterruptedException ex) {
System.out.println("Inter caught"
+ Thread.currentThread().getId() + "...status.."
+ Thread.currentThread().isInterrupted());
}
return "I am callable for" + Thread.currentThread().getId();
});
returnList.add(result);
}
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
System.out.println("going to get result..");
returnList.forEach((item) -> {
try {
System.out.println("Result.." + item.get());
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
private static void checkDiffAtFixedRateAndWithFixedDelay() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.scheduleAtFixedRate(() -> {
System.out.println("My Thread running");
for (int i = 0; i < 4; i++) {
System.out.println(i);
try {
MILLISECONDS.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
} , 5, 1, TimeUnit.SECONDS);
/*
* executor.scheduleWithFixedDelay(() -> { System.out.println(
* "My Thread running"); for (int i = 0; i < 4; i++) {
* System.out.println(i); try { MILLISECONDS.sleep(2000); } catch
* (Exception e) { e.printStackTrace(); } } } , 5, 10,
* TimeUnit.SECONDS);
*/
}
private static void checkDiffInScheduleExecutorAndThreadPool() {
/*
* in this case once 1st thread finish then only second will execute as
* we have only 1 worker thread ,so for second thread delay does not
* matter.
*/
// ScheduledExecutorService executor =
// Executors.newSingleThreadScheduledExecutor();
/*
* In this case the second thread will execute after 15 second while
* first thraed is being run, as we have 2 pool size executor
*/
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.schedule(() -> {
System.out.println("Started running...1");
for (int i = 0; i < 1000000; i++) {
System.out.println(i);
try {
MILLISECONDS.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
} , 10, TimeUnit.SECONDS);
executor.schedule(() -> {
System.out.println("I am another thread");
} , 12, TimeUnit.SECONDS);
executor.schedule(() -> {
System.out.println("I am another thread 2");
} , 15, TimeUnit.SECONDS);
executor.shutdown();
}
private static void spawnSingleScheduledThread() {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
/*
* ScheduledExecutorService executor =
* Executors.newScheduledThreadPool(2);
*/
/*
* executor.schedule(() -> { System.out.println("Beep.Once.."); } , 1,
* TimeUnit.SECONDS);
*
* executor.scheduleAtFixedRate(() -> { System.out.println("Beep..."); }
* , 10, 10, TimeUnit.SECONDS);
*/
final ScheduledFuture<?> handle = executor.schedule(() -> {
System.out.println(
"You have not done any activity..so logging you out..");
} , 120, TimeUnit.SECONDS);
executor.schedule(() -> {
System.out.println("Activity done...so cacnelling timer");
// handle.cancel(true);
System.out.println("Scheduling a 2 min timer again...");
executor.schedule(() -> {
System.out.println(
"You have not done any activity..so logging you out..");
} , 120, TimeUnit.SECONDS);
} , 130, TimeUnit.SECONDS);
try {
executor.awaitTermination(250, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println("Ineruppt while awaitTermination");
}
System.out.println("Shutdown starts..");
executor.shutdown();
System.out.println("Shutdown completes..");
}
}
class MyLock {
private boolean flag;
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
class EvenPrintThread implements Runnable {
private int numberLimit;
private MyLock lock;
public EvenPrintThread(int numberLimit, MyLock lock) {
this.lock = lock;
this.numberLimit = numberLimit;
}
@Override
public void run() {
int counter = 0;
while (counter < numberLimit) {
synchronized (lock) {
while (lock.isFlag()) {
try {
lock.wait();
} catch (InterruptedException ex) {
}
}
if (counter % 2 == 0) {
System.out.println("EvenPrintThread.." + counter);
}
lock.setFlag(true);
counter++;
lock.notifyAll();
}
}
}
}
class OddPrintThread implements Runnable {
private int numberLimit;
private MyLock lock;
public OddPrintThread(int numberLimit, MyLock lock) {
this.numberLimit = numberLimit;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
while (counter < numberLimit) {
synchronized (lock) {
while (!lock.isFlag()) {
try {
lock.wait();
} catch (InterruptedException ex) {
}
}
if (counter % 2 != 0) {
System.out.println("OddPrintThread..." + counter);
}
lock.setFlag(false);
counter++;
lock.notifyAll();
}
}
}
}