Skip to content

Commit

Permalink
refactor: rename to file worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ybirader committed Aug 13, 2023
1 parent c8641ae commit d294dde
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
22 changes: 11 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type Archiver struct {
Dest *os.File
w *zip.Writer
numberOfWorkers int
fileProcessPool *FileProcessPool
fileWriterPool *FileProcessPool
fileProcessPool *FileWorkerPool
fileWriterPool *FileWorkerPool
}

type File struct {
Expand Down Expand Up @@ -59,56 +59,56 @@ func (a *Archiver) ArchiveDir(root string) error {

const minNumberOfWorkers = 1

type FileProcessPool struct {
type FileWorkerPool struct {
tasks chan File
executor func(f File)
wg *sync.WaitGroup
numberOfWorkers int
}

func NewFileProcessPool(numberOfWorkers int, executor func(f File)) (*FileProcessPool, error) {
func NewFileProcessPool(numberOfWorkers int, executor func(f File)) (*FileWorkerPool, error) {
if numberOfWorkers < minNumberOfWorkers {
return nil, errors.New("number of workers must be greater than 0")
}

return &FileProcessPool{
return &FileWorkerPool{
tasks: make(chan File),
executor: executor,
wg: new(sync.WaitGroup),
numberOfWorkers: numberOfWorkers,
}, nil
}

func (f *FileProcessPool) Start() {
func (f *FileWorkerPool) Start() {
f.reset()
f.wg.Add(f.numberOfWorkers)
for i := 0; i < f.numberOfWorkers; i++ {
go f.listen()
}
}

func (f *FileProcessPool) Close() {
func (f *FileWorkerPool) Close() {
close(f.tasks)
f.wg.Wait()
}

func (f *FileProcessPool) listen() {
func (f *FileWorkerPool) listen() {
defer f.wg.Done()

for file := range f.tasks {
f.executor(file)
}
}

func (f FileProcessPool) PendingFiles() int {
func (f FileWorkerPool) PendingFiles() int {
return len(f.tasks)
}

func (f *FileProcessPool) Enqueue(file File) {
func (f *FileWorkerPool) Enqueue(file File) {
f.tasks <- file
}

func (f *FileProcessPool) reset() {
func (f *FileWorkerPool) reset() {
f.tasks = make(chan File)
}

Expand Down
4 changes: 2 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func TestCompressToBuffer(t *testing.T) {
})
}

func TestFileProcessPool(t *testing.T) {
func TestFileWorkerPool(t *testing.T) {
t.Run("can enqueue tasks", func(t *testing.T) {
fileProcessPool := &FileProcessPool{tasks: make(chan File, 1)}
fileProcessPool := &FileWorkerPool{tasks: make(chan File, 1)}

info := getFileInfo(t, helloTxtFileFixture)
fileProcessPool.Enqueue(File{Path: helloTxtFileFixture, Info: info})
Expand Down

0 comments on commit d294dde

Please sign in to comment.