Skip to content

Commit

Permalink
further progress
Browse files Browse the repository at this point in the history
  • Loading branch information
eparker-tulip committed Jul 11, 2024
1 parent d0264d6 commit ec77b92
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 42 deletions.
41 changes: 0 additions & 41 deletions lib/oplog/saturation_metric.go

This file was deleted.

3 changes: 3 additions & 0 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr
sendMetricsData()
}

// Increment saturation metric for each publication
redispub.MetricSaturationInc()

// determine which shard this message should route to
// inIdx and outIdx may be different if there are different #s of read and write routines
outIdx := assignToShard(pub.ParallelismKey, len(out))
Expand Down
13 changes: 12 additions & 1 deletion lib/redispub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Help: "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.",
}, []string{"ordinal"})

var metricSaturation = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "otr",
Name: "saturation_delta",
Help: "Delta between incoming and outgoing",
})

// PublishStream reads Publications from the given channel and publishes them
// to Redis.
func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts *PublishOpts, stop <-chan bool, ordinal int) {
Expand Down Expand Up @@ -141,6 +147,10 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
}
}

func MetricSaturationInc() {
metricSaturation.Inc()
}

func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime time.Duration, publishFn func(p *Publication) error) error {
if p == nil {
return errors.New("Nil Redis publication")
Expand All @@ -160,7 +170,8 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime t
retries++
time.Sleep(sleepTime)
} else {
// success, return
// success, decrement outstanding publication count and return
metricSaturation.Dec()
return nil
}
}
Expand Down

0 comments on commit ec77b92

Please sign in to comment.