Skip to content

Commit f274ba7

Browse files
committed
allow worker pool to be restarted
1 parent 5ae63db commit f274ba7

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

main.go

+5
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func NewFileProcessPool(numberOfWorkers int, executor func(f File)) (*FileProces
7474
}
7575

7676
func (f *FileProcessPool) Start() {
77+
f.reset()
7778
f.wg.Add(f.numberOfWorkers)
7879
for i := 0; i < f.numberOfWorkers; i++ {
7980
go f.listen()
@@ -101,6 +102,10 @@ func (f *FileProcessPool) Enqueue(file File) {
101102
f.tasks <- file
102103
}
103104

105+
func (f *FileProcessPool) reset() {
106+
f.tasks = make(chan File)
107+
}
108+
104109
func (a *Archiver) newWalkDir(root string) error {
105110
a.filesToWrite = make(chan File)
106111
a.fileProcessPool.Start()

main_test.go

+24
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,30 @@ func TestFileProcessPool(t *testing.T) {
158158
_, err := NewFileProcessPool(0, executor)
159159
assert.Error(t, err)
160160
})
161+
162+
t.Run("can be closed and restarted", func(t *testing.T) {
163+
output := bytes.Buffer{}
164+
executor := func(_ File) {
165+
output.WriteString("hello ")
166+
}
167+
168+
fileProcessPool, err := NewFileProcessPool(1, executor)
169+
assert.NoError(t, err)
170+
fileProcessPool.Start()
171+
172+
info := getFileInfo(t, helloTxtFileFixture)
173+
fileProcessPool.Enqueue(File{Path: helloTxtFileFixture, Info: info})
174+
175+
fileProcessPool.Close()
176+
177+
fileProcessPool.Start()
178+
info = getFileInfo(t, helloTxtFileFixture)
179+
fileProcessPool.Enqueue(File{Path: helloTxtFileFixture, Info: info})
180+
181+
fileProcessPool.Close()
182+
183+
assert.Equal(t, "hello hello ", output.String())
184+
})
161185
}
162186

163187
func BenchmarkArchive(b *testing.B) {

0 commit comments

Comments
 (0)