From 7da5a88a5b6c1283c68fd5777200987775cd2bb5 Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar Date: Wed, 25 Nov 2020 09:40:00 -0500 Subject: [PATCH] diskqueue: support a callback when the disk queue is empty --- diskqueue.go | 12 ++++++++++-- diskqueue_test.go | 23 ++++++++++++++--------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 29d15ac..20300f5 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -28,6 +28,9 @@ const ( type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) +// EmptyCallback is called when the disk queue has no messages (after the last message was written) +type EmptyCallback func() + func (l LogLevel) String() string { switch l { case 1: @@ -99,14 +102,15 @@ type diskQueue struct { exitChan chan int exitSyncChan chan int - logf AppLogFunc + logf AppLogFunc + emptyCallback EmptyCallback } // New instantiates an instance of diskQueue, retrieving metadata // from the filesystem and starting the read ahead goroutine func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, - syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { + syncEvery int64, syncTimeout time.Duration, logf AppLogFunc, cb EmptyCallback) Interface { d := diskQueue{ name: name, dataPath: dataPath, @@ -124,6 +128,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, syncEvery: syncEvery, syncTimeout: syncTimeout, logf: logf, + emptyCallback: cb, } // no need to lock here, nothing else could possibly be touching this instance @@ -647,6 +652,9 @@ func (d *diskQueue) ioLoop() { count++ // moveForward sets needSync flag if a file is removed d.moveForward() + if d.depth == 0 && d.emptyCallback != nil { + d.emptyCallback() + } case d.depthChan <- d.depth: case <-d.emptyChan: d.emptyResponseChan <- d.deleteAllFiles() diff --git a/diskqueue_test.go b/diskqueue_test.go index a685f07..6ba018e 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -85,8 +85,9 @@ func TestDiskQueue(t *testing.T) { if err != nil { panic(err) } + var empty bool defer os.RemoveAll(tmpDir) - dq := New(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l) + dq := New(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l, func() { empty = true }) defer dq.Close() NotNil(t, dq) Equal(t, int64(0), dq.Depth()) @@ -98,6 +99,10 @@ func TestDiskQueue(t *testing.T) { msgOut := <-dq.ReadChan() Equal(t, msg, msgOut) + // empty callback isn't guaranteed to happen until the next IOLoop operation has completed + // check depth to ensure ioloop has called empty callback + Equal(t, int64(0), dq.Depth()) + Equal(t, empty, true) } func TestDiskQueueRoll(t *testing.T) { @@ -110,7 +115,7 @@ func TestDiskQueueRoll(t *testing.T) { defer os.RemoveAll(tmpDir) msg := bytes.Repeat([]byte{0}, 10) ml := int64(len(msg)) - dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) + dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l, nil) defer dq.Close() NotNil(t, dq) Equal(t, int64(0), dq.Depth()) @@ -145,7 +150,7 @@ func TestDiskQueueEmpty(t *testing.T) { } defer os.RemoveAll(tmpDir) msg := bytes.Repeat([]byte{0}, 10) - dq := New(dqName, tmpDir, 100, 0, 1<<10, 2500, 2*time.Second, l) + dq := New(dqName, tmpDir, 100, 0, 1<<10, 2500, 2*time.Second, l, nil) defer dq.Close() NotNil(t, dq) Equal(t, int64(0), dq.Depth()) @@ -213,7 +218,7 @@ func TestDiskQueueCorruption(t *testing.T) { } defer os.RemoveAll(tmpDir) // require a non-zero message length for the corrupt (len 0) test below - dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l) + dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l, nil) defer dq.Close() msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file @@ -290,7 +295,7 @@ func TestDiskQueueSyncAfterRead(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := New(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := New(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l, nil) defer dq.Close() msg := make([]byte, 1000) @@ -341,7 +346,7 @@ func TestDiskQueueTorture(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := New(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l) + dq := New(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l, nil) NotNil(t, dq) Equal(t, int64(0), dq.Depth()) @@ -382,7 +387,7 @@ func TestDiskQueueTorture(t *testing.T) { t.Logf("restarting diskqueue") - dq = New(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l) + dq = New(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l, nil) defer dq.Close() NotNil(t, dq) Equal(t, depth, dq.Depth()) @@ -456,7 +461,7 @@ func benchmarkDiskQueuePut(size int64, b *testing.B) { panic(err) } defer os.RemoveAll(tmpDir) - dq := New(dqName, tmpDir, 1024768*100, 0, 1<<20, 2500, 2*time.Second, l) + dq := New(dqName, tmpDir, 1024768*100, 0, 1<<20, 2500, 2*time.Second, l, nil) defer dq.Close() b.SetBytes(size) data := make([]byte, size) @@ -607,7 +612,7 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) { panic(err) } defer os.RemoveAll(tmpDir) - dq := New(dqName, tmpDir, 1024768, 0, 1<<30, 2500, 2*time.Second, l) + dq := New(dqName, tmpDir, 1024768, 0, 1<<30, 2500, 2*time.Second, l, nil) defer dq.Close() b.SetBytes(size) data := make([]byte, size)