Skip to content

Commit

Permalink
OTT-1104: Move the stat-server from HB to prebid (prebid#504)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashishshinde-pubm committed Jun 12, 2023
1 parent 5e821e9 commit 688f883
Show file tree
Hide file tree
Showing 14 changed files with 3,505 additions and 1 deletion.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require github.com/go-sql-driver/mysql v1.7.0
require (
github.com/go-sql-driver/mysql v1.7.0
github.com/golang/mock v1.6.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
64 changes: 64 additions & 0 deletions modules/pubmatic/openwrap/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package metrics

// MetricsEngine is a generic interface to record PBS metrics into the desired backend
type MetricsEngine interface {
RecordOpenWrapServerPanicStats()
RecordPublisherPartnerStats(publisher, partner string)
RecordPublisherPartnerImpStats(publisher, partner string, impCount int)
RecordPublisherPartnerNoCookieStats(publisher, partner string)
RecordPartnerTimeoutErrorStats(publisher, partner string)
RecordNobiderStatusErrorStats(publisher, partner string)
RecordNobidErrorStats(publisher, partner string)
RecordUnkownPrebidErrorStats(publisher, partner string)
RecordSlotNotMappedErrorStats(publisher, partner string)
RecordMisConfigurationErrorStats(publisher, partner string)
RecordPublisherProfileRequests(publisher, profileID string)
RecordPublisherInvalidProfileImpressions(publisher, profileID string, impCount int)
RecordPublisherNoConsentRequests(publisher string)
RecordPublisherNoConsentImpressions(publisher string, impCount int)
RecordPublisherRequestStats(publisher string)
RecordNobidErrPrebidServerRequests(publisher string)
RecordNobidErrPrebidServerResponse(publisher string)
RecordInvalidCreativeStats(publisher, partner string)
RecordPlatformPublisherPartnerReqStats(platform, publisher, partner string)
RecordPlatformPublisherPartnerResponseStats(platform, publisher, partner string)
RecordPublisherResponseEncodingErrorStats(publisher string)
RecordPartnerResponseTimeStats(publisher, partner string, responseTime int)
RecordPublisherResponseTimeStats(publisher string, responseTime int)
RecordPublisherWrapperLoggerFailure(publisher, profileID, versionID string)
RecordCacheErrorRequests(endpoint string, publisher string, profileID string)
RecordPublisherInvalidProfileRequests(endpoint, publisher, profileID string)
RecordBadRequests(endpoint string, errorCode int)
RecordPrebidTimeoutRequests(publisher, profileID string)
RecordSSTimeoutRequests(publisher, profileID string)
RecordUidsCookieNotPresentErrorStats(publisher, profileID string)
RecordVideoInstlImpsStats(publisher, profileID string)
RecordImpDisabledViaConfigStats(impType, publisher, profileID string)
RecordPreProcessingTimeStats(publisher string, processingTime int)
RecordStatsKeyCTVPrebidFailedImpression(errorcode int, publisher string, profile string)
RecordCTVRequests(endpoint string, platform string)
RecordPublisherRequests(endpoint string, publisher string, platform string)
RecordCTVHTTPMethodRequests(endpoint string, publisher string, method string)
RecordCTVInvalidReasonCount(errorCode int, publisher string)
RecordCTVIncompleteAdPodsCount(impCount int, reason string, publisher string)
RecordCTVReqImpsWithDbConfigCount(publisher string)
RecordCTVReqImpsWithReqConfigCount(publisher string)
RecordAdPodGeneratedImpressionsCount(impCount int, publisher string)
RecordRequestAdPodGeneratedImpressionsCount(impCount int, publisher string)
RecordAdPodSecondsMissedCount(seconds int, publisher string)
RecordReqImpsWithAppContentCount(publisher string)
RecordReqImpsWithSiteContentCount(publisher string)
RecordAdPodImpressionYield(maxDuration int, minDuration int, publisher string)
RecordCTVReqCountWithAdPod(publisherID, profileID string)
RecordCTVKeyBidDuration(duration int, publisherID, profileID string)
RecordAdomainPresentStats(creativeType, publisher, partner string)
RecordAdomainAbsentStats(creativeType, publisher, partner string)
RecordCatPresentStats(creativeType, publisher, partner string)
RecordCatAbsentStats(creativeType, publisher, partner string)
RecordPBSAuctionRequestsStats()
RecordInjectTrackerErrorCount(adformat, publisher, partner string)
RecordBidResponseByDealCountInPBS(publisher, profile, aliasBidder, dealId string)
RecordBidResponseByDealCountInHB(publisher, profile, aliasBidder, dealId string)
RecordPartnerTimeoutInPBS(publisher, profile, aliasBidder string)
RecordVideoImpDisabledViaConnTypeStats(publisher, profileID string)
}
167 changes: 167 additions & 0 deletions modules/pubmatic/openwrap/metrics/stats/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package stats

import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"time"

"github.com/alitto/pond"
"github.com/golang/glog"
)

type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
}

