Skip to content

Commit

Permalink
Merge pull request #266 from jakopako/feature/limit-threads
Browse files Browse the repository at this point in the history
limit nr of workers
  • Loading branch information
jakopako authored Dec 31, 2023
2 parents 948d3ef + da76463 commit 55b9a20
Showing 1 changed file with 45 additions and 24 deletions.
69 changes: 45 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"log"
"math"
"os"
"sync"

Expand All @@ -16,19 +17,20 @@ import (

var version = "dev"

func runScraper(s *scraper.Scraper, itemsChannel chan map[string]interface{}, globalConfig *scraper.GlobalConfig) {
log.Printf("scraping %s\n", s.Name)
// This could probably be improved. We could pass the channel to
// GetItems instead of waiting for the scraper to finish.
items, err := s.GetItems(globalConfig, false)
if err != nil {
log.Printf("%s ERROR: %s", s.Name, err)
return
}
log.Printf("fetched %d %s items\n", len(items), s.Name)
for _, item := range items {
itemsChannel <- item
func worker(sc chan scraper.Scraper, ic chan map[string]interface{}, gc *scraper.GlobalConfig, threadNr int) {
for s := range sc {
log.Printf("thread %d: scraping %s\n", threadNr, s.Name)
items, err := s.GetItems(gc, false)
if err != nil {
log.Printf("%s ERROR: %s", s.Name, err)
continue
}
log.Printf("thread %d: fetched %d %s items\n", threadNr, len(items), s.Name)
for _, item := range items {
ic <- item
}
}
log.Printf("thread %d: done working\n", threadNr)
}

func main() {
Expand Down Expand Up @@ -107,9 +109,9 @@ func main() {
return
}

var scraperWg sync.WaitGroup
var workerWg sync.WaitGroup
var writerWg sync.WaitGroup
itemsChannel := make(chan map[string]interface{}, len(config.Scrapers))
ic := make(chan map[string]interface{})

var writer output.Writer
if *toStdout {
Expand All @@ -130,21 +132,40 @@ func main() {
if config.Global.UserAgent == "" {
config.Global.UserAgent = "goskyr web scraper (github.com/jakopako/goskyr)"
}
for _, s := range config.Scrapers {
if *singleScraper == "" || *singleScraper == s.Name {
scraperWg.Add(1)
go func(scr scraper.Scraper) {
defer scraperWg.Done()
runScraper(&scr, itemsChannel, &config.Global)
}(s)

sc := make(chan scraper.Scraper)

// fill worker queue
go func() {
for _, s := range config.Scrapers {
if *singleScraper == "" || *singleScraper == s.Name {
sc <- s
}
}
close(sc)
}()

// start workers
nrWorkers := 1
if *singleScraper == "" {
nrWorkers = int(math.Min(20, float64(len(config.Scrapers))))
}
log.Printf("running with %d threads\n", nrWorkers)
workerWg.Add(nrWorkers)
for i := 0; i < nrWorkers; i++ {
go func(j int) {
defer workerWg.Done()
worker(sc, ic, &config.Global, j)
}(i)
}

// start writer
writerWg.Add(1)
go func() {
defer writerWg.Done()
writer.Write(itemsChannel)
writer.Write(ic)
}()
scraperWg.Wait()
close(itemsChannel)
workerWg.Wait()
close(ic)
writerWg.Wait()
}

0 comments on commit 55b9a20

Please sign in to comment.