1
- package main .concurrency ;
2
-
3
1
import java .util .concurrent .ArrayBlockingQueue ;
4
2
import java .util .concurrent .BlockingQueue ;
5
3
6
4
/**
7
5
* Simple sample of Producer/Consumer using ArrayBlockingQueue (ThreadSafe)
8
- * - One Producer adds items in a pipeline in a frequency of 100ms
6
+ * - One Producer adds items in a pipeline in a frequency of 200ms
9
7
* - The pipeline has a limit of 5 items
10
- * - When the pipeline is full, the Producer will wait for 500ms
11
8
* - Two consumers take items from the pipeline and consumes it in 500ms
12
9
*/
13
10
public class BlockingQueueDemo {
@@ -16,28 +13,23 @@ public class BlockingQueueDemo {
16
13
private static final int CONSUMERS = 2 ;
17
14
private static final int CAPACITY = 5 ;
18
15
19
- static class Producer extends Thread {
20
- BlockingQueue pipeline ;
16
+ private static class Producer extends Thread {
17
+ private final BlockingQueue < String > pipeline ;
21
18
22
- Producer (BlockingQueue pipeline ) {
19
+ Producer (BlockingQueue < String > pipeline ) {
23
20
this . pipeline = pipeline ;
24
21
}
25
22
26
23
public void run () {
27
24
int nItems = 1 ;
28
25
while (nItems <= 20 ) {
29
26
try {
30
- if (pipeline .remainingCapacity () > 0 ) {
31
- String item = "item" + nItems ;
32
- pipeline .add (item );
33
- String capacity = String .format (" [%d/%d]" , pipeline .size (), CAPACITY );
34
- System .out .println ("Producer is adding " + item + capacity );
35
- nItems ++;
36
- Thread .sleep (100 );
37
- } else {
38
- System .out .println ("Producer queue is full" );
39
- Thread .sleep (500 );
40
- }
27
+ String item = "item" + nItems ;
28
+ pipeline .offer (item );
29
+ String capacity = String .format (" [%d/%d]" , pipeline .size (), CAPACITY );
30
+ System .out .println ("Producer is adding " + item + capacity );
31
+ nItems ++;
32
+ Thread .sleep (200 );
41
33
} catch (InterruptedException e ) {
42
34
e .printStackTrace ();
43
35
}
@@ -59,19 +51,19 @@ public void run() {
59
51
}
60
52
}
61
53
62
- static class Consumer extends Thread {
63
- BlockingQueue pipeline ;
64
- String name ;
54
+ private static class Consumer extends Thread {
55
+ private final BlockingQueue < String > pipeline ;
56
+ private final String name ;
65
57
66
- Consumer (String name , BlockingQueue pipeline ) {
58
+ Consumer (String name , BlockingQueue < String > pipeline ) {
67
59
this .name = name ;
68
60
this . pipeline = pipeline ;
69
61
}
70
62
71
63
public void run () {
72
64
while (true ) {
73
65
try {
74
- String item = ( String ) pipeline .take ();
66
+ String item = pipeline .take ();
75
67
if (item .equals (STOP )) {
76
68
break ;
77
69
}
@@ -85,7 +77,7 @@ public void run() {
85
77
}
86
78
87
79
public static void main (String [] args ) throws InterruptedException {
88
- BlockingQueue pipeline = new ArrayBlockingQueue <String >(CAPACITY );
80
+ BlockingQueue < String > pipeline = new ArrayBlockingQueue <>(CAPACITY );
89
81
new Producer (pipeline ).start ();
90
82
Thread .sleep (100 );
91
83
for (int i = 0 ; i < CONSUMERS ; i ++) {
0 commit comments