Add lazy_load option to preserve existing behavior

Implements the files event producer to populate the cache, to be consistent with the other sources.

Default behavior enables a static unbounded cache so all the objects are preloaded and do not expire. If an LRU cache is explicitly defined, it will be used instead. Setting lazy_load=true will disable the preload and operate in lazy-load-backed-by-cache mode.

Addressing feedback

Address https://github.com/prebid/prebid-server/pull/1411/files#r476907655
Address https://github.com/prebid/prebid-server/pull/1411/files#r476926516

Fixed bug uncovered by new unit test.
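
As a rough sketch (not part of this commit), the two modes described in the commit message could be expressed through the config struct touched below; the field names follow the new additions in config/stored_requests.go, while the directory path is purely illustrative. An explicitly configured LRU in_memory_cache would simply be left in place by validate() and used instead.

package main

import "github.com/prebid/prebid-server/config"

// Hedged sketch: only Files, Path and FilesLazyLoad come from this commit;
// the path value is hypothetical.
func exampleStoredRequestConfigs() (preload, lazy config.StoredRequests) {
    preload = config.StoredRequests{
        Files: true,
        Path:  "./stored_requests/data/by_id",
        // FilesLazyLoad defaults to false, so validate() auto-selects an
        // unbounded in-memory cache and every file is loaded up front.
    }
    lazy = config.StoredRequests{
        Files:         true,
        Path:          "./stored_requests/data/by_id",
        FilesLazyLoad: true, // skip the preload; objects are read on demand and cached
    }
    return preload, lazy
}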
laurb9 committed Aug 26, 2020
1 parent 8ce0e21 commit 0c48b0b
Showing 11 changed files with 200 additions and 50 deletions.
12 changes: 12 additions & 0 deletions config/stored_requests.go
@@ -17,6 +17,10 @@ type StoredRequests struct {
Files bool `mapstructure:"filesystem"`
//If data should be loaded from file system, path should be specified in configuration
Path string `mapstructure:"directorypath"`
// Lazy-load the data.
// false (default): preload all the files in cache
// true: load data only when incoming requests ask for the stored resource
FilesLazyLoad bool `mapstructure:"filesystem_lazy_load"`
// Postgres configures Fetchers and EventProducers which read from a Postgres DB.
// Fetchers are in stored_requests/backends/db_fetcher/postgres.go
// EventProducers are in stored_requests/events/postgres
@@ -105,6 +109,10 @@ type FileFetcherConfig struct {
Enabled bool `mapstructure:"enabled"`
// Path to the directory this file fetcher gets data from.
Path string `mapstructure:"directorypath"`
// Lazy-load the data.
// false (default): preload all the files in cache
// true: load data only when incoming requests ask for the stored resource
LazyLoad bool `mapstructure:"lazy_load"`
}

// HTTPFetcherConfigSlim configures a stored_requests/backends/http_fetcher/fetcher.go
@@ -134,6 +142,10 @@ func (cfg *StoredRequests) validate(errs configErrors) configErrors {
if cfg.Postgres.CacheInitialization.Query != "" {
errs = append(errs, errors.New("stored_requests.postgres.initialize_caches.query must be empty if stored_requests.in_memory_cache=none"))
}
if cfg.Files && !cfg.FilesLazyLoad {
// auto configure an in-memory static cache to replicate previous behavior (load and store all)
cfg.InMemoryCache.Type = "unbounded"
}
}
errs = cfg.InMemoryCache.validate(errs)
errs = cfg.Postgres.validate(errs)
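
To make the new branch above easier to read, here is a hedged restatement of the cache selection it implies (assuming, per the unchanged surrounding context, that the check runs only when in_memory_cache is "none"); this is a sketch, not the literal validate() code.

package main

import "github.com/prebid/prebid-server/config"

// effectiveCacheType summarizes the decision; field names match the diff above.
func effectiveCacheType(cfg config.StoredRequests) string {
    if cfg.InMemoryCache.Type != "none" {
        return cfg.InMemoryCache.Type // an explicitly configured cache (e.g. lru) wins
    }
    if cfg.Files && !cfg.FilesLazyLoad {
        return "unbounded" // preload mode keeps every object in memory, never expiring
    }
    return "none"
}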
4 changes: 4 additions & 0 deletions endpoints/openrtb2/amp_auction_test.go
@@ -939,6 +939,10 @@ func (cf *mockAmpStoredReqFetcher) FetchRequests(ctx context.Context, requestIDs
return cf.data, nil, nil
}

func (cf mockAmpStoredReqFetcher) FetchAllRequests(ctx context.Context) (requestData map[string]json.RawMessage, impData map[string]json.RawMessage, errs []error) {
return cf.data, nil, nil
}

type mockAmpExchange struct {
lastRequest *openrtb.BidRequest
}
4 changes: 4 additions & 0 deletions endpoints/openrtb2/auction_test.go
@@ -1716,6 +1716,10 @@ func (cf mockStoredReqFetcher) FetchRequests(ctx context.Context, requestIDs []s
return testStoredRequestData, testStoredImpData, nil
}

func (cf mockStoredReqFetcher) FetchAllRequests(ctx context.Context) (requestData map[string]json.RawMessage, impData map[string]json.RawMessage, errs []error) {
return testStoredRequestData, testStoredImpData, nil
}

type mockExchange struct {
lastRequest *openrtb.BidRequest
}
4 changes: 4 additions & 0 deletions endpoints/openrtb2/video_auction_test.go
@@ -1270,6 +1270,10 @@ func (cf mockVideoStoredReqFetcher) FetchRequests(ctx context.Context, requestID
return testVideoStoredRequestData, testVideoStoredImpData, nil
}

func (cf mockVideoStoredReqFetcher) FetchAllRequests(ctx context.Context) (requestData map[string]json.RawMessage, impData map[string]json.RawMessage, errs []error) {
return testVideoStoredRequestData, testVideoStoredImpData, nil
}

type mockExchangeVideo struct {
lastRequest *openrtb.BidRequest
cache *mockCacheClient
9 changes: 9 additions & 0 deletions stored_requests/backends/empty_fetcher/fetcher.go
@@ -3,6 +3,7 @@ package empty_fetcher
import (
"context"
"encoding/json"

"github.com/prebid/prebid-server/stored_requests"
)

@@ -27,6 +28,14 @@ func (fetcher EmptyFetcher) FetchRequests(ctx context.Context, requestIDs []stri
return
}

func (fetcher EmptyFetcher) FetchAllRequests(ctx context.Context) (requestData map[string]json.RawMessage, impData map[string]json.RawMessage, errs []error) {
return
}

func (fetcher EmptyFetcher) FetchCategories(ctx context.Context, primaryAdServer, publisherId, iabCategory string) (string, error) {
return "", nil
}

func (fetcher EmptyFetcher) FetchAllCategories(ctx context.Context) (categories map[string]json.RawMessage, errs []error) {
return
}
55 changes: 41 additions & 14 deletions stored_requests/backends/file_fetcher/fetcher.go
@@ -5,34 +5,24 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"

"github.com/golang/glog"
"github.com/prebid/prebid-server/stored_requests"
)

// NewFileFetcher lazy-loads various kinds of objects from the given directory
// Expected directory structure:
// Expected directory structure depends on data type - stored requests or categories,
// and may include:
// - stored_requests/{id}.json for stored requests
// - stored_imps/{id}.json for stored imps
// - {adserver}.json non-publisher specific primary adserver categories
// - {adserver}/{adserver}_{account_id}.json publisher specific categories for primary adserver
func NewFileFetcher(directory string) (stored_requests.AllFetcher, error) {
_, err := ioutil.ReadDir(directory)
if err != nil {
if _, err := ioutil.ReadDir(directory); err != nil {
return &fileFetcher{}, err
}
// read - but don't store - all the files to warm os cache
go filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
if err == nil && filepath.Ext(path) == ".json" {
if _, err := ioutil.ReadFile(path); err != nil {
glog.Warningf("Error reading %s: %v", path, err)
}
}
return nil
})
return &fileFetcher{
StoredRequestsDir: path.Join(directory, "stored_requests"),
StoredImpsDir: path.Join(directory, "stored_imps"),
@@ -74,13 +64,50 @@ func fetchObjects(dir string, dataType string, ids []string) (jsons map[string]j
return jsons, errs
}

func fetchAllObjects(dir string, glob string, dataType string) (jsons map[string]json.RawMessage, errs []error) {
var ids, files []string
var err error
path := filepath.Join(dir, glob)
if files, err = filepath.Glob(path); err != nil {
return nil, append(errs, fmt.Errorf("Error scanning for %s in %s: %v", dataType, path, err))
}
for _, f := range files {
fn := f[len(dir)+1:] // remove "<dir>/"
id := strings.TrimSuffix(fn, filepath.Ext(fn))
ids = append(ids, id)
}
return fetchObjects(dir, dataType, ids)
}
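
A hedged illustration of the ID derivation above, using hypothetical file names: the directory prefix and the ".json" extension are stripped, and because the category call site uses the "*/*.json" glob, the subdirectory survives in the ID.

package main

import (
    "fmt"
    "path/filepath"
    "strings"
)

func main() {
    // Hypothetical matches as filepath.Glob might return them for the two call sites.
    for _, f := range []string{
        "stored_requests/req-1.json",           // fetchAllObjects(dir="stored_requests", glob "*.json")
        "categories/test/test_categories.json", // fetchAllObjects(dir="categories", glob "*/*.json")
    } {
        dir := strings.SplitN(f, "/", 2)[0]
        fn := f[len(dir)+1:]                           // remove "<dir>/", as in the code above
        id := strings.TrimSuffix(fn, filepath.Ext(fn)) // remove the extension
        fmt.Println(id)                                // prints "req-1" then "test/test_categories"
    }
}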

// FetchRequests fetches the stored requests for the given IDs.
func (fetcher *fileFetcher) FetchRequests(ctx context.Context, requestIDs []string, impIDs []string) (requests map[string]json.RawMessage, imps map[string]json.RawMessage, errs []error) {
requests, reqErrs := fetchObjects(fetcher.StoredRequestsDir, "Request", requestIDs)
imps, impErrs := fetchObjects(fetcher.StoredImpsDir, "Imp", impIDs)
return requests, imps, append(reqErrs, impErrs...)
}

// FetchAllRequests returns comprehensive maps containing all the requests and imps in the filesystem
func (fetcher *fileFetcher) FetchAllRequests(ctx context.Context) (requests map[string]json.RawMessage, imps map[string]json.RawMessage, errs []error) {
requests, errs = fetchAllObjects(fetcher.StoredRequestsDir, "*.json", "Request")
imps, impErrs := fetchAllObjects(fetcher.StoredImpsDir, "*.json", "Imp")
return requests, imps, append(errs, impErrs...)
}

// FetchAllCategories loads and stores all the category mappings defined in the filesystem
func (fetcher *fileFetcher) FetchAllCategories(ctx context.Context) (categories map[string]json.RawMessage, errs []error) {
categories, errs = fetchAllObjects(fetcher.CategoriesDir, "*/*.json", "Category")
for name, mapping := range categories {
data := make(map[string]stored_requests.Category)
if err := json.Unmarshal(mapping, &data); err != nil {
errs = append(errs, fmt.Errorf(`Unable to unmarshal categories from "%s/%s.json"`, fetcher.CategoriesDir, name))
delete(categories, name)
} else {
fetcher.Categories[name] = data
}
}
return categories, errs
}

// FetchCategories fetches the ad-server/publisher specific category for the given IAB category
func (fetcher *fileFetcher) FetchCategories(ctx context.Context, primaryAdServer, publisherId, iabCategory string) (string, error) {
var fileName string
56 changes: 32 additions & 24 deletions stored_requests/backends/file_fetcher/fetcher_test.go
@@ -22,6 +22,13 @@ func TestFileFetcher(t *testing.T) {
validateStoredReqOne(t, storedReqs)
validateStoredReqTwo(t, storedReqs)
validateImp(t, storedImps)

storedReqs, storedImps, errs = fetcher.(stored_requests.Fetcher).FetchAllRequests(context.Background())
assertErrorCount(t, 0, errs)

validateStoredReqOne(t, storedReqs)
validateStoredReqTwo(t, storedReqs)
validateImp(t, storedImps)
}

func TestInvalidDirectory(t *testing.T) {
@@ -97,63 +104,64 @@ func assertErrorCount(t *testing.T, num int, errs []error) {
}
}

func newCategoryFetcher(directory string) (stored_requests.CategoryFetcher, error) {
func newCategoryFetcher(t *testing.T, directory string) stored_requests.CategoryFetcher {
fetcher, err := NewFileFetcher(directory)
if err != nil {
return nil, err
t.Errorf("Failed to create a category Fetcher: %v", err)
}
catfetcher, ok := fetcher.(stored_requests.CategoryFetcher)
if !ok {
return nil, fmt.Errorf("Failed to type cast fetcher to CategoryFetcher")
t.Errorf("Failed to type cast fetcher to CategoryFetcher")
}
return catfetcher, nil
return catfetcher
}

func TestCategoriesFetcherAllCategories(t *testing.T) {
fetcher := newCategoryFetcher(t, "./test/category-mapping")
categories, errs := fetcher.FetchAllCategories(nil)
assertErrorCount(t, 1, errs)
assert.EqualError(t, errs[0], `Unable to unmarshal categories from "test/category-mapping/test/test_broken.json"`)
assert.Equalf(t, len(categories), 2, "Expected 2 categories preloaded, got %d", len(categories))
assert.Containsf(t, categories, "test/test", "test/test missing from preloaded data set")
assert.Containsf(t, categories, "test/test_categories", "test/test_categories missing from preloaded data set")
cat, err := fetcher.FetchCategories(nil, "test", "categories", "IAB1-1")
assert.Nil(t, err, "Unexpected error translating category IAB1-1")
assert.Equal(t, "Beverages", cat, "test/test_categories missing expected category IAB1-1")
cat, err = fetcher.FetchCategories(nil, "test", "", "IAB1-1")
assert.Nil(t, err, "Unexpected error translating category IAB1-1")
assert.Equal(t, "VideoGames", cat, "test/test missing expected category IAB1-1")
}

func TestCategoriesFetcherWithPublisher(t *testing.T) {
fetcher, err := newCategoryFetcher("./test/category-mapping")
if err != nil {
t.Errorf("Failed to create a category Fetcher: %v", err)
}
fetcher := newCategoryFetcher(t, "./test/category-mapping")
category, err := fetcher.FetchCategories(nil, "test", "categories", "IAB1-1")
assert.Equal(t, nil, err, "Categories were loaded incorrectly")
assert.Equal(t, "Beverages", category, "Categories were loaded incorrectly")
}

func TestCategoriesFetcherWithoutPublisher(t *testing.T) {
fetcher, err := newCategoryFetcher("./test/category-mapping")
if err != nil {
t.Errorf("Failed to create a category Fetcher: %v", err)
}
fetcher := newCategoryFetcher(t, "./test/category-mapping")
category, err := fetcher.FetchCategories(nil, "test", "", "IAB1-1")
assert.Equal(t, nil, err, "Categories were loaded incorrectly")
assert.Equal(t, "VideoGames", category, "Categories were loaded incorrectly")
}

func TestCategoriesFetcherNoCategory(t *testing.T) {
fetcher, err := newCategoryFetcher("./test/category-mapping")
if err != nil {
t.Errorf("Failed to create a category Fetcher: %v", err)
}
fetcher := newCategoryFetcher(t, "./test/category-mapping")
_, fetchingErr := fetcher.FetchCategories(nil, "test", "", "IAB1-100")
assert.Equal(t, fmt.Errorf("Unable to find category for adserver 'test', publisherId: '', iab category: 'IAB1-100'"),
fetchingErr, "Categories were loaded incorrectly")
}

func TestCategoriesFetcherBrokenJson(t *testing.T) {
fetcher, err := newCategoryFetcher("./test/category-mapping")
if err != nil {
t.Errorf("Failed to create a category Fetcher: %v", err)
}
fetcher := newCategoryFetcher(t, "./test/category-mapping")
_, fetchingErr := fetcher.FetchCategories(nil, "test", "broken", "IAB1-100")
assert.Equal(t, fmt.Errorf("Unable to unmarshal categories for adserver: 'test', publisherId: 'broken'"),
fetchingErr, "Categories were loaded incorrectly")
}

func TestCategoriesFetcherNoCategoriesFile(t *testing.T) {
fetcher, err := newCategoryFetcher("./test/category-mapping")
if err != nil {
t.Errorf("Failed to create a category Fetcher: %v", err)
}
fetcher := newCategoryFetcher(t, "./test/category-mapping")
_, fetchingErr := fetcher.FetchCategories(nil, "test", "not_exists", "IAB1-100")
assert.Equal(t, fmt.Errorf("Unable to find mapping file for adserver: 'test', publisherId: 'not_exists'"),
fetchingErr, "Categories were loaded incorrectly")
22 changes: 11 additions & 11 deletions stored_requests/config/config.go
@@ -20,6 +20,7 @@ import (
"github.com/prebid/prebid-server/stored_requests/caches/nil_cache"
"github.com/prebid/prebid-server/stored_requests/events"
apiEvents "github.com/prebid/prebid-server/stored_requests/events/api"
fileEvents "github.com/prebid/prebid-server/stored_requests/events/files"
httpEvents "github.com/prebid/prebid-server/stored_requests/events/http"
postgresEvents "github.com/prebid/prebid-server/stored_requests/events/postgres"
)
@@ -144,6 +145,7 @@ func resolvedStoredRequestsConfig(cfg *config.Configuration) (auc, amp config.St
// Auction endpoint uses non-Amp fields so it can just copy the slim data
auc.Files.Enabled = sr.Files
auc.Files.Path = sr.Path
auc.Files.LazyLoad = sr.FilesLazyLoad
auc.Postgres.ConnectionInfo = sr.Postgres.ConnectionInfo
auc.Postgres.FetcherQueries.QueryTemplate = sr.Postgres.FetcherQueries.QueryTemplate
auc.Postgres.CacheInitialization.Timeout = sr.Postgres.CacheInitialization.Timeout
@@ -160,21 +162,12 @@
auc.HTTPEvents.Endpoint = sr.HTTPEvents.Endpoint

// Amp endpoint uses all the slim data but some fields get replaced by the Amp* version of similar fields
amp.Files.Enabled = sr.Files
amp.Files.Path = sr.Path
amp.Postgres.ConnectionInfo = sr.Postgres.ConnectionInfo
amp = auc
amp.Postgres.FetcherQueries.QueryTemplate = sr.Postgres.FetcherQueries.AmpQueryTemplate
amp.Postgres.CacheInitialization.Timeout = sr.Postgres.CacheInitialization.Timeout
amp.Postgres.CacheInitialization.Query = sr.Postgres.CacheInitialization.AmpQuery
amp.Postgres.PollUpdates.RefreshRate = sr.Postgres.PollUpdates.RefreshRate
amp.Postgres.PollUpdates.Timeout = sr.Postgres.PollUpdates.Timeout
amp.Postgres.PollUpdates.Query = sr.Postgres.PollUpdates.AmpQuery
amp.HTTP.Endpoint = sr.HTTP.AmpEndpoint
amp.InMemoryCache = sr.InMemoryCache
amp.CacheEvents.Enabled = sr.CacheEventsAPI
amp.CacheEvents.Endpoint = "/storedrequests/amp"
amp.HTTPEvents.RefreshRate = sr.HTTPEvents.RefreshRate
amp.HTTPEvents.Timeout = sr.HTTPEvents.Timeout
amp.HTTP.Endpoint = sr.HTTP.AmpEndpoint
amp.HTTPEvents.Endpoint = sr.HTTPEvents.AmpEndpoint

return
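
The amp = auc line above relies on Go's value semantics for structs: the assignment copies every field of the auction config, so only the AMP-specific endpoints and queries need to be overridden afterwards. A hedged, simplified illustration (the types here are stand-ins, not the real config structs):

package main

import "fmt"

type endpointConfig struct {
    Endpoint      string
    QueryTemplate string
    RefreshRate   int
}

func main() {
    auc := endpointConfig{Endpoint: "/openrtb2/auction", QueryTemplate: "SELECT ...", RefreshRate: 60}
    amp := auc                     // full copy of the struct value, not a shared reference
    amp.Endpoint = "/openrtb2/amp" // override only what differs for AMP
    fmt.Println(auc.Endpoint, amp.Endpoint)
}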
@@ -201,6 +194,10 @@ func newFetcher(cfg *config.StoredRequestsSlim, client *http.Client, db *sql.DB)

if cfg.Files.Enabled {
fFetcher := newFilesystem(cfg.Files.Path)
if !cfg.Files.LazyLoad {
// Permanently load all categories in fetcher's internal static map
fFetcher.(stored_requests.CategoryFetcher).FetchAllCategories(context.Background())
}
idList = append(idList, fFetcher)
}
if cfg.Postgres.FetcherQueries.QueryTemplate != "" {
@@ -244,6 +241,9 @@ func newEventProducers(cfg *config.StoredRequestsSlim, client *http.Client, db *
eventProducers = append(eventProducers, newPostgresPolling(cfg.Postgres.PollUpdates, db, updateStartTime))
}
}
if cfg.Files.Enabled && !cfg.Files.LazyLoad {
eventProducers = append(eventProducers, fileEvents.NewFilesLoader(cfg.Files))
}
return
}

47 changes: 47 additions & 0 deletions stored_requests/events/files/files.go
@@ -0,0 +1,47 @@
package files

import (
"context"

"github.com/golang/glog"
"github.com/prebid/prebid-server/config"
"github.com/prebid/prebid-server/stored_requests"
"github.com/prebid/prebid-server/stored_requests/backends/file_fetcher"
"github.com/prebid/prebid-server/stored_requests/events"
)

type fileEventProducer struct {
invalidations chan events.Invalidation
saves chan events.Save
}

func (f *fileEventProducer) Saves() <-chan events.Save {
return f.saves
}
func (f *fileEventProducer) Invalidations() <-chan events.Invalidation {
return f.invalidations
}

// NewFilesLoader returns an EventProducer preloaded with all the stored reqs+imps
func NewFilesLoader(cfg config.FileFetcherConfig) events.EventProducer {
fp := &fileEventProducer{
saves: make(chan events.Save, 1),
invalidations: make(chan events.Invalidation),
}
if fetcher, err := file_fetcher.NewFileFetcher(cfg.Path); err == nil {
reqData, impData, errs := fetcher.(stored_requests.Fetcher).FetchAllRequests(context.Background())
if len(reqData) > 0 || len(impData) > 0 {
fp.saves <- events.Save{
Requests: reqData,
Imps: impData,
}
}
for _, err := range errs {
glog.Warning(err.Error())
}
} else {
glog.Warningf("Failed to prefetch files from %s: %v", cfg.Path, err)
close(fp.saves)
}
return fp
}
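
A hedged usage sketch of the new producer (the directory path is hypothetical): the single buffered Save carries every stored request and imp found on disk, and in the real wiring it is drained by the stored_requests events listener into the configured in-memory cache.

package main

import (
    "fmt"

    "github.com/prebid/prebid-server/config"
    "github.com/prebid/prebid-server/stored_requests/events/files"
)

func main() {
    producer := files.NewFilesLoader(config.FileFetcherConfig{
        Enabled: true,
        Path:    "./stored_requests/data/by_id", // hypothetical directory
    })
    select {
    case save, ok := <-producer.Saves():
        if !ok {
            fmt.Println("directory could not be read; nothing was preloaded")
            break
        }
        fmt.Printf("preloaded %d stored requests and %d imps\n", len(save.Requests), len(save.Imps))
    default:
        fmt.Println("no save queued (the directory was readable but empty)")
    }
}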