-
Notifications
You must be signed in to change notification settings - Fork 753
/
config.go
260 lines (230 loc) · 10.8 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package config
import (
"context"
"net/http"
"time"
"github.com/prebid/prebid-server/v3/metrics"
"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/prebid/prebid-server/v3/config"
"github.com/prebid/prebid-server/v3/stored_requests"
"github.com/prebid/prebid-server/v3/stored_requests/backends/db_fetcher"
"github.com/prebid/prebid-server/v3/stored_requests/backends/db_provider"
"github.com/prebid/prebid-server/v3/stored_requests/backends/empty_fetcher"
"github.com/prebid/prebid-server/v3/stored_requests/backends/file_fetcher"
"github.com/prebid/prebid-server/v3/stored_requests/backends/http_fetcher"
"github.com/prebid/prebid-server/v3/stored_requests/caches/memory"
"github.com/prebid/prebid-server/v3/stored_requests/caches/nil_cache"
"github.com/prebid/prebid-server/v3/stored_requests/events"
apiEvents "github.com/prebid/prebid-server/v3/stored_requests/events/api"
databaseEvents "github.com/prebid/prebid-server/v3/stored_requests/events/database"
httpEvents "github.com/prebid/prebid-server/v3/stored_requests/events/http"
"github.com/prebid/prebid-server/v3/util/task"
)
// CreateStoredRequests wires up stored-data fetching for a single config section
// and returns two things:
//
//  1. A Fetcher which can be used to get the stored data. It is wrapped with a
//     cache when cfg.InMemoryCache.Type is non-empty.
//  2. A function which should be called on shutdown for graceful cleanup. It stops
//     the cache listeners (if any were started) and closes the database provider
//     (if there is one).
//
// When cfg names a database and provider is nil, a new provider is created from
// cfg.Database.ConnectionInfo. When the provider in use reports a Config() that
// differs from cfg.Database.ConnectionInfo, the failure is logged with glog.Fatal:
// only a single database connection is currently supported.
//
// As a side-effect, it may add endpoints to the router if the config calls for it
// (see newEventProducers). In the future we should look for ways to simplify this
// so that it's not doing two things.
func CreateStoredRequests(cfg *config.StoredRequests, metricsEngine metrics.MetricsEngine, client *http.Client, router *httprouter.Router, provider db_provider.DbProvider) (fetcher stored_requests.AllFetcher, shutdown func()) {
	// Only touch the database when the config actually names one.
	if connInfo := cfg.Database.ConnectionInfo; connInfo.Database != "" {
		if provider == nil {
			glog.Infof("Connecting to Database for Stored %s. Driver=%s, DB=%s, host=%s, port=%d, user=%s",
				cfg.DataType(),
				connInfo.Driver,
				connInfo.Database,
				connInfo.Host,
				connInfo.Port,
				connInfo.Username)
			provider = db_provider.NewDbProvider(cfg.DataType(), connInfo)
		}

		// Using different connection settings for different stored data types is not supported yet.
		if provider.Config() != connInfo {
			glog.Fatal("Multiple database connection settings found in config, only a single database connection is currently supported.")
		}
	}

	// Event producers are created before the fetcher, and regardless of whether a cache is configured.
	producers := newEventProducers(cfg, client, provider, metricsEngine, router)
	fetcher = newFetcher(cfg, client, provider)

	// stopListeners stays nil unless a cache (and therefore listeners) is set up.
	var stopListeners func()
	if cfg.InMemoryCache.Type != "" {
		cache := newCache(cfg)
		fetcher = stored_requests.WithCache(fetcher, cache, metricsEngine)
		stopListeners = addListeners(cache, producers)
	}

	shutdown = func() {
		if stopListeners != nil {
			stopListeners()
		}
		if provider != nil {
			if err := provider.Close(); err != nil {
				glog.Errorf("Error closing DB connection: %v", err)
			}
		}
	}
	return fetcher, shutdown
}
// NewStoredRequests calls CreateStoredRequests once per stored-data config section
// and returns, in this order:
//
//  1. A function which should be called on shutdown for graceful cleanups. It runs
//     the shutdown function of every section, in creation order.
//  2. A Fetcher built from cfg.StoredRequests
//  3. A Fetcher built from cfg.StoredRequestsAMP
//  4. An AccountFetcher built from cfg.Accounts
//  5. A CategoryFetcher built from cfg.CategoryMapping
//  6. A Fetcher built from cfg.StoredVideo
//  7. A Fetcher built from cfg.StoredResponses
//
// If any errors occur, the program will exit with an error message.
// It probably means you have a bad config or networking issue.
//
// As a side-effect, it will add some endpoints to the router if the config calls for it.
// In the future we should look for ways to simplify this so that it's not doing two things.
func NewStoredRequests(cfg *config.Configuration, metricsEngine metrics.MetricsEngine, client *http.Client, router *httprouter.Router) (shutdown func(),
	fetcher stored_requests.Fetcher,
	ampFetcher stored_requests.Fetcher,
	accountsFetcher stored_requests.AccountFetcher,
	categoriesFetcher stored_requests.CategoryFetcher,
	videoFetcher stored_requests.Fetcher,
	storedRespFetcher stored_requests.Fetcher) {
	// provider is never assigned in this function, so every call below receives a nil provider.
	// NOTE(review): CreateStoredRequests takes the provider by value and cannot hand a newly
	// created one back, so a provider does not appear to be shared between the calls — confirm
	// whether that is intended.
	var provider db_provider.DbProvider

	create := func(section *config.StoredRequests) (stored_requests.AllFetcher, func()) {
		return CreateStoredRequests(section, metricsEngine, client, router, provider)
	}

	// The creation order is kept as-is because each call may register routes and start background work.
	requests, stopRequests := create(&cfg.StoredRequests)
	amp, stopAMP := create(&cfg.StoredRequestsAMP)
	categories, stopCategories := create(&cfg.CategoryMapping)
	video, stopVideo := create(&cfg.StoredVideo)
	accounts, stopAccounts := create(&cfg.Accounts)
	responses, stopResponses := create(&cfg.StoredResponses)

	// Narrow each AllFetcher to the interface its consumer needs.
	fetcher = requests.(stored_requests.Fetcher)
	ampFetcher = amp.(stored_requests.Fetcher)
	categoriesFetcher = categories.(stored_requests.CategoryFetcher)
	videoFetcher = video.(stored_requests.Fetcher)
	accountsFetcher = accounts.(stored_requests.AccountFetcher)
	storedRespFetcher = responses.(stored_requests.Fetcher)

	stops := []func(){stopRequests, stopAMP, stopCategories, stopVideo, stopAccounts, stopResponses}
	shutdown = func() {
		for _, stop := range stops {
			stop()
		}
	}

	return shutdown, fetcher, ampFetcher, accountsFetcher, categoriesFetcher, videoFetcher, storedRespFetcher
}
// addListeners creates one event listener per producer and starts each of them in
// its own goroutine, listening on that producer with the given cache.
//
// The returned function calls Stop on every listener that was created, in
// creation order.
func addListeners(cache stored_requests.Cache, eventProducers []events.EventProducer) (shutdown func()) {
	started := make([]*events.EventListener, len(eventProducers))
	for i := range eventProducers {
		started[i] = events.SimpleEventListener()
		go started[i].Listen(cache, eventProducers[i])
	}

	return func() {
		for i := range started {
			started[i].Stop()
		}
	}
}
// newFetcher assembles the fetcher for one config section from up to three
// backends, appended in this order: filesystem, database, HTTP. The collected
// backends are then reduced to a single fetcher by consolidate.
func newFetcher(cfg *config.StoredRequests, client *http.Client, provider db_provider.DbProvider) (fetcher stored_requests.AllFetcher) {
	backends := make(stored_requests.MultiFetcher, 0, 3)

	if cfg.Files.Enabled {
		backends = append(backends, newFilesystem(cfg.DataType(), cfg.Files.Path))
	}

	queryTemplate := cfg.Database.FetcherQueries.QueryTemplate
	switch {
	case queryTemplate != "":
		glog.Infof("Loading Stored %s data via Database.\nQuery: %s", cfg.DataType(), queryTemplate)
		// NOTE(review): the same template is passed for both query arguments of
		// db_fetcher.NewFetcher — confirm that this is intended.
		backends = append(backends, db_fetcher.NewFetcher(provider, queryTemplate, queryTemplate))
	case cfg.Database.CacheInitialization.Query != "" && cfg.Database.PollUpdates.Query != "":
		// In this case data will be loaded to cache via poll for updates event.
		backends = append(backends, empty_fetcher.EmptyFetcher{})
	}

	if endpoint := cfg.HTTP.Endpoint; endpoint != "" {
		glog.Infof("Loading Stored %s data via HTTP. endpoint=%s", cfg.DataType(), endpoint)
		backends = append(backends, http_fetcher.NewFetcher(client, endpoint))
	}

	return consolidate(cfg.DataType(), backends)
}
// newCache builds the cache for one config section. Every slot starts out as a
// NilCache and is replaced with an in-memory cache depending on the config:
//
//   - InMemoryCache.Type == "none": nothing is replaced and a warning is logged.
//   - Account data type: only the Accounts slot is replaced.
//   - Anything else: the Requests, Imps and Responses slots are replaced.
func newCache(cfg *config.StoredRequests) stored_requests.Cache {
	result := stored_requests.Cache{
		Requests:  &nil_cache.NilCache{},
		Imps:      &nil_cache.NilCache{},
		Responses: &nil_cache.NilCache{},
		Accounts:  &nil_cache.NilCache{},
	}

	if cfg.InMemoryCache.Type == "none" {
		glog.Warningf("No %s cache configured. The %s Fetcher backend will be used for all data requests", cfg.DataType(), cfg.DataType())
		return result
	}

	if cfg.DataType() == config.AccountDataType {
		result.Accounts = memory.NewCache(cfg.InMemoryCache.Size, cfg.InMemoryCache.TTL, "Accounts")
		return result
	}

	result.Requests = memory.NewCache(cfg.InMemoryCache.RequestCacheSize, cfg.InMemoryCache.TTL, "Requests")
	result.Imps = memory.NewCache(cfg.InMemoryCache.ImpCacheSize, cfg.InMemoryCache.TTL, "Imps")
	result.Responses = memory.NewCache(cfg.InMemoryCache.RespCacheSize, cfg.InMemoryCache.TTL, "Responses")
	return result
}
// newEventProducers creates the cache event producers enabled by cfg, in this order:
//
//  1. An API producer, when cfg.CacheEvents.Enabled is set. This registers handlers
//     on the router (see newEventsAPI).
//  2. An HTTP producer, when cfg.HTTPEvents has both a non-zero refresh rate and an
//     endpoint.
//  3. A database producer, when cfg.Database.CacheInitialization.Query is set. It is
//     driven by a ticker task which is started here.
//
// The result is nil when none of the above is configured.
func newEventProducers(cfg *config.StoredRequests, client *http.Client, provider db_provider.DbProvider, metricsEngine metrics.MetricsEngine, router *httprouter.Router) (eventProducers []events.EventProducer) {
	var producers []events.EventProducer

	if cfg.CacheEvents.Enabled {
		apiProducer := newEventsAPI(router, cfg.CacheEvents.Endpoint)
		producers = append(producers, apiProducer)
	}

	if cfg.HTTPEvents.Endpoint != "" && cfg.HTTPEvents.RefreshRate != 0 {
		timeout := cfg.HTTPEvents.TimeoutDuration()
		refreshRate := cfg.HTTPEvents.RefreshRateDuration()
		producers = append(producers, newHttpEvents(client, timeout, refreshRate, cfg.HTTPEvents.Endpoint))
	}

	if cfg.Database.CacheInitialization.Query == "" {
		return producers
	}

	// The configured timeouts are converted as milliseconds, the refresh rate as seconds.
	dbProducer := databaseEvents.NewDatabaseEventProducer(databaseEvents.DatabaseEventProducerConfig{
		Provider:           provider,
		RequestType:        cfg.DataType(),
		CacheInitQuery:     cfg.Database.CacheInitialization.Query,
		CacheInitTimeout:   time.Duration(cfg.Database.CacheInitialization.Timeout) * time.Millisecond,
		CacheUpdateQuery:   cfg.Database.PollUpdates.Query,
		CacheUpdateTimeout: time.Duration(cfg.Database.PollUpdates.Timeout) * time.Millisecond,
		MetricsEngine:      metricsEngine,
	})

	// NOTE(review): the ticker task is local to this function, so callers get no
	// handle to stop it on shutdown.
	pollInterval := time.Duration(cfg.Database.PollUpdates.RefreshRate) * time.Second
	task.NewTickerTask(pollInterval, dbProducer).Start()

	return append(producers, dbProducer)
}
// newEventsAPI creates an API-driven event producer and registers its handler on
// the router for POST and DELETE requests to endpoint.
func newEventsAPI(router *httprouter.Router, endpoint string) events.EventProducer {
	apiProducer, apiHandler := apiEvents.NewEventsAPI()
	// router.POST and router.DELETE are shortcuts for router.Handle with the matching method.
	for _, method := range []string{http.MethodPost, http.MethodDelete} {
		router.Handle(method, endpoint, apiHandler)
	}
	return apiProducer
}
// newHttpEvents creates an HTTP-driven event producer for endpoint. The producer
// is given a context factory which returns a fresh context, derived from
// context.Background and limited by timeout, together with its cancel function.
func newHttpEvents(client *http.Client, timeout time.Duration, refreshRate time.Duration, endpoint string) events.EventProducer {
	return httpEvents.NewHTTPEvents(client, endpoint, func() (context.Context, func()) {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		return ctx, cancel
	}, refreshRate)
}
// newFilesystem creates a file-backed fetcher rooted at configPath. dataType is
// only used in the log messages. A failure to create the fetcher is logged with
// glog.Fatalf.
func newFilesystem(dataType config.DataType, configPath string) stored_requests.AllFetcher {
	glog.Infof("Loading Stored %s data from filesystem at path %s", dataType, configPath)

	fileFetcher, loadErr := file_fetcher.NewFileFetcher(configPath)
	if loadErr != nil {
		glog.Fatalf("Failed to create a %s FileFetcher: %v", dataType, loadErr)
	}

	return fileFetcher
}
// consolidate reduces a slice of fetchers of any size to a single fetcher:
//
//   - no fetchers: a warning is logged and an EmptyFetcher is returned
//   - one fetcher: that fetcher is returned as-is
//   - several fetchers: they are returned together as a MultiFetcher
//
// dataType only selects the wording of the warning.
func consolidate(dataType config.DataType, fetchers []stored_requests.AllFetcher) stored_requests.AllFetcher {
	switch len(fetchers) {
	case 0:
		if dataType == config.RequestDataType {
			glog.Warning("No Stored Request support configured. request.imp[i].ext.prebid.storedrequest will be ignored. If you need this, check your app config")
		} else {
			glog.Warningf("No Stored %s support configured. If you need this, check your app config", dataType)
		}
		return empty_fetcher.EmptyFetcher{}
	case 1:
		return fetchers[0]
	default:
		return stored_requests.MultiFetcher(fetchers)
	}
}