Skip to content

Commit 27b1c48

Browse files
committed
add workers process files to completion
1 parent 6266feb commit 27b1c48

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

main.go

+9
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,24 @@ func (a *Archiver) ArchiveDir(root string) error {
4040
type FileProcessPool struct {
4141
tasks chan File
4242
executor func(f File)
43+
wg *sync.WaitGroup
4344
}
4445

4546
func (f *FileProcessPool) Start() {
4647
for i := 0; i < 1; i++ {
48+
f.wg.Add(1)
4749
go f.listen()
4850
}
4951
}
5052

53+
func (f *FileProcessPool) Close() {
54+
close(f.tasks)
55+
f.wg.Wait()
56+
}
57+
5158
func (f *FileProcessPool) listen() {
59+
defer f.wg.Done()
60+
5261
for file := range f.tasks {
5362
f.executor(file)
5463
}

main_test.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"io/fs"
1010
"os"
11+
"sync"
1112
"testing"
1213
"time"
1314

@@ -139,17 +140,23 @@ func TestFileProcessPool(t *testing.T) {
139140
assert.Equal(t, 1, fileProcessPool.PendingFiles())
140141
})
141142

142-
t.Run("can have workers process files", func(t *testing.T) {
143+
t.Run("can have workers process files to completion", func(t *testing.T) {
144+
output := bytes.Buffer{}
143145
executor := func(_ File) {
146+
time.Sleep(5 * time.Millisecond)
147+
output.WriteString("hello, world!")
144148
}
145149

146-
fileProcessPool := &FileProcessPool{tasks: make(chan File), executor: executor}
150+
fileProcessPool := &FileProcessPool{tasks: make(chan File), executor: executor, wg: new(sync.WaitGroup)}
147151
fileProcessPool.Start()
148152

149153
info := getFileInfo(t, helloTxtFileFixture)
150154
fileProcessPool.Enqueue(File{Path: helloTxtFileFixture, Info: info})
151155

156+
fileProcessPool.Close()
157+
152158
assert.Equal(t, 0, fileProcessPool.PendingFiles())
159+
assert.Equal(t, "hello, world!", output.String())
153160
})
154161
}
155162

0 commit comments

Comments
 (0)