Skip to content

Commit

Permalink
Tune NPM timeouts and behavior for http2 (#339)
Browse files Browse the repository at this point in the history
* Tune NPM timeouts and behavior for http2

Signed-off-by: Caleb Brown <calebbrown@google.com>

* Fix lint errors

Signed-off-by: Caleb Brown <calebbrown@google.com>

* Remove errant delay.

Signed-off-by: Caleb Brown <calebbrown@google.com>

---------

Signed-off-by: Caleb Brown <calebbrown@google.com>
  • Loading branch information
calebbrown authored May 11, 2023
1 parent 05f6f2b commit 0922729
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 73 deletions.
10 changes: 4 additions & 6 deletions cmd/scheduled-feed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ import (
)

func main() {
// Increase overall idle conns. Generally this should be greater than the
// -PerHost value below multiplied by the number of feeds.
http.DefaultTransport.(*http.Transport).MaxIdleConns = 200
// Increase idle conns per host to increase the reuse of existing
// connections between requests. This is configured to match the size of the
// worker pool in the npm feed.
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 16
// connections between requests. This only applies to HTTP1. HTTP2 requests
// are multiplexed over a single TCP connection. HTTP2 is supported by
// all the current feeds.
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 8

configPath, useConfig := os.LookupEnv("PACKAGE_FEEDS_CONFIG_PATH")
var err error
Expand Down
97 changes: 30 additions & 67 deletions pkg/feeds/npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"sort"
"sync"
"time"

"github.com/ossf/package-feeds/pkg/events"
Expand All @@ -26,14 +26,17 @@ const (
// Lower numbers will sometimes fail too. Default value if not specified is 50.
rssLimit = 200

// workers controls the number of concurrent workers used to fetch packages
// simultaneously during fetchAllPackages.
workers = 16
// maxJitterMillis is the upper bound of random jitter introduced while
// issuing requests to NPM. Random jitter will be generated between 0 and
// maxJitterMillis.
maxJitterMillis = 1000
)

var (
httpClient = &http.Client{
Timeout: 45 * time.Second,
// Timeout is large to allow for large response bodies multiplexed over
// HTTP2 to download simultaneously.
Timeout: 90 * time.Second,
}

errJSON = errors.New("error unmarshaling json response internally")
Expand Down Expand Up @@ -176,70 +179,33 @@ func fetchAllPackages(registryURL string) ([]*feeds.Package, []error) {
uniquePackages[pkg.Title]++
}

// Start a collection of workers to fetch all the packages.
// This limits the number of concurrent requests to avoid flooding the NPM
// registry API with too many simultaneous requests.
workChannel := make(chan struct {
pkgTitle string
count int
})

// Define the fetcher function that grabs the repos from NPM
fetcherFn := func(pkgTitle string, count int) {
pkgs, err := fetchPackage(registryURL, pkgTitle)
if err != nil {
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
}
errChannel <- err
return
}
// Apply count slice, guard against a given events corresponding
// version entry being unpublished by the time the specific
// endpoint has been processed. This seemingly happens silently
// without being recorded in the json. An `event` could be logged
// here.
if len(pkgs) > count {
packageChannel <- pkgs[:count]
} else {
packageChannel <- pkgs
}
}
for pkgTitle, count := range uniquePackages {
go func(pkgTitle string, count int) {
// Before requesting, wait, so all the requests don't arrive at once.
jitter := time.Duration(rand.Intn(maxJitterMillis)) * time.Millisecond //nolint:gosec
time.Sleep(jitter)

// The WaitGroup is used to ensure all the goroutines are complete before
// returning.
var wg sync.WaitGroup

// Start the fetcher workers.
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
w, more := <-workChannel
if !more {
// If we have no more work then return.
return
pkgs, err := fetchPackage(registryURL, pkgTitle)
if err != nil {
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
}
fetcherFn(w.pkgTitle, w.count)
errChannel <- err
return
}
}()
// Apply count slice, guard against a given events corresponding
// version entry being unpublished by the time the specific
// endpoint has been processed. This seemingly happens silently
// without being recorded in the json. An `event` could be logged
// here.
if len(pkgs) > count {
packageChannel <- pkgs[:count]
} else {
packageChannel <- pkgs
}
}(pkgTitle, count)
}

// Start a goroutine to push work to the workers.
go func() {
// Populate the worker feed.
for pkgTitle, count := range uniquePackages {
workChannel <- struct {
pkgTitle string
count int
}{pkgTitle: pkgTitle, count: count}
}

// Close the channel to indicate that there is no more work.
close(workChannel)
}()

// Collect all the workers.
for i := 0; i < len(uniquePackages); i++ {
select {
Expand All @@ -258,9 +224,6 @@ func fetchAllPackages(registryURL string) ([]*feeds.Package, []error) {
}
}

// Ensure the goroutines are all complete.
wg.Wait()

return pkgs, errs
}

Expand Down

0 comments on commit 0922729

Please sign in to comment.