In Anwendungen in denen mehrere Threads Daten erzeugen und konsumieren, benötigt man einen Mechanismus und eine Datenstruktur zur Synchronisation und Datenaustausch, die das ermöglichen.
In der vergangenen Kolumnen haben wir uns ausgefeilte Mechanismen dazu angeschaut, wie zum Beispiel LMAX Disruptor [Hun11], einen hochperformanten Ringpuffer mit verschiedenen Koordinierungsstrategien, der eine sehr effiziente Interaktion zwischen Produzenten und Konsumenten ermöglicht.
Aber auch die Java Concurrency API bietet mit den integrierten Synchronisationsmechanismen und den verschiedenen Warteschlangen Möglichkeiten, um Threads zu synchronisieren und Daten zwischen ihnen auszutauschen.
In dieser Kolumne schauen wir uns die Konzepte und Implementierungen von Blocking Queue an, einer Warteschlange, die dafür vorgesehen ist, von mehreren Threads gleichzeitig genutzt zu werden, um thread-sicher Daten in die Warteschlange einfügen und entfernen. Die Warteschlange, wie der Name schon sagt, ist so implementiert, dass sie die Threads blockiert, wenn sie versuchen Daten einzufügen wenn die Warteschlange voll oder zu entnehmen, aber die Warteschlange leer ist. Die Threads werden erst wieder freigegeben, wenn die Warteschlange wieder Daten enthält bzw. wieder Platz für Daten hat.
Ich habe selbst Blocking Queues genutzt, wenn Daten aus parallelen Quellen als Produzenten in einen einzelnen Java Stream als Konsument umgewandelt werden mussten. Andere Beispiele sind die Verarbeitung von Daten aus einer oder mehreren Quellen (z.b. API Aufrufe oder Datenbanken) die von mehreren Konsumenten asynchron und parallel verarbeitet werden sollen, wobei der Durchsatz des Systems kontrollierbar bleiben muss. Oder um mehrere Quellen mit einen einzelnen Konsumenten zu verarbeiten, z.B. um Updates in Batches in einem Single-Threaded Datenbankzugriff wie z.B. bei DuckDB oder SQLite durchzuführen.
Das BlockingQueue
Interface und seine Implementierungen sind einige der nützlichsten Bestandteile der java.util.concurrent
Package für den Datenaustausch zwischen nebenläufigen Parteien.
Wir werden uns einige Beispiele anschauen, die die Blocking Queue in Aktion zeigen und dann kurz in die Implementierung der ArrayBlockingQueue in Java eintauchen. Desweiteren wollen wir auch untersuchen, wie sich virtuelle Threads in diesem Fall verhalten, und ob ein System das virtuelle Threads mit Blocking Queues nutzt skalierbarer ist, als ein System, das mit Platformthreads arbeitet.
Im einfachsten Beispiel haben wir einen Produzenten und einen Konsumenten von Daten, die unabhängig voneinander und mit verschiedenen Geschwindigkeiten arbeiten. Um sie beide gleichzeitig in unserer Anwendung auszuführen, müssen wir sie in verschiedenen Threads laufen lassen.
Damit der Produzent die erzeugten Daten asynchron bereitstellen kann, braucht es einen Mechanismus der nicht nur die Daten bereitstellt, sondern auch die Information darüber, dass neue Informationen vorhanden sind.
Prinzipiell könnte man dieses mit für einen Datenwert mit einer Semaphore und einem Speicherplatz lösen, wobei die Semaphore anzeigt ob neue Daten vorhanden sind und der Speicherplatz die Daten enthält.
Der Speicherplatz selbst muss für den Zugriff durch mehrere Threads geeignet sein, also entweder volatile
oder als AtomicReference
implementiert werden.
Stattdessen können wir aber die dafür vorgesehenen Mechanismen der Java Concurrency API nutzen, die uns die Arbeit erleichtern und die Fehleranfälligkeit reduzieren.
Um Daten zwischen den beiden Threads auszutauschen, nutzen wir eine geeignete Blocking Queue Implementierung, die wir als gemeinsame Datenstruktur zwischen den beiden Threads nutzen.
Es gibt sowohl begrenzte als auch unbegrenzte Warteschlangen, die wir nutzen können, wobei die letzteren zu vermeiden sind, da sie keine Kontrollmechanismen bieten, um die bereitgestellte Datenmenge zu begrenzen und somit zu Speicherproblemen führen können.
Begrenzte, blockiertende Warteschlangen stellen ein gutes Design für nebenläufige Anwendungen dar, da hier die Kapazität des System kontrolliert werden kann und Produzenten und Konsumenten synchronisiert werden und trotzdem ein Puffer für Abweichungen in der Verarbeitungsgeschwindigkeit vorhanden ist. Damit kann unmittelbar ein Rückstau (Backpressure) im System implementiert werden, so dass Konsumenten nicht von Produzenten überfordert werden und die Kontrolle der Produktion bei den Konsumenten liegt.
Wenn Daten eines einzelnen Produzenten in einzelnen Einträgen in der Warteschlange abgelegt werden, kann der Konsument die Daten in der Reihenfolge abrufen, in der sie erzeugt wurden. Um anzuzeigen, dass ein Produzent keine Daten mehr erzeugt, kann ein spezielles Element in die Warteschlange eingefügt werden, das anzeigt, dass keine weiteren Daten mehr erzeugt werden. Dieses spezielle Stopsignal wird auch als Poison Pill (Giftpille) oder Tombstone (Grabstein) Eintrag bezeichnet.
Wie schon aus der Platzierung im Paket java.util.concurrent
hervorgeht, und auch aus der vorgesehenen Nutzung zum Datenaustausch zwischen mehreren Threads sind die Implementierungen von BlockingQueue thread-safe.
Ihre Methoden stellen atomare Operationen dar und benutzen intern Locks [LockSupport], um die Datenstruktur zu schützen.
Die Blocking Queue Implementierungen werfen eine NullPointerException
, wenn ein Null-Eintrag eingefügt wird.
Wie alle Klassen des Collections Frameworks sind BlockingQueues generifiziert, so dass sie mit verschiedenen Datentypen genutzt werden können.
Im folgenden Beispiel (Listing 1) sehen wir die diskutierten Aspekte im Einsatz.
Wir haben ein Record Message
das wir in die auf 10 Einträge limitierte Warteschlange ArrayBlockingQueue
einfügen und vearbeiten wollen.
Ein Produzent erzeugt 100 Einträge und fügt sie mittels der blockierenden Operation put
in die Warteschlange ein, und dann ein Stopsignal (Message.POISON_PILL
)für jeden der 5 Konsumenten.
Falls der Thread unterbrochen wird, wird die Exception behandelt und das interrupt Flag auf dem Thread gesetzt.
Die 5 Konsumenten-Threads laufen jeweils in einer Endlosschleife und entnehmen die Elemente aus der Warteschlange und "verarbeiten" sie mittels Thread.sleep
für bis zu 500 Millisekunden (random), bis sie das Stopsignal erhalten.
Die Interruptbehandlung ist äquivalent zum Produzenten.
Somit haben wir insgesamt 5+1 Threads die auf die entsprechenden Hardware-Threads des Systems gemappt werden.
record Message(int id, String content) {
public static final Message POISON_PILL = new Message(-1, "POISON PILL");
}
static int CONSUMERS = 5;
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
try (var executor = Executors.newFixedThreadPool(6)) {
long start = System.currentTimeMillis();
// Producer
executor.submit(() -> {
try {
for (int i = 0; i < 100; i++) {
System.out.println("Produced " + i + " remaining " + queue.remainingCapacity()+
" " +(System.currentTimeMillis()-start)+" ms");
queue.put(new Message(i, "Message " + i));
}
for (int j = 0; j < CONSUMERS; j++) {
System.out.println("Produced " + j + "PP" + " remaining " +
queue.remainingCapacity()+
" " +(System.currentTimeMillis()-start)+" ms");
queue.put(Message.POISON_PILL);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumers
for (int c = 0; c < CONSUMERS; c++) {
int consumer = c;
executor.submit(() -> {
try {
while (true) {
Message message = queue.take();
Thread.sleep(ThreadLocalRandom.current().nextInt(500));
System.out.println(consumer+". "+Thread.currentThread() + " result: " +
message + " " +(System.currentTimeMillis()-start)+" ms");
if (message.equals(Message.POISON_PILL)) {
return;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
Wenn wir diesen Code, z.b. in jshell ausführen dauert es ca. 5 Sekunden bis alle Elemente verarbeitet sind. Die durchschnittliche Wartezeit ist 250ms bei 20 Elementen pro Konsument (100/5) ergibt das in Summe 5s, in 5 parallelen Threads. In der Ausgabe von Listing 2 sehen wir dass er Produzent die Einträge erzeugt und die Warteschlange fast immer voll ist.
Produced 0 remaining 10 1 ms
Produced 1 remaining 9 3 ms
Produced 2 remaining 8 3 ms
...
Produced 14 remaining 1 4 ms
Produced 15 remaining 0 4 ms // 5 consumer haben das 1. Element verarbeitet
4. Thread[#582,pool-4-thread-6,5,main] result: Message[id=4, content=Message 4] 3 ms
0. Thread[#578,pool-4-thread-2,5,main] result: Message[id=0, content=Message 0] 3 ms
3. Thread[#581,pool-4-thread-5,5,main] result: Message[id=3, content=Message 3] 3 ms
2. Thread[#580,pool-4-thread-4,5,main] result: Message[id=2, content=Message 2] 3 ms
1. Thread[#579,pool-4-thread-3,5,main] result: Message[id=1, content=Message 1] 3 ms
Produced 16 remaining 0 162 ms
1. Thread[#579,pool-4-thread-3,5,main] result: Message[id=5, content=Message 5] 162 ms
2. Thread[#580,pool-4-thread-4,5,main] result: Message[id=6, content=Message 6] 174 ms
Produced 17 remaining 0 174 ms
Produced 18 remaining 0 209 ms
1. Thread[#579,pool-4-thread-3,5,main] result: Message[id=7, content=Message 7] 209 ms
0. Thread[#578,pool-4-thread-2,5,main] result: Message[id=9, content=Message 9] 253 ms
...
0. Thread[#578,pool-4-thread-2,5,main] result: Message[id=97, content=Message 97] 4140 ms
2. Thread[#580,pool-4-thread-4,5,main] result: Message[id=98, content=Message 98] 4196 ms
2. Thread[#580,pool-4-thread-4,5,main] result: Message[id=99, content=Message 99] 4206 ms
1. Thread[#579,pool-4-thread-3,5,main] result: Message[id=-1, content=POISON PILL] 4212 ms
4. Thread[#582,pool-4-thread-6,5,main] result: Message[id=-1, content=POISON PILL] 4229 ms
3. Thread[#581,pool-4-thread-5,5,main] result: Message[id=-1, content=POISON PILL] 4358 ms
2. Thread[#580,pool-4-thread-4,5,main] result: Message[id=-1, content=POISON PILL] 4382 ms
0. Thread[#578,pool-4-thread-2,5,main] result: Message[id=-1, content=POISON PILL] 4528 ms
Es ist wichtig, dass wir unsere Konsumenten-Threads stoppen, wenn sie auf ein Element warten, das nie erscheinen wird, das wird mit der "Poison Pill" als Anzeige für das Ende der Produktion erreicht. Da jeder Konsumetn-Thread ein Stopsignal erhält und konsumiert bevor er sich beendet, muss der Produzent entsprechend so viele Stopsignale einfügen, wie es Konsumenten gibt.
Im java.util.concurrent
Paket ist das Interface [BlockingQueue]
definiert, das die Methoden für den Zugriff auf die Warteschlange definiert.
Damit wird sowohl die Syntax definiert als auch die Semantik dokumentiert.
Die wichtigsten Methoden sind für das Hinzufügen von Elementen:
-
boolean add(E)
- fügt ein Element in die Warteschlange ein, wenn wenn möglich, sonst IllegalStateException -
put(E)
- fügt ein Element in die Warteschlange ein, blockiert wenn die Warteschlange voll ist -
boolean offer(e)
- versucht Element einzufügen, gibt true zurück wenn erfolgreich, sonst false -
boolean offer(e, timeout, TimeUnit)
- versucht Element einzufügen, oder wartet bis der Timeout abgelaufen ist und gibt dann false zurück
Entnehmen:
-
E take()
- entfernt ein Element aus der Warteschlange, blockiert wenn die Warteschlange leer ist -
E poll()
- versucht Element zu entfernen, oder gibt bei leerer Warteschlange null zurück -
E poll(timeout, TimeUnit)
- versucht Element zu entfernen, oder wartet bei leerer Warteschlange bis der Timeout abgelaufen ist und gibt dann null zurück -
int drainTo(Collection[, elemente])
- entfernt alle Elemente aus der Warteschlange und fügt sie in die Collection ein -
remove(E)
- entfernt das Element aus der Warteschlange, wenn es vorhanden ist
Desweiteren gibt es noch:
-
E peek()
- liefert das erste Element der Warteschlange, ohne es zu entfernen -
int size()
- liefert die Anzahl der Elemente in der Warteschlange -
boolean isEmpty()
- liefert true, wenn die Warteschlange leer ist -
contains(E)
- liefert true, wenn die Warteschlange das Element enthält -
int remainingCapacity()
- liefert die Anzahl der noch verfügbaren Plätze in der Warteschlange
Aktion | Wirft Exception | Immer Ergebnis | Blockiert | Time Out |
---|---|---|---|---|
Einfügen |
|
|
|
|
Entfernen |
|
|
|
|
Testen |
|
|
Die Implementierungen von BlockingQueue
sind in der java.util.concurrent
Package zu finden, wie in Abbildung 2 zu sehen.
-
ArrayBlockingQueue
- begrenzte Warteschlange, die auf einem Objekt-Array basiert, blockiert beim Einfügen, wenn die Warteschlange voll ist, oder beim Entfernen, wenn die Warteschlange leer ist. -
LinkedBlockingQueue
- begrenzte oder unbegrenzte Warteschlange, die auf einer Linked List basiert -
PriorityBlockingQueue
- unbegrenzte Warteschlange, die die Elemente nach ihrer Priorität sortiert, Elemente müssenComparable
implementieren, wiePriorityQueue
-
DelayQueue
- unbegrenzte Warteschlange, die die Elemente nach ihrer Verzögerung sortiert, Elemente müssenDelayed
implementieren -
SynchronousQueue
- Warteschlange, die nur ein Element aufnehmen kann, das von einem Konsumenten entfernt werden muss, bevor ein weiteres Element eingefügt werden kann
Eine Deque
(double ended queue) ist eine zweiseite Warteschlange in die man die von beiden Seiten einfügen und entfernen kann.
Ihre API entspricht der Queue nur dass es für alle Methoden eine Variante mit First/Last Suffix gibt, wie z.B. addFirst
, addLast
, removeFirst
, removeLast
usw.
Eine unbegrenzte Warteschlange blockiert nicht beim Hinzufügen, nur beim Entfernen aus einer leeren Warteschlange. Wichtig ist hier, dass die Konsumenten genauso schnell oder schneller sind als die Produzenten, sonst läuft die Warteschlange voll und die JVM bekommt ein Speicherproblem.
Unser einfaches Beispiel mit einem Produzenten und mehreren Konsumenten, die in verschiedenen Threads laufen, kann auch mit virtuellen Threads implementiert werden, siehe Listing 3.
Zum einen können wir im Beispiel den Executor durch einen VirtualThreadPerTaskExecutor
ersetzen, der virtuelle Threads statt echte Threads erzeugt.
Dabei würde sich aber das Verhalten nicht ändern, da wir nur 1+5 Threads einsetzen, da die Konsumenten die Verarbeitung in einer Schleife innerhalb des Threads ausführen.
Stattdessen wollen wir wirklich das Konzept des "Ein Task, ein Thread" wie wir auch in [HunXx] zu Virtuellen Threads gesehen haben, nutzen und jedem Konsum-Task einen eigenen virtuellen Thread zuordnen.
Daher nutzen wir hier nur einen Konsumenten der Elemente aus der Warteschlange entnimmt (es könnten auch mehrere, sogar 100 sein, das würde keinen Unterschied machen), aber dann jedes Element in einem eigenen virtuellen Thread verarbeitet. Damit benötigen wir hier auch nur eine "Poison Pill".
Wir müssen die Handhabung des Executors anpassen, da dieser in einem try-with-resources, schon beendet, ist bevor unsere Verarbeitung-Tasks submitted werden könenn, und dann übergebene virtuelle Threads einfach nichts tun (auch keine Fehlermeldung).
record Message(int id, String content) {
public static final Message POISON_PILL = new Message(-1, "POISON PILL");
}
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
var executor = Executors.newVirtualThreadPerTaskExecutor();
long start = System.currentTimeMillis();
// ein Produzent
executor.submit(() -> {
try {
for (int i = 0; i < 100; i++) {
System.out.println("Produced " + i + " remaining " + queue.remainingCapacity()+
" " +(System.currentTimeMillis()-start)+" ms");
queue.put(new Message(i, "Message " + i));
}
queue.put(Message.POISON_PILL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// ein Konsument
executor.submit(() -> {
try {
while (true) {
Message message = queue.take();
if (message.equals(Message.POISON_PILL)) return;
// Verarbeitungs-Task
executor.submit(() -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(500));
System.out.println(Thread.currentThread() + " result: " +
message+" "+(System.currentTimeMillis()-start)+" ms");
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdown();
}
});
executor.awaitTermination(5, TimeUnit.MINUTES);
Wie wir in der Ausgabe in Listing 4 sehen können, werden alle virtuellen Threads als Verarbeitungs-Tasks parallel ausgeführt, da das Thread.sleep
nicht wirklich blockiert sonder nur die Kontrolle abgibt und somit 100 virtuelle Threads instantan erzeugt werden und die Verarbeitung der Elemente parallel stattfindet.
Die Gesamtlaufzeit beruht sich dann nur einmalig auf die 500ms Pausenzeit, die maximal von einem der Konsumenten benötigt werden.
Produced 96 remaining 0 32 ms
Produced 97 remaining 0 32 ms
Produced 98 remaining 4 32 ms
Produced 99 remaining 5 32 ms
VirtualThread[#1342]/runnable@FJP-1-worker-21 result: Message[id=10, content=Message 10] 41 ms
VirtualThread[#1372]/runnable@FJP-1-worker-21 result: Message[id=35, content=Message 35] 43 ms
VirtualThread[#1411]/runnable@FJP-1-worker-21 result: Message[id=74, content=Message 74] 48 ms
...
VirtualThread[#1417]/runnable@FJP-1-worker-23 result: Message[id=80, content=Message 80] 496 ms
VirtualThread[#1362]/runnable@FJP-1-worker-21 result: Message[id=25, content=Message 25] 496 ms
VirtualThread[#1407]/runnable@FJP-1-worker-17 result: Message[id=70, content=Message 70] 509 ms
VirtualThread[#1374]/runnable@FJP-1-worker-17 result: Message[id=37, content=Message 37] 512 ms
VirtualThread[#1385]/runnable@FJP-1-worker-17 result: Message[id=48, content=Message 48] 513 ms
VirtualThread[#1329]/runnable@FJP-1-worker-17 result: Message[id=0, content=Message 0] 518 ms
VirtualThread[#1387]/runnable@FJP-1-worker-17 result: Message[id=50, content=Message 50] 529 ms
Man könnte das ganze auch mit Structured Concurrency handhaben, und dann die Verarbeitung beenden und den Executor herunterfahren lassen, sobald die Poison Pill angekommen ist.
Das wäre ähnlich zur StructuredTaskScope.ShutdownOnFailure
funktionieren nur dass der Executor nicht beim ersten "Fehler" eines Tasks beendet wird sondern beim Auftreten der "Poison Pill" Nachricht.
Um zu verstehen, wie eine Wartequeue implementiert werden kann, schauen wir uns kurz die Implementierung der [ArrayBlockingQueue] an. Wir finden diese auf GitHub im OpenJDK Repository [OpenJDKRepo].
Die ArrayBlockingQueue
ist eine begrenzte Warteschlange, die auf einem Objekt-Array mit fixer Größe entsprechend der Kapazität basiert.
Um die Zugriffe auf die Warteschlange zu synchronisieren, wird ein ReentrantLock
verwendet, dass es erlaubt, dass derselbe Thread mehrmals das Lock betreten kann, ohne ein Deadlock gegen sich selbst zu verursachen.
Jede Betreten des Locks erhöht erhöht einen Zähler für den Thread und erniedrigt ihn beim Verlassen.
Die Bedingungen für notFull
und notEmpty
werden mittels Konditions-Objekten [Condition] realisiert, die mit dem Lock verknüpft sind und die await()
und signal()
Methoden implementieren, die die Object.wait()
und Object.notify()
Methoden ersetzen.
Diese benutzen unter der Haube LockSupport.park()
um den aktuellen Thread zu pausieren, damit werden auch, anders als bei synchronized
virtuelle Threads mit unterstützt.
Im Konstruktor (Listing 5) wird das Array, das Lock und die Konditionen initialisiert.
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// begrenzte Warteschlange
this.items = new Object[capacity];
// Lock für Schreibzugriff
lock = new ReentrantLock(fair);
// Condition für Warteschlange voll und leer
// Condition externalisiert Object.wait() und notify()
// verknüpft mit dem Lock, so dass condition.await()
// das lock freigibt und den Thread parkt
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
Die put
Methode in Listing 6 führt zuerst den Null-Test durch, und sperrt dann den Zugriff über das Lock der Warteschlange.
Dann wird in einer Schleife solange gewartet (Thread blockiert und Lock gelöst) solange keine Platz in der Warteschlange ist.
Wenn das dann der Fall ist, wird das Element hinzugefügt und schlussendlich das Lock wieder freigegeben.
put
Methode der ArrayBlockingQueue
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
// Lock für Schreiben auf Warteschlange
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// solange die Warteschlange voll ist, warten
while (count == items.length) {
// notFull ist ein Condition Objekt
// ein Thread wird mit
// notFull.signal() aufgeweckt wenn
// ein Element entfernt wurde
notFull.await();
}
// Element einfügen
enqueue(e);
} finally {
// Lock freigeben
lock.unlock();
}
}
Blocking Queues sind nützliche Werkzeuge für nebenläufige Anwendungen, die Daten zwischen mehreren Threads austauschen müssen bzw. zwischen Produzenten und Konsumenten synchronisieren müssen. Auch die Verteilung eines eingehenden singulären Stroms auf mehrere Konsumenten oder die Zusammenführung von mehreren Quellen in einen singulären Konsumenten kann mit Blocking Queues einfach implementiert werden.
Mit virtuellen Threads besteht die Möglichkeit, dass die Verarbeitung jedes Elements in eigenen Task bzw. Thread stattfindet und damit die Resourcen des Systems besser ausgenutzt und der Durchsatz erhöht werden kann.
Wie schon im Disruptor Artikel und Paper vor vielen Jahren beschrieben sind in echten Systemem die Geschwindigkeiten von Produzenten und Konsumenten selten balanciert, so dass die Warteschlangen entweder immer leer (Konsumenten schneller) oder immer voll sind (Produzent schneller). Warteschlangen haben auch das Problem, dass sie einen Contention Point bilden, da mehrere Threads gleichzeitig auf diesselben synchronisierten Informationen zum Warteschlangenstatus und auf das Kopf-Element zugreifen. Zumindest in hochperformanten Systemen, wo die Verarbeitung von elementen im Mikrosekundenbereich liegt, ist das ein Problem, da die Synchronisation der Threads die Verarbeitung verlangsamt. Dann bieten sich alternative Ansätze wie LMAX Disruptor an.
-
[Hun11] JavaSpektrum 05/2011 - LMAX Disruptor
-
[Morling] https://www.morling.dev/blog/is-your-blocking-queue-blocking/
-
[JenkovBQVideo] https://www.youtube.com/watch?v=d3xb1Nj88pw
-
[PatelCustomQueue] https://javabypatel.blogspot.com/2020/09/custom-blocking-queue-implementation-in-java.html
-
[BQ Explainer] https://bito.ai/resources/blocking-queue-java-implementation-java-explained/
-
[Baeldung-BQ] https://www.baeldung.com/java-blocking-queue
-
[LockSupport] https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/locks/LockSupport.html
-
[OpenJDKRepo] https://github.com/openjdk/jdk
-
[ArrayBlockingQueue] https://github.com/openjdk/jdk/blob/jdk-17%2B35/src/java.base/share/classes/java/util/concurrent/ArrayBlockingQueue.java#L370
-
[BQ Implementation] https://www.javacodemonk.com/blocking-queue-implementation-in-java-044ee033
-
[ABQ Implementation] https://topdeveloperacademy.com/articles/java-arrayblockingqueue-a-thread-safe-bound-size-queue