Skip to content

Commit

Permalink
Add support for optional package lists for feeds
Browse files Browse the repository at this point in the history
This allows feeds to be configured with options (starting with packages)
which acts as a list of packages to poll with the intention of avoiding
lossy firehose feeds. This is currently implemented for pypi.
  • Loading branch information
Qinusty committed May 7, 2021
1 parent b4d8d54 commit bc1e1e9
Show file tree
Hide file tree
Showing 24 changed files with 537 additions and 98 deletions.
28 changes: 21 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ behavior data for anyone interested.
A YAML configuration file can be provided with the following format:

```
enabled_feeds:
- pypi
- npm
- goproxy
- rubygems
- crates
feeds:
- type: pypi
- type: npm
- type: goproxy
- type: rubygems
- type: crates
publisher:
type: 'gcp_pubsub'
Expand All @@ -47,7 +47,21 @@ timer: false
```

`poll_rate` is a string formatted for the [duration parser](https://golang.org/pkg/time/#ParseDuration). This is used as an initial value to generate a cutoff point for feed events relative to the given time at execution, with subsequent events using the previous time at execution as the cutoff point.
`timer` will configure interal polling of the `enabled_feeds` at the given `poll_rate` period. To specify this configuration file, define its path in your environment under the `PACKAGE_FEEDS_CONFIG_PATH` variable.
`timer` will configure internal polling of the `feeds` at the given `poll_rate` period. To specify this configuration file, define its path in your environment under the `PACKAGE_FEEDS_CONFIG_PATH` variable.

## FeedOptions

Feeds can be configured with additional options; not all feeds support these features. See the appropriate feed `README.md` for supported options.
Below is an example of such options, with pypi being configured to poll a specific set of packages:

```
feeds:
- type: pypi
  options:
    packages:
    - fooPackage
    - barPackage
```

## Legacy Configuration

Expand Down
6 changes: 5 additions & 1 deletion cmd/scheduled-feed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ func main() {
log.Infof("using %q publisher", pub.Name())

scheduledFeeds, err := appConfig.GetScheduledFeeds()
log.Infof("watching feeds: %v", strings.Join(appConfig.EnabledFeeds, ", "))
feedNames := []string{}
for k := range scheduledFeeds {
feedNames = append(feedNames, k)
}
log.Infof("watching feeds: %v", strings.Join(feedNames, ", "))
if err != nil {
log.Fatal(err)
}
Expand Down
72 changes: 57 additions & 15 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (

"github.com/ossf/package-feeds/config"
"github.com/ossf/package-feeds/events"
"github.com/ossf/package-feeds/feeds"
"github.com/ossf/package-feeds/feeds/pypi"
"github.com/ossf/package-feeds/feeds/scheduler"
"github.com/ossf/package-feeds/publisher/stdout"
)

const (
TestConfigStr = `
enabled_feeds:
- rubygems
- goproxy
- npm
feeds:
- type: rubygems
- type: goproxy
- type: npm
publisher:
type: "gcp"
Expand All @@ -27,8 +29,8 @@ poll_rate: 5m
timer: true
`
TestConfigStrUnknownFeedType = `
enabled_feeds:
- foo
feeds:
- type: foo
`
TestConfigStrUnknownField = `
foo:
Expand All @@ -52,11 +54,11 @@ func TestDefault(t *testing.T) {
t.Parallel()

c := config.Default()
feeds, err := c.GetScheduledFeeds()
scheduledFeeds, err := c.GetScheduledFeeds()
if err != nil {
t.Fatalf("failed to initialize feeds: %v", err)
}
_ = scheduler.New(feeds)
_ = scheduler.New(scheduledFeeds)
}

func TestGetScheduledFeeds(t *testing.T) {
Expand All @@ -66,16 +68,16 @@ func TestGetScheduledFeeds(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if len(c.EnabledFeeds) != 3 {
t.Fatalf("EnabledFeeds is expected to be 3 but was `%v`", len(c.EnabledFeeds))
if len(c.Feeds) != 3 {
t.Fatalf("Feeds is expected to be 3 but was `%v`", len(c.Feeds))
}
feeds, err := c.GetScheduledFeeds()
scheduledFeeds, err := c.GetScheduledFeeds()
if err != nil {
t.Fatal(err)
}
for _, val := range c.EnabledFeeds {
if _, ok := feeds[val]; !ok {
t.Errorf("expected `%v` feed was not found in scheduled feeds after GetScheduledFeeds()", val)
for _, feed := range c.Feeds {
if _, ok := scheduledFeeds[feed.Type]; !ok {
t.Errorf("expected `%v` feed was not found in scheduled feeds after GetScheduledFeeds()", feed.Type)
}
}
}
Expand All @@ -93,7 +95,7 @@ func TestLoadFeedConfigUnknownFeedType(t *testing.T) {
}
}

func TestPubConfigToPublisherStdout(t *testing.T) {
func TestPublisherConfigToPublisherStdout(t *testing.T) {
t.Parallel()

c := config.PublisherConfig{
Expand All @@ -109,6 +111,46 @@ func TestPubConfigToPublisherStdout(t *testing.T) {
}
}

// TestPublisherConfigToFeed verifies that a FeedConfig carrying a package list
// produces a pypi feed whose package list matches the configured one, entry
// for entry and in the same order.
func TestPublisherConfigToFeed(t *testing.T) {
	t.Parallel()

	packages := []string{"foo", "bar", "baz"}

	feedConfig := config.FeedConfig{
		Type:    pypi.FeedName,
		Options: feeds.FeedOptions{Packages: &packages},
	}

	feed, err := feedConfig.ToFeed(events.NewNullHandler())
	if err != nil {
		t.Fatalf("failed to create pypi feed from configuration: %v", err)
	}

	pypiFeed, ok := feed.(*pypi.Feed)
	if !ok {
		t.Fatal("failed to cast feed as pypi feed")
	}

	feedPackages := pypiFeed.GetPackageList()
	if feedPackages == nil {
		t.Fatalf("failed to initialize pypi feed package list to poll")
	}

	// Fatalf above stops the test on a nil list, so it is safe to dereference.
	got := *feedPackages
	if len(got) != len(packages) {
		t.Errorf("pypi package list does not match config provided package list")
		return
	}

	// Lengths agree, so every configured index is valid in the feed's list.
	for i, want := range packages {
		if got[i] != want {
			t.Errorf("pypi package '%v' does not match configured package '%v'", got[i], want)
		}
	}
}

func TestStrictConfigDecoding(t *testing.T) {
t.Parallel()

Expand Down
99 changes: 53 additions & 46 deletions config/scheduledfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func NewConfigFromBytes(yamlBytes []byte) (*ScheduledFeedConfig, error) {
}

// Applies environment variables to the configuration.
func (config *ScheduledFeedConfig) applyEnvVars() {
func (sc *ScheduledFeedConfig) applyEnvVars() {
// Support legacy env var definition for gcp pub sub.
pubURL := os.Getenv("OSSMALWARE_TOPIC_URL")
if pubURL != "" {
config.PubConfig = PublisherConfig{
sc.PubConfig = PublisherConfig{
Type: gcppubsub.PublisherType,
Config: map[string]interface{}{
"url": pubURL,
Expand All @@ -75,61 +75,45 @@ func (config *ScheduledFeedConfig) applyEnvVars() {
port, err := strconv.Atoi(portStr)

if portProvided && err == nil {
config.HTTPPort = port
sc.HTTPPort = port
}
}

// AddTo appends value to the slice that ls points to and stores the
// resulting slice back through the pointer, so the caller observes the
// appended element. ls must not be nil.
func AddTo(ls *[]int, value int) {
	extended := append(*ls, value)
	*ls = extended
}

// Constructs a map of ScheduledFeeds to enable based on the EnabledFeeds provided
// from configuration, indexed by the feed type.
func (config *ScheduledFeedConfig) GetScheduledFeeds() (map[string]feeds.ScheduledFeed, error) {
var err error
// Constructs a map of ScheduledFeeds to enable based on the Feeds
// provided from configuration, indexed by the feed type.
func (sc *ScheduledFeedConfig) GetScheduledFeeds() (map[string]feeds.ScheduledFeed, error) {
scheduledFeeds := map[string]feeds.ScheduledFeed{}
eventHandler, err := config.GetEventHandler()
eventHandler, err := sc.GetEventHandler()
if err != nil {
return nil, err
}
for _, entry := range config.EnabledFeeds {
switch entry {
case crates.FeedName:
scheduledFeeds[entry] = crates.New(eventHandler)
case goproxy.FeedName:
scheduledFeeds[entry] = goproxy.Feed{}
case npm.FeedName:
scheduledFeeds[entry] = npm.New(eventHandler)
case nuget.FeedName:
scheduledFeeds[entry] = nuget.Feed{}
case pypi.FeedName:
scheduledFeeds[entry] = pypi.New(eventHandler)
case packagist.FeedName:
scheduledFeeds[entry] = packagist.Feed{}
case rubygems.FeedName:
scheduledFeeds[entry] = rubygems.New(eventHandler)
default:
err = fmt.Errorf("%w : %v", errUnknownFeed, entry)

for _, entry := range sc.Feeds {
feed, err := entry.ToFeed(eventHandler)
if err != nil {
return nil, err
}
scheduledFeeds[entry.Type] = feed
}

if err != nil {
return nil, fmt.Errorf("failed to parse enabled_feeds entries: %w", err)
}
return scheduledFeeds, nil
}

func (config *ScheduledFeedConfig) GetEventHandler() (*events.Handler, error) {
func (sc *ScheduledFeedConfig) GetEventHandler() (*events.Handler, error) {
var err error
if config.EventsConfig == nil {
config.eventHandler = events.NewNullHandler()
} else if config.eventHandler == nil {
config.eventHandler, err = config.EventsConfig.ToEventHandler()
if sc.EventsConfig == nil {
sc.eventHandler = events.NewNullHandler()
} else if sc.eventHandler == nil {
sc.eventHandler, err = sc.EventsConfig.ToEventHandler()
if err != nil {
return nil, err
}
}
return config.eventHandler, nil
return sc.eventHandler, nil
}

func (ec *EventsConfig) ToEventHandler() (*events.Handler, error) {
Expand All @@ -145,7 +129,8 @@ func (ec *EventsConfig) ToEventHandler() (*events.Handler, error) {

// Produces a Publisher object from the provided PublisherConfig
// The PublisherConfig.Type value is evaluated and the appropriate Publisher is
// constructed from the Config field.
// constructed from the Config field. If the type is not a recognised Publisher type,
// an error is returned.
func (pc PublisherConfig) ToPublisher(ctx context.Context) (publisher.Publisher, error) {
var err error
switch pc.Type {
Expand All @@ -166,9 +151,31 @@ func (pc PublisherConfig) ToPublisher(ctx context.Context) (publisher.Publisher,
case stdout.PublisherType:
return stdout.New(), nil
default:
err = fmt.Errorf("%w : %v", errUnknownPub, pc.Type)
return nil, fmt.Errorf("%w : %v", errUnknownPub, pc.Type)
}
}

// ToFeed constructs the feed matching fc.Type, passing fc.Options to that
// feed's constructor. The crates, npm, pypi and rubygems constructors are also
// given eventHandler; goproxy, nuget and packagist receive only the options.
// Whatever the selected constructor returns (feed and error) is returned
// unchanged. If fc.Type matches none of the known feed names, a nil feed and
// an error wrapping errUnknownFeed are returned.
func (fc FeedConfig) ToFeed(eventHandler *events.Handler) (feeds.ScheduledFeed, error) {
switch fc.Type {
case crates.FeedName:
return crates.New(fc.Options, eventHandler)
case goproxy.FeedName:
return goproxy.New(fc.Options)
case npm.FeedName:
return npm.New(fc.Options, eventHandler)
case nuget.FeedName:
return nuget.New(fc.Options)
case pypi.FeedName:
return pypi.New(fc.Options, eventHandler)
case packagist.FeedName:
return packagist.New(fc.Options)
case rubygems.FeedName:
return rubygems.New(fc.Options, eventHandler)
default:
return nil, fmt.Errorf("%w : %v", errUnknownFeed, fc.Type)
}
return nil, err
}

// Decode an input using mapstruct decoder with strictness enabled, errors will be returned in
Expand All @@ -186,14 +193,14 @@ func strictDecode(input, out interface{}) error {

func Default() *ScheduledFeedConfig {
config := &ScheduledFeedConfig{
EnabledFeeds: []string{
crates.FeedName,
goproxy.FeedName,
npm.FeedName,
nuget.FeedName,
packagist.FeedName,
pypi.FeedName,
rubygems.FeedName,
Feeds: []FeedConfig{
{Type: crates.FeedName},
{Type: goproxy.FeedName},
{Type: npm.FeedName},
{Type: nuget.FeedName},
{Type: packagist.FeedName},
{Type: pypi.FeedName},
{Type: rubygems.FeedName},
},
PubConfig: PublisherConfig{
Type: stdout.PublisherType,
Expand Down
18 changes: 13 additions & 5 deletions config/structs.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package config

import "github.com/ossf/package-feeds/events"
import (
"github.com/ossf/package-feeds/events"
"github.com/ossf/package-feeds/feeds"
)

type ScheduledFeedConfig struct {
// Configures the publisher for pushing packages after polling
// Configures the publisher for pushing packages after polling.
PubConfig PublisherConfig `yaml:"publisher"`

// Configures the feeds to be used for polling from package repositories
EnabledFeeds []string `yaml:"enabled_feeds"`
// Configures the feeds to be used for polling from package repositories.
Feeds []FeedConfig `yaml:"feeds"`

HTTPPort int `yaml:"http_port,omitempty"`
PollRate string `yaml:"poll_rate"`
Timer bool `yaml:"timer"`

// Configures the EventHandler instance to be used throughout the package-feeds application
// Configures the EventHandler instance to be used throughout the package-feeds application.
EventsConfig *EventsConfig `yaml:"events"`

eventHandler *events.Handler
Expand All @@ -24,6 +27,11 @@ type PublisherConfig struct {
Config interface{} `mapstructure:"config"`
}

// FeedConfig describes a single feed entry in the configuration: which feed
// to construct and the options handed to it.
type FeedConfig struct {
// Type is the feed name used to select which feed is constructed.
Type string `mapstructure:"type"`
// Options holds the feed options passed to the selected feed's constructor.
Options feeds.FeedOptions `mapstructure:"options"`
}

type EventsConfig struct {
Sink string `yaml:"sink"`
EventFilter events.Filter `yaml:"filter"`
Expand Down
13 changes: 13 additions & 0 deletions feeds/crates/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Crates Feed

This feed allows polling of package updates from the crates package repository.

## Configuration options

The `packages` field is not supported by the crates feed.


```
feeds:
- type: crates
```
Loading

0 comments on commit bc1e1e9

Please sign in to comment.