Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

diskqueue: support a callback when the disk queue is empty #21

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (

type AppLogFunc func(lvl LogLevel, f string, args ...interface{})

// EmptyCallback is called when the disk queue has no messages (after the last message was written)
type EmptyCallback func()

func (l LogLevel) String() string {
switch l {
case 1:
Expand Down Expand Up @@ -99,14 +102,15 @@ type diskQueue struct {
exitChan chan int
exitSyncChan chan int

logf AppLogFunc
logf AppLogFunc
emptyCallback EmptyCallback
}

// New instantiates an instance of diskQueue, retrieving metadata
// from the filesystem and starting the read ahead goroutine
func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc, cb EmptyCallback) Interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have quibbles with the naming here ... the type EmptyCallback func() initially sounds to me like it is describing the callback taking no arguments and returning nothing. I lean towards just queueEmptyCB func() (no extra type).

It's a bit annoying that we have to break compatibility when adding optional arguments here. Already did that for logf though, and not sure want to redesign with a DiskQueueOptions struct or similar, right now.

d := diskQueue{
name: name,
dataPath: dataPath,
Expand All @@ -124,6 +128,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
syncEvery: syncEvery,
syncTimeout: syncTimeout,
logf: logf,
emptyCallback: cb,
}

// no need to lock here, nothing else could possibly be touching this instance
Expand Down Expand Up @@ -647,6 +652,9 @@ func (d *diskQueue) ioLoop() {
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
if d.depth == 0 && d.emptyCallback != nil {
d.emptyCallback()
}
case d.depthChan <- d.depth:
case <-d.emptyChan:
d.emptyResponseChan <- d.deleteAllFiles()
Expand Down
23 changes: 14 additions & 9 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ func TestDiskQueue(t *testing.T) {
if err != nil {
panic(err)
}
var empty bool
defer os.RemoveAll(tmpDir)
dq := New(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l)
dq := New(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l, func() { empty = true })
defer dq.Close()
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())
Expand All @@ -98,6 +99,10 @@ func TestDiskQueue(t *testing.T) {

msgOut := <-dq.ReadChan()
Equal(t, msg, msgOut)
// empty callback isn't guaranteed to happen until the next IOLoop operation has completed
// check depth to ensure ioloop has called empty callback
Equal(t, int64(0), dq.Depth())
Equal(t, empty, true)
}

func TestDiskQueueRoll(t *testing.T) {
Expand All @@ -110,7 +115,7 @@ func TestDiskQueueRoll(t *testing.T) {
defer os.RemoveAll(tmpDir)
msg := bytes.Repeat([]byte{0}, 10)
ml := int64(len(msg))
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l, nil)
defer dq.Close()
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())
Expand Down Expand Up @@ -145,7 +150,7 @@ func TestDiskQueueEmpty(t *testing.T) {
}
defer os.RemoveAll(tmpDir)
msg := bytes.Repeat([]byte{0}, 10)
dq := New(dqName, tmpDir, 100, 0, 1<<10, 2500, 2*time.Second, l)
dq := New(dqName, tmpDir, 100, 0, 1<<10, 2500, 2*time.Second, l, nil)
defer dq.Close()
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())
Expand Down Expand Up @@ -213,7 +218,7 @@ func TestDiskQueueCorruption(t *testing.T) {
}
defer os.RemoveAll(tmpDir)
// require a non-zero message length for the corrupt (len 0) test below
dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l)
dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l, nil)
defer dq.Close()

msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file
Expand Down Expand Up @@ -290,7 +295,7 @@ func TestDiskQueueSyncAfterRead(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := New(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
dq := New(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l, nil)
defer dq.Close()

msg := make([]byte, 1000)
Expand Down Expand Up @@ -341,7 +346,7 @@ func TestDiskQueueTorture(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := New(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l)
dq := New(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l, nil)
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())

Expand Down Expand Up @@ -382,7 +387,7 @@ func TestDiskQueueTorture(t *testing.T) {

t.Logf("restarting diskqueue")

dq = New(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l)
dq = New(dqName, tmpDir, 262144, 0, 1<<10, 2500, 2*time.Second, l, nil)
defer dq.Close()
NotNil(t, dq)
Equal(t, depth, dq.Depth())
Expand Down Expand Up @@ -456,7 +461,7 @@ func benchmarkDiskQueuePut(size int64, b *testing.B) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := New(dqName, tmpDir, 1024768*100, 0, 1<<20, 2500, 2*time.Second, l)
dq := New(dqName, tmpDir, 1024768*100, 0, 1<<20, 2500, 2*time.Second, l, nil)
defer dq.Close()
b.SetBytes(size)
data := make([]byte, size)
Expand Down Expand Up @@ -607,7 +612,7 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := New(dqName, tmpDir, 1024768, 0, 1<<30, 2500, 2*time.Second, l)
dq := New(dqName, tmpDir, 1024768, 0, 1<<30, 2500, 2*time.Second, l, nil)
defer dq.Close()
b.SetBytes(size)
data := make([]byte, size)
Expand Down