Skip to content

Commit

Permalink
refactor: dry file channels
Browse files Browse the repository at this point in the history
  • Loading branch information
ybirader committed Aug 13, 2023
1 parent f422230 commit e19ee6b
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type Archiver struct {
Dest *os.File
w *zip.Writer
filesToProcess chan File
filesToWrite chan File
numberOfWorkers int
}

Expand All @@ -36,19 +38,18 @@ func (a *Archiver) ArchiveDir(root string) error {
}

func (a *Archiver) walkDir(root string) error {
filesToProcess := make(chan File)
filesToWrite := make(chan File)
a.initializeChannels()

wg := new(sync.WaitGroup)
wg.Add(a.numberOfWorkers)

awg := new(sync.WaitGroup)

for i := 0; i < a.numberOfWorkers; i++ {
go a.processFiles(filesToProcess, filesToWrite, wg)
go a.processFiles(wg)
}

awg.Add(1)
go a.writeFiles(filesToWrite, awg)
go a.writeFiles(awg)

err := filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
Expand All @@ -61,7 +62,7 @@ func (a *Archiver) walkDir(root string) error {

f := File{Path: path, Info: info}

filesToProcess <- f
a.filesToProcess <- f

return nil
})
Expand All @@ -70,29 +71,28 @@ func (a *Archiver) walkDir(root string) error {
return err
}

close(filesToProcess)
close(a.filesToProcess)
wg.Wait()
close(filesToWrite)
close(a.filesToWrite)

awg.Wait()

return nil
}

func (a *Archiver) ArchiveFiles(files ...string) error {
filesToProcess := make(chan File)
filesToWrite := make(chan File)
a.initializeChannels()

wg := new(sync.WaitGroup)
wg.Add(a.numberOfWorkers)

awg := new(sync.WaitGroup)

for i := 0; i < a.numberOfWorkers; i++ {
go a.processFiles(filesToProcess, filesToWrite, wg)
go a.processFiles(wg)
}

awg.Add(1)
go a.writeFiles(filesToWrite, awg)
go a.writeFiles(awg)

for _, path := range files {
info, err := os.Lstat(path)
Expand All @@ -101,12 +101,12 @@ func (a *Archiver) ArchiveFiles(files ...string) error {
}

f := File{Path: path, Info: info}
filesToProcess <- f
a.filesToProcess <- f
}

close(filesToProcess)
close(a.filesToProcess)
wg.Wait()
close(filesToWrite)
close(a.filesToWrite)

awg.Wait()

Expand All @@ -122,18 +122,23 @@ func (a *Archiver) Close() error {
return nil
}

func (a *Archiver) processFiles(filesToProcess <-chan File, filesToWrite chan<- File, wg *sync.WaitGroup) {
func (a *Archiver) initializeChannels() {
a.filesToProcess = make(chan File)
a.filesToWrite = make(chan File)
}

func (a *Archiver) processFiles(wg *sync.WaitGroup) {
defer wg.Done()

for file := range filesToProcess {
filesToWrite <- file
for file := range a.filesToProcess {
a.filesToWrite <- file
}
}

func (a *Archiver) writeFiles(filesToWrite <-chan File, wg *sync.WaitGroup) {
func (a *Archiver) writeFiles(wg *sync.WaitGroup) {
defer wg.Done()

for file := range filesToWrite {
for file := range a.filesToWrite {
a.archive(&file)
}
}
Expand Down

0 comments on commit e19ee6b

Please sign in to comment.