// TrySubmit attempts to send a task to this worker pool for execution. If the queue is full,
// it will not wait for a worker to become idle. It returns true if it was able to dispatch
// the task and false otherwise.
type WorkerPool interface {
TrySubmit(task func()) bool
}

// Client is a StatClient. All stats related operation will be done using this.
type Client struct {
config *config
httpClient HttpClient
endpoint string
pubChan chan stat
pubTicker *time.Ticker
statMap map[string]int
shutDownChan chan struct{}
pool WorkerPool
}

// NewClient will validate the Config provided and return a new Client
func NewClient(cfg *config) (*Client, error) {
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid stats client configurations:%s", err.Error())
}

client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: time.Duration(cfg.DialTimeout) * time.Second,
KeepAlive: time.Duration(cfg.KeepAliveDuration) * time.Minute,
}).DialContext,
MaxIdleConns: cfg.MaxIdleConns,
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
ResponseHeaderTimeout: time.Duration(cfg.ResponseHeaderTimeout) * time.Second,
},
}

c := &Client{
config: cfg,
httpClient: client,
endpoint: cfg.Endpoint,
pubChan: make(chan stat, cfg.MaxChannelLength),
pubTicker: time.NewTicker(time.Duration(cfg.PublishingInterval) * time.Minute),
statMap: make(map[string]int),
shutDownChan: make(chan struct{}),
pool: pond.New(cfg.PoolMaxWorkers, cfg.PoolMaxCapacity),
}

go c.process()

return c, nil
}

// ShutdownProcess will perform the graceful shutdown operation
func (sc *Client) ShutdownProcess() {
sc.shutDownChan <- struct{}{}
}

// PublishStat will push a stat to pubChan channel.
func (sc *Client) PublishStat(key string, value int) {
sc.pubChan <- stat{Key: key, Value: value}
}

// process function will keep listening on the pubChan
// It will publish the stats to server if
// (1) number of stats reaches the PublishingThreshold or,
// (2) PublishingInterval timeout occurs
func (sc *Client) process() {

for {
select {
case stat := <-sc.pubChan:
sc.statMap[stat.Key] = sc.statMap[stat.Key] + stat.Value
if len(sc.statMap) >= sc.config.PublishingThreshold {
sc.prepareStatsForPublishing()
sc.pubTicker.Reset(time.Duration(sc.config.PublishingInterval) * time.Minute)
}

case <-sc.pubTicker.C:
sc.prepareStatsForPublishing()

case <-sc.shutDownChan:
sc.prepareStatsForPublishing()
return
}
}
}

// prepareStatsForPublishing creates copy of map containing stat-key and value
// and calls publishStatsToServer to publishes it to the stat-server
func (sc *Client) prepareStatsForPublishing() {
if len(sc.statMap) != 0 {
collectedStats := sc.statMap
sc.statMap = map[string]int{}
status := sc.pool.TrySubmit(func() {
sc.publishStatsToServer(collectedStats)
})
if !status {
glog.Errorf("[stats_fail] Failed to submit the publishStatsToServer task containing %d record to pool", len(collectedStats))
}
}
}

