Skip to content

Commit

Permalink
Add support for optional package lists for feeds
Browse files Browse the repository at this point in the history
This allows feeds to be configured with options (starting with packages)
which acts as a list of packages to poll with the intention of avoiding
lossy firehose feeds. This is currently implemented for pypi.
  • Loading branch information
Qinusty committed Apr 30, 2021
1 parent ce53f3e commit 90e3d12
Show file tree
Hide file tree
Showing 21 changed files with 531 additions and 80 deletions.
28 changes: 21 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ behavior data for anyone interested.
A YAML configuration file can be provided with the following format:

```
enabled_feeds:
- pypi
- npm
- goproxy
- rubygems
- crates
feeds:
- type: pypi
- type: npm
- type: goproxy
- type: rubygems
- type: crates
publisher:
type: 'gcp_pubsub'
Expand All @@ -47,7 +47,21 @@ timer: false
```

`poll_rate` string formatted for [duration parser](https://golang.org/pkg/time/#ParseDuration).This is used as an initial value to generate a cutoff point for feed events relative to the given time at execution, with subsequent events using the previous time at execution as the cutoff point.
`timer` will configure interal polling of the `enabled_feeds` at the given `poll_rate` period. To specify this configuration file, define its path in your environment under the `PACKAGE_FEEDS_CONFIG_PATH` variable.
`timer` will configure interal polling of the `feeds` at the given `poll_rate` period. To specify this configuration file, define its path in your environment under the `PACKAGE_FEEDS_CONFIG_PATH` variable.

## FeedOptions

Feeds can be configured with additional options, not all feeds will support these features. See the appropriate feed `README.md` for supported options.
Below is an example of such options with pypi being configured to poll a specific set of packages

```
feeds:
- type: pypi
options:
packages:
- fooPackage
- barPackage
```

## Legacy Configuration

Expand Down
6 changes: 5 additions & 1 deletion cmd/scheduled-feed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ func main() {
log.Infof("using %q publisher", pub.Name())

feeds, err := appConfig.GetScheduledFeeds()
log.Infof("watching feeds: %v", strings.Join(appConfig.EnabledFeeds, ", "))
feedNames := []string{}
for k := range feeds {
feedNames = append(feedNames, k)
}
log.Infof("watching feeds: %v", strings.Join(feedNames, ", "))
if err != nil {
log.Fatal(err)
}
Expand Down
72 changes: 57 additions & 15 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (
"testing"

"github.com/ossf/package-feeds/config"
"github.com/ossf/package-feeds/feeds"
"github.com/ossf/package-feeds/feeds/pypi"
"github.com/ossf/package-feeds/feeds/scheduler"
"github.com/ossf/package-feeds/publisher/stdout"
)

const (
TestConfigStr = `
enabled_feeds:
- rubygems
- goproxy
- npm
feeds:
- type: rubygems
- type: goproxy
- type: npm
publisher:
type: "gcp"
Expand All @@ -26,8 +28,8 @@ poll_rate: 5m
timer: true
`
TestConfigStrUnknownFeedType = `
enabled_feeds:
- foo
feeds:
- type: foo
`
TestConfigStrUnknownField = `
foo:
Expand All @@ -40,11 +42,11 @@ func TestDefault(t *testing.T) {
t.Parallel()

c := config.Default()
feeds, err := c.GetScheduledFeeds()
scheduledFeeds, err := c.GetScheduledFeeds()
if err != nil {
t.Fatalf("failed to initialize feeds: %v", err)
}
_ = scheduler.New(feeds)
_ = scheduler.New(scheduledFeeds)
}

func TestGetScheduledFeeds(t *testing.T) {
Expand All @@ -54,16 +56,16 @@ func TestGetScheduledFeeds(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(c.EnabledFeeds) != 3 {
t.Fatalf("EnabledFeeds is expected to be 3 but was `%v`", len(c.EnabledFeeds))
if len(c.Feeds) != 3 {
t.Fatalf("Feeds is expected to be 3 but was `%v`", len(c.Feeds))
}
feeds, err := c.GetScheduledFeeds()
scheduledFeeds, err := c.GetScheduledFeeds()
if err != nil {
t.Fatal(err)
}
for _, val := range c.EnabledFeeds {
if _, ok := feeds[val]; !ok {
t.Errorf("expected `%v` feed was not found in scheduled feeds after GetScheduledFeeds()", val)
for _, feed := range c.Feeds {
if _, ok := scheduledFeeds[feed.Type]; !ok {
t.Errorf("expected `%v` feed was not found in scheduled feeds after GetScheduledFeeds()", feed.Type)
}
}
}
Expand All @@ -81,7 +83,7 @@ func TestLoadFeedConfigUnknownFeedType(t *testing.T) {
}
}

func TestPubConfigToPublisherStdout(t *testing.T) {
func TestPublisherConfigToPublisherStdout(t *testing.T) {
t.Parallel()

c := config.PublisherConfig{
Expand All @@ -97,6 +99,46 @@ func TestPubConfigToPublisherStdout(t *testing.T) {
}
}

func TestPublisherConfigToFeed(t *testing.T) {
t.Parallel()

packages := []string{
"foo",
"bar",
"baz",
}

c := config.FeedConfig{
Type: pypi.FeedName,
Options: feeds.FeedOptions{
Packages: &packages,
},
}
feed, err := c.ToFeed()
if err != nil {
t.Fatalf("failed to create pypi feed from configuration: %v", err)
}

pypiFeed, ok := feed.(*pypi.Feed)
if !ok {
t.Fatal("failed to cast feed as pypi feed")
}

feedPackages := pypiFeed.GetPackageList()
if feedPackages == nil {
t.Fatalf("failed to initialize pypi feed package list to poll")
}
if feedPackages != nil && len(*feedPackages) != len(packages) {
t.Errorf("pypi package list does not match config provided package list")
} else {
for i := 0; i < len(packages); i++ {
if (*feedPackages)[i] != packages[i] {
t.Errorf("pypi package '%v' does not match configured package '%v'", (*feedPackages)[i], packages[i])
}
}
}
}

func TestStrictConfigDecoding(t *testing.T) {
t.Parallel()

Expand Down
91 changes: 51 additions & 40 deletions config/scheduledfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ func NewConfigFromBytes(yamlBytes []byte) (*ScheduledFeedConfig, error) {
}

// Applies environment variables to the configuration.
func (config *ScheduledFeedConfig) applyEnvVars() {
func (sc *ScheduledFeedConfig) applyEnvVars() {
// Support legacy env var definition for gcp pub sub.
pubURL := os.Getenv("OSSMALWARE_TOPIC_URL")
if pubURL != "" {
config.PubConfig = PublisherConfig{
sc.PubConfig = PublisherConfig{
Type: gcppubsub.PublisherType,
Config: map[string]interface{}{
"url": pubURL,
Expand All @@ -72,49 +72,57 @@ func (config *ScheduledFeedConfig) applyEnvVars() {
port, err := strconv.Atoi(portStr)

if portProvided && err == nil {
config.HTTPPort = port
sc.HTTPPort = port
}
}

func AddTo(ls *[]int, value int) {
*ls = append(*ls, value)
}

// Constructs a map of ScheduledFeeds to enable based on the EnabledFeeds provided
// from configuration, indexed by the feed type.
func (config *ScheduledFeedConfig) GetScheduledFeeds() (map[string]feeds.ScheduledFeed, error) {
var err error
// Constructs a map of ScheduledFeeds to enable based on the Feeds
// provided from configuration, indexed by the feed type.
func (sc *ScheduledFeedConfig) GetScheduledFeeds() (map[string]feeds.ScheduledFeed, error) {
scheduledFeeds := map[string]feeds.ScheduledFeed{}
for _, entry := range config.EnabledFeeds {
switch entry {
case crates.FeedName:
scheduledFeeds[entry] = crates.Feed{}
case goproxy.FeedName:
scheduledFeeds[entry] = goproxy.Feed{}
case npm.FeedName:
scheduledFeeds[entry] = npm.Feed{}
case nuget.FeedName:
scheduledFeeds[entry] = nuget.Feed{}
case pypi.FeedName:
scheduledFeeds[entry] = pypi.Feed{}
case packagist.FeedName:
scheduledFeeds[entry] = packagist.Feed{}
case rubygems.FeedName:
scheduledFeeds[entry] = rubygems.Feed{}
default:
err = fmt.Errorf("%w : %v", errUnknownFeed, entry)

for _, entry := range sc.Feeds {
feed, err := entry.ToFeed()
if err != nil {
return nil, err
}
scheduledFeeds[entry.Type] = feed
}

if err != nil {
return nil, fmt.Errorf("failed to parse enabled_feeds entries: %w", err)
}
return scheduledFeeds, nil
}

// Produces a Publisher object from the provided PublisherConfig
// Constructs the appropriate feed for the given type, providing the
// options to the feed.
func FeedTypeToFeed(feedType string, feedOptions feeds.FeedOptions) (feeds.ScheduledFeed, error) {
switch feedType {
case crates.FeedName:
return crates.New(feedOptions)
case goproxy.FeedName:
return goproxy.New(feedOptions)
case npm.FeedName:
return npm.New(feedOptions)
case nuget.FeedName:
return nuget.New(feedOptions)
case pypi.FeedName:
return pypi.New(feedOptions)
case packagist.FeedName:
return packagist.New(feedOptions)
case rubygems.FeedName:
return rubygems.New(feedOptions)
default:
return nil, fmt.Errorf("%w : %v", errUnknownFeed, feedType)
}
}

// Produces a Publisher object from the PublisherConfig
// The PublisherConfig.Type value is evaluated and the appropriate Publisher is
// constructed from the Config field.
// constructed from the Config field. If the type is not a recognised Publisher type,
// an error is returned.
func (pc PublisherConfig) ToPublisher(ctx context.Context) (publisher.Publisher, error) {
var err error
switch pc.Type {
Expand All @@ -135,9 +143,12 @@ func (pc PublisherConfig) ToPublisher(ctx context.Context) (publisher.Publisher,
case stdout.PublisherType:
return stdout.New(), nil
default:
err = fmt.Errorf("%w : %v", errUnknownPub, pc.Type)
return nil, fmt.Errorf("%w : %v", errUnknownPub, pc.Type)
}
return nil, err
}

func (fc FeedConfig) ToFeed() (feeds.ScheduledFeed, error) {
return FeedTypeToFeed(fc.Type, fc.Options)
}

// Decode an input using mapstruct decoder with strictness enabled, errors will be returned in
Expand All @@ -155,14 +166,14 @@ func strictDecode(input, out interface{}) error {

func Default() *ScheduledFeedConfig {
config := &ScheduledFeedConfig{
EnabledFeeds: []string{
crates.FeedName,
goproxy.FeedName,
npm.FeedName,
nuget.FeedName,
packagist.FeedName,
pypi.FeedName,
rubygems.FeedName,
Feeds: []FeedConfig{
{Type: crates.FeedName},
{Type: goproxy.FeedName},
{Type: npm.FeedName},
{Type: nuget.FeedName},
{Type: packagist.FeedName},
{Type: pypi.FeedName},
{Type: rubygems.FeedName},
},
PubConfig: PublisherConfig{
Type: stdout.PublisherType,
Expand Down
17 changes: 12 additions & 5 deletions config/structs.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package config

import "github.com/ossf/package-feeds/feeds"

type ScheduledFeedConfig struct {
PubConfig PublisherConfig `yaml:"publisher"`
EnabledFeeds []string `yaml:"enabled_feeds"`
HTTPPort int `yaml:"http_port,omitempty"`
PollRate string `yaml:"poll_rate"`
Timer bool `yaml:"timer"`
PubConfig PublisherConfig `yaml:"publisher"`
Feeds []FeedConfig `yaml:"feeds"`
HTTPPort int `yaml:"http_port,omitempty"`
PollRate string `yaml:"poll_rate"`
Timer bool `yaml:"timer"`
}

type PublisherConfig struct {
Type string `mapstructure:"type"`
Config interface{} `mapstructure:"config"`
}

type FeedConfig struct {
Type string `mapstructure:"type"`
Options feeds.FeedOptions `mapstructure:"options"`
}
13 changes: 13 additions & 0 deletions feeds/crates/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Crates Feed

This feed allows polling of package updates from the crates package repository.

## Configuration options

The `packages` field is not supported by the crates feed.


```
feeds:
- type: crates
```
10 changes: 10 additions & 0 deletions feeds/crates/crates.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ func fetchPackages() ([]*Package, error) {

type Feed struct{}

func New(feedOptions feeds.FeedOptions) (*Feed, error) {
if feedOptions.Packages != nil {
return nil, feeds.UnsupportedOptionError{
Feed: FeedName,
Option: "packages",
}
}
return &Feed{}, nil
}

func (feed Feed) Latest(cutoff time.Time) ([]*feeds.Package, error) {
pkgs := []*feeds.Package{}
packages, err := fetchPackages()
Expand Down
Loading

0 comments on commit 90e3d12

Please sign in to comment.