2424import java .util .List ;
2525import java .util .concurrent .ArrayBlockingQueue ;
2626import java .util .concurrent .BlockingQueue ;
27- import java .util .concurrent .LinkedBlockingQueue ;
27+ import java .util .concurrent .ExecutorService ;
28+ import java .util .concurrent .Executors ;
2829import java .util .concurrent .Semaphore ;
2930import java .util .concurrent .atomic .AtomicBoolean ;
3031import java .util .concurrent .atomic .AtomicInteger ;
@@ -48,7 +49,9 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
4849
4950 // requires concurrent access from caller threads and syncing thread.
5051 private final BlockingQueue <Edit > editPendingQ ;
51- private final BlockingQueue <EditSyncEx > logSyncNotifyQ ;
52+
53+ // Thread pool for executing logSyncNotify
54+ private final ExecutorService logSyncNotifyExecutor ;
5255
5356 // only accessed by syncing thread so no synchronization required.
5457 // queue is unbounded because it's effectively limited by the size
@@ -65,7 +68,9 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
6568 DFS_NAMENODE_EDITS_ASYNC_LOGGING_PENDING_QUEUE_SIZE_DEFAULT );
6669
6770 editPendingQ = new ArrayBlockingQueue <>(editPendingQSize );
68- logSyncNotifyQ = new LinkedBlockingQueue <>();
71+
72+ // the thread pool size should be configurable later, and justified with a rationale
73+ logSyncNotifyExecutor = Executors .newFixedThreadPool (10 );
6974 }
7075
7176 private boolean isSyncThreadAlive () {
@@ -120,6 +125,7 @@ void openForWrite(int layoutVersion) throws IOException {
120125 public void close () {
121126 super .close ();
122127 stopSyncThread ();
128+ logSyncNotifyExecutor .shutdown ();
123129 }
124130
125131 @ Override
@@ -230,33 +236,8 @@ private Edit dequeueEdit() throws InterruptedException {
230236 return syncWaitQ .isEmpty () ? editPendingQ .take () : editPendingQ .poll ();
231237 }
232238
233- private static class EditSyncEx {
234- final Edit edit ;
235- final RuntimeException ex ;
236- EditSyncEx (Edit edit , RuntimeException ex ) {
237- this .edit = edit ;
238- this .ex = ex ;
239- }
240- }
241-
242- private class LogSyncNotifyThread extends Thread {
243- volatile boolean stopped = false ;
244-
245- @ Override
246- public void run () {
247- try {
248- while (!stopped ) {
249- EditSyncEx editSyncEx = logSyncNotifyQ .take ();
250- editSyncEx .edit .logSyncNotify (editSyncEx .ex );
251- }
252- } catch (InterruptedException ie ) {} // just swallow it
253- }
254- }
255-
256239 @ Override
257240 public void run () {
258- final LogSyncNotifyThread logSyncNotifyThread = new LogSyncNotifyThread ();
259- logSyncNotifyThread .start ();
260241 try {
261242 while (true ) {
262243 boolean doSync ;
@@ -279,14 +260,14 @@ public void run() {
279260 syncEx = ex ;
280261 }
281262 while ((edit = syncWaitQ .poll ()) != null ) {
282- logSyncNotifyQ .put (new EditSyncEx (edit , syncEx ));
263+ final Edit notifyEdit = edit ;
264+ final RuntimeException ex = syncEx ;
265+ logSyncNotifyExecutor .submit (() -> notifyEdit .logSyncNotify (ex ));
283266 }
284267 }
285268 }
286269 } catch (InterruptedException ie ) {
287270 LOG .info (Thread .currentThread ().getName () + " was interrupted, exiting" );
288- logSyncNotifyThread .stopped = true ;
289- logSyncNotifyThread .interrupt ();
290271 } catch (Throwable t ) {
291272 terminate (t );
292273 }
0 commit comments