// publishStatsToServer sends the stats to the stat-server
// in case of failure, it retries to send for Client.config.Retries number of times.
func (sc *Client) publishStatsToServer(statMap map[string]int) int {

sb, err := json.Marshal(statMap)
if err != nil {
glog.Errorf("[stats_fail] Json unmarshal fail: %v", err)
return statusSetupFail
}

req, err := http.NewRequest(http.MethodPost, sc.endpoint, bytes.NewBuffer(sb))
if err != nil {
glog.Errorf("[stats_fail] Failed to form request to sent stats to server: %v", err)
return statusSetupFail
}

req.Header.Add(contentType, applicationJSON)
for retry := 0; retry < sc.config.Retries; retry++ {

startTime := time.Now()
resp, err := sc.httpClient.Do(req)
elapsedTime := time.Since(startTime)

code := 0
if resp != nil {
code = resp.StatusCode
defer resp.Body.Close()
}

if err == nil && code == http.StatusOK {
glog.Infof("[stats_success] retry:[%d] nstats:[%d] time:[%v]", retry, len(statMap), elapsedTime)
return statusPublishSuccess
}

if retry == (sc.config.Retries - 1) {
glog.Errorf("[stats_fail] retry:[%d] status:[%d] nstats:[%d] time:[%v] error:[%v]", retry, code, len(statMap), elapsedTime, err)
break
}

if sc.config.retryInterval > 0 {
time.Sleep(time.Duration(sc.config.retryInterval) * time.Second)
}
}

return statusPublishFail
}
81 changes: 81 additions & 0 deletions modules/pubmatic/openwrap/metrics/stats/client_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package stats

import (
"errors"
)

// config will have the information required to initialise a stats client
type config struct {
Endpoint string // stat-server's endpoint
PublishingInterval int // interval (in minutes) to publish stats to server
PublishingThreshold int // publish stats if number of stat-records present in map is higher than this threshold
Retries int // max retries to publish stats to server
DialTimeout int // http connection dial-timeout (in seconds)
KeepAliveDuration int // http connection keep-alive-duration (in minutes)
MaxIdleConns int // maximum idle connections across all hosts
MaxIdleConnsPerHost int // maximum idle connections per host
retryInterval int // if failed to publish stat then wait for retryInterval seconds for next attempt
ResponseHeaderTimeout int // amount of time (in seconds) to wait for server's response header
MaxChannelLength int // max number of stat keys
PoolMaxWorkers int // max number of workers that will actually send the data to stats-server
PoolMaxCapacity int // number of tasks that can be submitted to the pool without blocking
}

func (c *config) validate() (err error) {
if c.Endpoint == "" {
return errors.New("stat server endpoint cannot be empty")
}

if c.PublishingInterval < minPublishingInterval {
c.PublishingInterval = minPublishingInterval
} else if c.PublishingInterval > maxPublishingInterval {
c.PublishingInterval = maxPublishingInterval
}

if c.Retries > 0 {
if c.Retries > (c.PublishingInterval*60)/minRetryDuration {
c.Retries = (c.PublishingInterval * 60) / minRetryDuration
c.retryInterval = minRetryDuration
} else {
c.retryInterval = (c.PublishingInterval * 60) / c.Retries
}
}

if c.DialTimeout < minDialTimeout {
c.DialTimeout = minDialTimeout
}

if c.KeepAliveDuration < minKeepAliveDuration {
c.KeepAliveDuration = minKeepAliveDuration
}

if c.MaxIdleConns < 0 {
c.MaxIdleConns = 0
}

if c.MaxIdleConnsPerHost < 0 {
c.MaxIdleConnsPerHost = 0
}

if c.PublishingThreshold < minPublishingThreshold {
c.PublishingThreshold = minPublishingThreshold
}

if c.ResponseHeaderTimeout < minResponseHeaderTimeout {
c.ResponseHeaderTimeout = minResponseHeaderTimeout
}

if c.MaxChannelLength < minChannelLength {
c.MaxChannelLength = minChannelLength
}

if c.PoolMaxWorkers < minPoolWorker {
c.PoolMaxWorkers = minPoolWorker
}

if c.PoolMaxCapacity < minPoolCapacity {
c.PoolMaxCapacity = minPoolCapacity
}

return nil
}
Loading

0 comments on commit 688f883

Please sign in to comment.