@@ -399,117 +399,138 @@ private:
399
399
// //////////////////////////////////////////////////////////////////////////////
400
400
401
401
402
- version ( unittest )
402
+ unittest
403
403
{
404
- static if ( ! is ( typeof ( Thread ) ) )
405
- private import core.thread ;
404
+ import core.atomic , core.thread , core.sync.semaphore ;
406
405
407
-
408
- void testRead ( ReadWriteMutex .Policy policy )
406
+ static void runTest (ReadWriteMutex .Policy policy)
409
407
{
410
- auto mutex = new ReadWriteMutex ( policy );
411
- auto synInfo = new Object ;
412
- int numThreads = 10 ;
413
- int numReaders = 0 ;
414
- int maxReaders = 0 ;
408
+ scope mutex = new ReadWriteMutex (policy);
409
+ scope rdSemA = new Semaphore , rdSemB = new Semaphore ,
410
+ wrSemA = new Semaphore , wrSemB = new Semaphore ;
411
+ shared size_t numReaders, numWriters;
415
412
416
413
void readerFn ()
417
414
{
418
- synchronized ( mutex.reader )
415
+ synchronized ( mutex.reader)
419
416
{
420
- synchronized ( synInfo )
421
- {
422
- if ( ++ numReaders > maxReaders )
423
- maxReaders = numReaders;
424
- }
425
- Thread .sleep( dur! " msecs" (1 ) );
426
- synchronized ( synInfo )
427
- {
428
- -- numReaders;
429
- }
417
+ atomicOp! " +=" (numReaders, 1 );
418
+ rdSemA.notify();
419
+ rdSemB.wait();
420
+ atomicOp! " -=" (numReaders, 1 );
430
421
}
431
422
}
432
423
433
- auto group = new ThreadGroup ;
434
-
435
- for ( int i = 0 ; i < numThreads; ++ i )
436
- {
437
- group.create( &readerFn );
438
- }
439
- group.joinAll();
440
- assert ( numReaders < 1 && maxReaders > 1 );
441
- }
442
-
443
-
444
- void testReadWrite ( ReadWriteMutex .Policy policy )
445
- {
446
- auto mutex = new ReadWriteMutex ( policy );
447
- auto synInfo = new Object ;
448
- int numThreads = 10 ;
449
- int numReaders = 0 ;
450
- int numWriters = 0 ;
451
- int maxReaders = 0 ;
452
- int maxWriters = 0 ;
453
- int numTries = 5 ;
454
-
455
- void readerFn ()
424
+ void writerFn ()
456
425
{
457
- for ( int i = 0 ; i < numTries; ++ i )
426
+ synchronized (mutex.writer )
458
427
{
459
- synchronized ( mutex.reader )
460
- {
461
- synchronized ( synInfo )
462
- {
463
- if ( ++ numReaders > maxReaders )
464
- maxReaders = numReaders;
465
- }
466
- Thread .sleep( dur! " usecs" (100 ) );
467
- synchronized ( synInfo )
468
- {
469
- -- numReaders;
470
- }
471
- }
428
+ atomicOp! " +=" (numWriters, 1 );
429
+ wrSemA.notify();
430
+ wrSemB.wait();
431
+ atomicOp! " -=" (numWriters, 1 );
472
432
}
473
433
}
474
434
475
- void writerFn ( )
435
+ void waitQueued ( size_t queuedReaders, size_t queuedWriters )
476
436
{
477
- for ( int i = 0 ; i < numTries; ++ i )
437
+ for (;; )
478
438
{
479
- synchronized ( mutex.writer )
439
+ synchronized ( mutex.m_commonMutex )
480
440
{
481
- synchronized ( synInfo )
482
- {
483
- if ( ++ numWriters > maxWriters )
484
- maxWriters = numWriters;
485
- }
486
- Thread .sleep( dur! " usecs" (100 ) );
487
- synchronized ( synInfo )
488
- {
489
- -- numWriters;
490
- }
441
+ if (mutex.m_numQueuedReaders == queuedReaders &&
442
+ mutex.m_numQueuedWriters == queuedWriters)
443
+ break ;
491
444
}
445
+ Thread .yield();
492
446
}
493
447
}
494
448
495
- auto group = new ThreadGroup ;
449
+ scope group = new ThreadGroup ;
496
450
497
- for ( int i = 0 ; i < numThreads; ++ i )
451
+ // 2 simultaneous readers
452
+ group.create(&readerFn); group.create(&readerFn);
453
+ rdSemA.wait(); rdSemA.wait();
454
+ assert (numReaders == 2 );
455
+ rdSemB.notify(); rdSemB.notify();
456
+ group.joinAll();
457
+ assert (numReaders == 0 );
458
+ foreach (t; group) group.remove(t);
459
+
460
+ // 1 writer at a time
461
+ group.create(&writerFn); group.create(&writerFn);
462
+ wrSemA.wait();
463
+ assert (! wrSemA.tryWait());
464
+ assert (numWriters == 1 );
465
+ wrSemB.notify();
466
+ wrSemA.wait();
467
+ assert (numWriters == 1 );
468
+ wrSemB.notify();
469
+ group.joinAll();
470
+ assert (numWriters == 0 );
471
+ foreach (t; group) group.remove(t);
472
+
473
+ // reader and writer are mutually exclusive
474
+ group.create(&readerFn);
475
+ rdSemA.wait();
476
+ group.create(&writerFn);
477
+ waitQueued(0 , 1 );
478
+ assert (! wrSemA.tryWait());
479
+ assert (numReaders == 1 && numWriters == 0 );
480
+ rdSemB.notify();
481
+ wrSemA.wait();
482
+ assert (numReaders == 0 && numWriters == 1 );
483
+ wrSemB.notify();
484
+ group.joinAll();
485
+ assert (numReaders == 0 && numWriters == 0 );
486
+ foreach (t; group) group.remove(t);
487
+
488
+ // writer and reader are mutually exclusive
489
+ group.create(&writerFn);
490
+ wrSemA.wait();
491
+ group.create(&readerFn);
492
+ waitQueued(1 , 0 );
493
+ assert (! rdSemA.tryWait());
494
+ assert (numReaders == 0 && numWriters == 1 );
495
+ wrSemB.notify();
496
+ rdSemA.wait();
497
+ assert (numReaders == 1 && numWriters == 0 );
498
+ rdSemB.notify();
499
+ group.joinAll();
500
+ assert (numReaders == 0 && numWriters == 0 );
501
+ foreach (t; group) group.remove(t);
502
+
503
+ // policy determines whether queued reader or writers progress first
504
+ group.create(&writerFn);
505
+ wrSemA.wait();
506
+ group.create(&readerFn);
507
+ group.create(&writerFn);
508
+ waitQueued(1 , 1 );
509
+ assert (numReaders == 0 && numWriters == 1 );
510
+ wrSemB.notify();
511
+
512
+ if (policy == ReadWriteMutex .Policy.PREFER_READERS )
513
+ {
514
+ rdSemA.wait();
515
+ assert (numReaders == 1 && numWriters == 0 );
516
+ rdSemB.notify();
517
+ wrSemA.wait();
518
+ assert (numReaders == 0 && numWriters == 1 );
519
+ wrSemB.notify();
520
+ }
521
+ else if (policy == ReadWriteMutex .Policy.PREFER_WRITERS )
498
522
{
499
- group.create( &readerFn );
500
- group.create( &writerFn );
523
+ wrSemA.wait();
524
+ assert (numReaders == 0 && numWriters == 1 );
525
+ wrSemB.notify();
526
+ rdSemA.wait();
527
+ assert (numReaders == 1 && numWriters == 0 );
528
+ rdSemB.notify();
501
529
}
502
530
group.joinAll();
503
- assert ( numReaders < 1 && maxReaders > 1 &&
504
- numWriters < 1 && maxWriters < 2 );
505
- }
506
-
507
-
508
- unittest
509
- {
510
- testRead( ReadWriteMutex .Policy.PREFER_READERS );
511
- testRead( ReadWriteMutex .Policy.PREFER_WRITERS );
512
- testReadWrite( ReadWriteMutex .Policy.PREFER_READERS );
513
- testReadWrite( ReadWriteMutex .Policy.PREFER_WRITERS );
531
+ assert (numReaders == 0 && numWriters == 0 );
532
+ foreach (t; group) group.remove(t);
514
533
}
534
+ runTest(ReadWriteMutex .Policy.PREFER_READERS );
535
+ runTest(ReadWriteMutex .Policy.PREFER_WRITERS );
515
536
}
0 commit comments