diff --git a/main.go b/main.go
index 037e344..5985f39 100644
--- a/main.go
+++ b/main.go
@@ -7,6 +7,7 @@ import (
 	"math"
 	"os"
 	"runtime/debug"
+	"sort"
 	"sync"
 
 	"github.com/jakopako/goskyr/autoconfig"
@@ -19,24 +20,47 @@ import (
 
 var version = "dev"
 
-func worker(sc chan scraper.Scraper, ic chan map[string]interface{}, gc *scraper.GlobalConfig, threadNr int) {
+func worker(sc <-chan scraper.Scraper, ic chan<- map[string]interface{}, stc chan<- scraper.ScrapingStats, gc *scraper.GlobalConfig, threadNr int) {
 	workerLogger := slog.With(slog.Int("thread", threadNr))
 	for s := range sc {
 		scraperLogger := workerLogger.With(slog.String("name", s.Name))
 		scraperLogger.Info("starting scraping task")
-		items, err := s.GetItems(gc, false)
+		result, err := s.Scrape(gc, false)
 		if err != nil {
 			scraperLogger.Error(fmt.Sprintf("%s: %s", s.Name, err))
 			continue
 		}
-		scraperLogger.Info(fmt.Sprintf("fetched %d items", len(items)))
-		for _, item := range items {
+		scraperLogger.Info(fmt.Sprintf("fetched %d items", result.Stats.NrItems))
+		for _, item := range result.Items {
 			ic <- item
 		}
+		stc <- *result.Stats
 	}
 	workerLogger.Info("done working")
 }
 
+func collectAllStats(stc <-chan scraper.ScrapingStats) []scraper.ScrapingStats {
+	result := []scraper.ScrapingStats{}
+	for st := range stc {
+		result = append(result, st)
+	}
+	return result
+}
+
+func printAllStats(stats []scraper.ScrapingStats) {
+	// TODO: nicer format/layout, table format and colors depending on nrs, e.g. red for errors
+	statsString := ""
+	// sort by name alphabetically
+	sort.Slice(stats, func(i, j int) bool {
+		return stats[i].Name < stats[j].Name
+	})
+	for _, s := range stats {
+		statsString += fmt.Sprintf("name: %s, items: %d, errors: %d\n", s.Name, s.NrItems, s.NrErrors)
+	}
+	// TODO add total of everything
+	fmt.Println(statsString)
+}
+
 func main() {
 	singleScraper := flag.String("s", "", "The name of the scraper to be run.")
 	toStdout := flag.Bool("stdout", false, "If set to true the scraped data will be written to stdout despite any other existing writer configurations. In combination with the -generate flag the newly generated config will be written to stdout instead of to a file.")
@@ -143,6 +167,7 @@ func main() {
 
 	var workerWg sync.WaitGroup
 	var writerWg sync.WaitGroup
+	var statsWg sync.WaitGroup
 	ic := make(chan map[string]interface{})
 
 	var writer output.Writer
@@ -167,6 +192,7 @@ func main() {
 	}
 
 	sc := make(chan scraper.Scraper)
+	stc := make(chan scraper.ScrapingStats)
 
 	// fill worker queue
 	go func() {
@@ -190,7 +216,7 @@ func main() {
 	for i := 0; i < nrWorkers; i++ {
 		go func(j int) {
 			defer workerWg.Done()
-			worker(sc, ic, &config.Global, j)
+			worker(sc, ic, stc, &config.Global, j)
 		}(i)
 	}
 
@@ -201,7 +227,20 @@ func main() {
 		defer writerWg.Done()
 		writer.Write(ic)
 	}()
+
+	// start stats collection
+	statsWg.Add(1)
+	slog.Debug("starting stats collection")
+	go func() {
+		defer statsWg.Done()
+		allStats := collectAllStats(stc)
+		writerWg.Wait() // only print stats in the end
+		printAllStats(allStats)
+	}()
+
 	workerWg.Wait()
 	close(ic)
+	close(stc)
 	writerWg.Wait()
+	statsWg.Wait()
 }
diff --git a/ml/ml.go b/ml/ml.go
index aebd75b..daccfde 100644
--- a/ml/ml.go
+++ b/ml/ml.go
@@ -138,12 +138,12 @@ func writeFeaturesToFile(filename string, featuresChan <-chan *Features, wg *syn
 func calculateScraperFeatures(s scraper.Scraper, featuresChan chan<- *Features, wordMap map[string]bool, globalConfig *scraper.GlobalConfig, wg *sync.WaitGroup) {
 	defer wg.Done()
 	log.Printf("calculating features for %s\n", s.Name)
-	items, err := s.GetItems(globalConfig, true)
+	result, err := s.Scrape(globalConfig, true)
 	if err != nil {
 		log.Printf("%s ERROR: %s", s.Name, err)
 		return
 	}
-	for _, item := range items {
+	for _, item := range result.Items {
 		for fName, fValue := range item {
 			fValueString := fValue.(string)
 			f := calculateFeatures(fName, fValueString, wordMap)
diff --git a/output/api.go b/output/api.go
index 2b0e2b7..173fcee 100644
--- a/output/api.go
+++ b/output/api.go
@@ -72,7 +72,8 @@ func (f *APIWriter) Write(items chan map[string]interface{}) {
 		batch = append(batch, item)
 		if len(batch) == 100 {
 			if err := postBatch(client, batch, apiURL, apiUser, apiPassword); err != nil {
-				fmt.Printf("%v\n", err)
+				logger.Error(fmt.Sprintf("%v\n", err))
+				// fmt.Printf("%v\n", err)
 			} else {
 				nrItemsWritten = nrItemsWritten + 100
 			}
diff --git a/scraper/scraper.go b/scraper/scraper.go
index cf04683..6cb1812 100644
--- a/scraper/scraper.go
+++ b/scraper/scraper.go
@@ -249,13 +249,24 @@ type Scraper struct {
 	fetcher fetch.Fetcher
 }
 
-// GetItems fetches and returns all items from a website according to the
+type ScrapingStats struct {
+	Name     string
+	NrItems  int
+	NrErrors int
+}
+
+type ScrapingResult struct {
+	Items []map[string]interface{}
+	Stats *ScrapingStats
+}
+
+// Scrape fetches and returns all items from a website according to the
 // Scraper's paramaters. When rawDyn is set to true the items returned are
 // not processed according to their type but instead the raw values based
 // only on the location are returned (ignore regex_extract??). And only those
 // of dynamic fields, ie fields that don't have a predefined value and that are
 // present on the main page (not subpages). This is used by the ML feature generation.
-func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string]interface{}, error) {
+func (c Scraper) Scrape(globalConfig *GlobalConfig, rawDyn bool) (*ScrapingResult, error) {
 	scrLogger := slog.With(slog.String("name", c.Name))
 
 	// initialize fetcher
@@ -269,11 +280,16 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
 		}
 	}
 
-	var items []map[string]interface{}
+	result := &ScrapingResult{
+		Items: []map[string]interface{}{},
+		Stats: &ScrapingStats{
+			Name: c.Name,
+		},
+	}
 
 	scrLogger.Debug("initializing filters")
 	if err := c.initializeFilters(); err != nil {
-		return items, err
+		return result, err
 	}
 
 	hasNextPage := true
@@ -282,7 +298,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
 
 	hasNextPage, pageURL, doc, err := c.fetchPage(nil, currentPage, c.URL, globalConfig.UserAgent, c.Interaction)
 	if err != nil {
-		return items, err
+		return result, err
 	}
 
 	for hasNextPage {
@@ -306,6 +322,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
 				}
 				if err != nil {
 					scrLogger.Error(fmt.Sprintf("error while parsing field %s: %v. Skipping item %v.", f.Name, err, currentItem))
+					result.Stats.NrErrors++
 					return
 				}
 			}
@@ -332,11 +349,13 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
 					subRes, err := c.fetcher.Fetch(subpageURL, fetch.FetchOpts{})
 					if err != nil {
 						scrLogger.Error(fmt.Sprintf("%v. Skipping item %v.", err, currentItem))
+						result.Stats.NrErrors++
 						return
 					}
 					subDoc, err := goquery.NewDocumentFromReader(strings.NewReader(subRes))
 					if err != nil {
 						scrLogger.Error(fmt.Sprintf("error while reading document: %v. Skipping item %v", err, currentItem))
+						result.Stats.NrErrors++
 						return
 					}
 					subDocs[subpageURL] = subDoc
@@ -345,6 +364,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
 				err = extractField(&f, currentItem, subDocs[subpageURL].Selection, baseURLSubpage)
 				if err != nil {
 					scrLogger.Error(fmt.Sprintf("error while parsing field %s: %v. Skipping item %v.", f.Name, err, currentItem))
+					result.Stats.NrErrors++
 					return
 				}
 				// filter fast!
@@ -359,20 +379,21 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
 			filter := c.filterItem(currentItem)
 			if filter {
 				currentItem = c.removeHiddenFields(currentItem)
-				items = append(items, currentItem)
+				result.Items = append(result.Items, currentItem)
+				result.Stats.NrItems++
 			}
 		})
 
 		currentPage++
 		hasNextPage, pageURL, doc, err = c.fetchPage(doc, currentPage, pageURL, globalConfig.UserAgent, nil)
 		if err != nil {
-			return items, err
+			return result, err
 		}
 	}
 
-	c.guessYear(items, time.Now())
+	c.guessYear(result.Items, time.Now())
 
-	return items, nil
+	return result, nil
 }
 
 func (c *Scraper) guessYear(items []map[string]interface{}, ref time.Time) {
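For context, below is a minimal, self-contained sketch (not part of the diff) of the fan-in pattern that main.go now uses: each worker sends one ScrapingStats per scraper on a dedicated channel, a collector goroutine drains that channel, and the aggregated stats are printed only after the writer has finished. The ScrapingStats struct mirrors the one added in scraper/scraper.go; the fake scrapers and the inline writer are purely illustrative.

// fan-in sketch: workers -> item channel (writer) + stats channel (collector)
package main

import (
	"fmt"
	"sort"
	"sync"
)

// ScrapingStats mirrors the struct added in scraper/scraper.go.
type ScrapingStats struct {
	Name     string
	NrItems  int
	NrErrors int
}

func main() {
	ic := make(chan map[string]interface{})
	stc := make(chan ScrapingStats)

	var workerWg, writerWg, statsWg sync.WaitGroup

	// two fake "scrapers", each emitting one item and one stats record
	for _, name := range []string{"site-b", "site-a"} {
		workerWg.Add(1)
		go func(name string) {
			defer workerWg.Done()
			ic <- map[string]interface{}{"source": name}
			stc <- ScrapingStats{Name: name, NrItems: 1}
		}(name)
	}

	// writer: drains the item channel until it is closed
	writerWg.Add(1)
	go func() {
		defer writerWg.Done()
		for item := range ic {
			fmt.Println("write:", item)
		}
	}()

	// stats collector: drains stc, then waits for the writer before printing
	statsWg.Add(1)
	go func() {
		defer statsWg.Done()
		var all []ScrapingStats
		for st := range stc {
			all = append(all, st)
		}
		writerWg.Wait() // only print stats in the end
		sort.Slice(all, func(i, j int) bool { return all[i].Name < all[j].Name })
		for _, s := range all {
			fmt.Printf("name: %s, items: %d, errors: %d\n", s.Name, s.NrItems, s.NrErrors)
		}
	}()

	// same shutdown order as main.go: workers done, close channels, wait writer, wait stats
	workerWg.Wait()
	close(ic)
	close(stc)
	writerWg.Wait()
	statsWg.Wait()
}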