Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an LRU cache for NPM responses to reduce HTTP traffic. #370

Merged
merged 4 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ossf/package-feeds
go 1.20

require (
github.com/hashicorp/golang-lru/v2 v2.0.4
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b
github.com/mitchellh/mapstructure v1.5.0
github.com/robfig/cron/v3 v3.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.4 h1:7GHuZcgid37q8o5i3QI9KMT4nCWQQ3Kx3Ov6bb9MfK0=
github.com/hashicorp/golang-lru/v2 v2.0.4/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
Expand Down
84 changes: 71 additions & 13 deletions pkg/feeds/npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"

"github.com/ossf/package-feeds/pkg/events"
"github.com/ossf/package-feeds/pkg/feeds"
"github.com/ossf/package-feeds/pkg/utils"
Expand All @@ -30,6 +32,11 @@ const (
// fetchWorkers defines the total number of concurrent HTTP1 requests to
// allow at any one time.
fetchWorkers = 10

// cacheEntryLimit defines how many responses to store in the LRU cache.
// The value should be larger than rssLimit to ensure all rss entries can
// be covered by a cache entry.
cacheEntryLimit = 500
)

var (
Expand All @@ -52,9 +59,24 @@ type PackageEvent struct {
Title string `xml:"title"`
}

// cacheEntry holds the cached result of a successful package-metadata fetch,
// keyed by request URL in the LRU cache consumed by fetchPackage. It allows a
// later request for the same URL to be made conditionally, so an unchanged
// response body does not have to be transferred again.
type cacheEntry struct {
	// ETag is the entity tag taken from the "Etag" header of a 200 OK
	// response for this URL. It is echoed back in the "If-None-Match"
	// request header so the server only sends a body when the content has
	// actually changed.
	ETag string

	// Versions is the parsed result fetchPackage built from the cached
	// response body; it is returned as-is when the server answers with
	// 304 Not Modified.
	Versions []*Package
}

// Returns a slice of PackageEvent{} structs.
func fetchPackageEvents(httpClient *http.Client, baseURL string) ([]PackageEvent, error) {
pkgURL, err := url.Parse(baseURL)
func fetchPackageEvents(feed Feed) ([]PackageEvent, error) {
pkgURL, err := url.Parse(feed.baseURL)
if err != nil {
return nil, err
}
Expand All @@ -64,7 +86,7 @@ func fetchPackageEvents(httpClient *http.Client, baseURL string) ([]PackageEvent
q.Set("limit", fmt.Sprintf("%d", rssLimit))
pkgURL.RawQuery = q.Encode()

resp, err := httpClient.Get(pkgURL.String())
resp, err := feed.client.Get(pkgURL.String())
if err != nil {
return nil, err
}
Expand All @@ -85,18 +107,36 @@ func fetchPackageEvents(httpClient *http.Client, baseURL string) ([]PackageEvent

// Gets the package version & corresponding created date from NPM. Returns
// a slice of {}Package.
func fetchPackage(httpClient *http.Client, baseURL, pkgTitle string) ([]*Package, error) {
versionURL, err := url.JoinPath(baseURL, pkgTitle)
func fetchPackage(feed Feed, pkgTitle string) ([]*Package, error) {
versionURL, err := url.JoinPath(feed.baseURL, pkgTitle)
if err != nil {
return nil, err
}
resp, err := httpClient.Get(versionURL)

req, err := http.NewRequest("GET", versionURL, nil)
if err != nil {
return nil, err
}

e, inCache := feed.cache.Get(versionURL)
if inCache && e != nil {
// We found a cache entry, so do a conditional request that only returns
// content if the etag has changed.
req.Header.Add("If-None-Match", e.ETag)
}

resp, err := feed.client.Do(req)
if err != nil {
return nil, err
}
body, readErr := io.ReadAll(resp.Body)
closeErr := resp.Body.Close()

if inCache && e != nil && utils.IsNotModified(resp) {
// We have a cached value and a 304 was returned, which means we can use
// our cached value as the result of this function call.
return e.Versions, nil
}
if err := utils.CheckResponseStatus(resp); err != nil {
return nil, fmt.Errorf("failed to fetch npm package version data: %w", err)
}
Expand All @@ -107,6 +147,7 @@ func fetchPackage(httpClient *http.Client, baseURL, pkgTitle string) ([]*Package
if closeErr != nil {
return nil, closeErr
}
etag := resp.Header.Get("etag")

// We only care about the `time` field as it contains all the versions in
// date order, from oldest to newest.
Expand Down Expand Up @@ -153,15 +194,25 @@ func fetchPackage(httpClient *http.Client, baseURL, pkgTitle string) ([]*Package
return versionSlice[j].CreatedDate.Before(versionSlice[i].CreatedDate)
})

if etag != "" {
// Add the result to the cache, only if the etag is actually present.
// An etag should be present, but a server issue may result in the etag
// not being included.
feed.cache.Add(versionURL, &cacheEntry{
ETag: etag,
Versions: versionSlice,
})
}

return versionSlice, nil
}

func fetchAllPackages(httpClient *http.Client, registryURL string) ([]*feeds.Package, []error) {
func fetchAllPackages(feed Feed) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
errs := []error{}
packageChannel := make(chan []*Package)
errChannel := make(chan error)
packageEvents, err := fetchPackageEvents(httpClient, registryURL)
packageEvents, err := fetchPackageEvents(feed)
if err != nil {
// If we can't generate package events then return early.
return pkgs, append(errs, err)
Expand All @@ -183,7 +234,7 @@ func fetchAllPackages(httpClient *http.Client, registryURL string) ([]*feeds.Pac

// Define the fetcher function that grabs the repos from NPM
fetcherFn := func(pkgTitle string, count int) {
pkgs, err := fetchPackage(httpClient, registryURL, pkgTitle)
pkgs, err := fetchPackage(feed, pkgTitle)
if err != nil {
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
Expand Down Expand Up @@ -260,15 +311,15 @@ func fetchAllPackages(httpClient *http.Client, registryURL string) ([]*feeds.Pac
return pkgs, errs
}

func fetchCriticalPackages(httpClient *http.Client, registryURL string, packages []string) ([]*feeds.Package, []error) {
func fetchCriticalPackages(feed Feed, packages []string) ([]*feeds.Package, []error) {
pkgs := []*feeds.Package{}
errs := []error{}
packageChannel := make(chan []*Package)
errChannel := make(chan error)

for _, pkgTitle := range packages {
go func(pkgTitle string) {
pkgs, err := fetchPackage(httpClient, registryURL, pkgTitle)
pkgs, err := fetchPackage(feed, pkgTitle)
if err != nil {
if !errors.Is(err, errUnpublished) {
err = feeds.PackagePollError{Name: pkgTitle, Err: err}
Expand Down Expand Up @@ -305,6 +356,7 @@ type Feed struct {
baseURL string
options feeds.FeedOptions
client *http.Client
cache *lru.Cache[string, *cacheEntry]
}

func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, error) {
Expand All @@ -320,6 +372,11 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er
tr.MaxConnsPerHost = fetchWorkers
tr.IdleConnTimeout = 0 // No limit, try and reuse the idle connections.

cache, err := lru.New[string, *cacheEntry](cacheEntryLimit)
if err != nil {
return nil, err
}

return &Feed{
packages: feedOptions.Packages,
lossyFeedAlerter: feeds.NewLossyFeedAlerter(eventHandler),
Expand All @@ -329,6 +386,7 @@ func New(feedOptions feeds.FeedOptions, eventHandler *events.Handler) (*Feed, er
Transport: tr,
Timeout: 45 * time.Second,
},
cache: cache,
}, nil
}

Expand All @@ -337,9 +395,9 @@ func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, []error) {
var errs []error

if feed.packages == nil {
pkgs, errs = fetchAllPackages(feed.client, feed.baseURL)
pkgs, errs = fetchAllPackages(feed)
} else {
pkgs, errs = fetchCriticalPackages(feed.client, feed.baseURL, *feed.packages)
pkgs, errs = fetchCriticalPackages(feed, *feed.packages)
}

if len(pkgs) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/feeds/npm/npm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestNpmNonUtf8Response(t *testing.T) {
}
srv := testutils.HTTPServerMock(handlers)

pkgs, err := fetchPackageEvents(http.DefaultClient, srv.URL)
pkgs, err := fetchPackageEvents(Feed{client: http.DefaultClient, baseURL: srv.URL})
if err != nil {
t.Fatalf("Failed to fetch packages: %v", err)
}
Expand All @@ -243,7 +243,7 @@ func TestNpmNonXMLResponse(t *testing.T) {
}
srv := testutils.HTTPServerMock(handlers)

pkgs, err := fetchPackageEvents(http.DefaultClient, srv.URL)
pkgs, err := fetchPackageEvents(Feed{client: http.DefaultClient, baseURL: srv.URL})
if err != nil {
t.Fatalf("Failed to fetch packages: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/utils/http_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ func CheckResponseStatus(res *http.Response) error {
}
return nil
}

func IsNotModified(res *http.Response) bool {
return res.StatusCode == http.StatusNotModified
}