Skip to content

Commit

Permalink
Change how number of pending messages is calculated and add more erro…
Browse files Browse the repository at this point in the history
…r handling. (#533)

* Fix scale from zero

* be sure to check message count also

* fix comment wording

* Better debugging for 404 on channel endpoint

* Compare count to last seq

* Change how number of messages are tracked

* Clean up logging

* Rename variable so easier to understand

* Log the combined queue name

* Update the correct line

* Simplify `hasPendingMessage` logic
  • Loading branch information
cwoolum authored and ahmelsayed committed Jan 7, 2020
1 parent f2aaa35 commit b2b57f3
Showing 1 changed file with 43 additions and 15 deletions.
58 changes: 43 additions & 15 deletions pkg/scalers/stan_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,47 +108,75 @@ func parseStanMetadata(metadata map[string]string) (stanMetadata, error) {

// IsActive determines if we need to scale from zero
func (s *stanScaler) IsActive(ctx context.Context) (bool, error) {
resp, err := http.Get(s.getMonitoringEndpoint())
monitoringEndpoint := s.getMonitoringEndpoint()

resp, err := http.Get(monitoringEndpoint)
if err != nil {
stanLog.Error(err, "Unable to access the nats streaming broker monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
return false, err
}

if resp.StatusCode == 404 {
baseResp, _ := http.Get(s.getSTANChannelsEndpoint())

if baseResp.StatusCode == 404 {
stanLog.Info("Streaming broker endpoint returned 404. Please ensure it has been created", "url", monitoringEndpoint, "channelName", s.metadata.subject)

} else {
stanLog.Info("Unable to connect to STAN. Please ensure you have configured the ScaledObject with the correct endpoint.", "baseResp.StatusCode", baseResp.StatusCode, "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
}

return false, err
}

defer resp.Body.Close()
json.NewDecoder(resp.Body).Decode(&s.channelInfo)

return s.hasPendingMessage() || s.getMaxMsgLag() > 0, nil
}

func (s *stanScaler) getMonitoringEndpoint() string {
return "http://" + s.metadata.natsServerMonitoringEndpoint + "/streaming/channelsz?" + "channel=" + s.metadata.subject + "&subs=1"
func (s *stanScaler) getSTANChannelsEndpoint() string {
return "http://" + s.metadata.natsServerMonitoringEndpoint + "/streaming/channelsz"
}

func (s *stanScaler) getTotalMessages() int64 {
return s.channelInfo.MsgCount
func (s *stanScaler) getMonitoringEndpoint() string {
return s.getSTANChannelsEndpoint() + "?channel=" + s.metadata.subject + "&subs=1"
}

func (s *stanScaler) getMaxMsgLag() int64 {
var maxValue int64
maxValue = 0
maxValue := int64(0)
combinedQueueName := s.metadata.durableName + ":" + s.metadata.queueGroup

for _, subs := range s.channelInfo.Subscriber {
if subs.LastSent > maxValue && subs.QueueName == (s.metadata.durableName+":"+s.metadata.queueGroup) {
if subs.LastSent > maxValue && subs.QueueName == combinedQueueName {
maxValue = subs.LastSent
}
}

return s.channelInfo.MsgCount - maxValue
return s.channelInfo.LastSequence - maxValue
}

func (s *stanScaler) hasPendingMessage() bool {
var hasPending bool
hasPending = false
func (s *stanScaler) hasPendingMessage() bool {
subscriberFound := false
combinedQueueName := s.metadata.durableName + ":" + s.metadata.queueGroup

for _, subs := range s.channelInfo.Subscriber {
if subs.PendingCount > 0 && subs.QueueName == (s.metadata.durableName+":"+s.metadata.queueGroup) {
hasPending = true
if subs.QueueName == combinedQueueName {
subscriberFound = true

if subs.PendingCount > 0 {
return true
}

break
}
}

return hasPending
if !subscriberFound {
stanLog.Info("The STAN subscription was not found.", "combinedQueueName", combinedQueueName)
}

return false
}

func (s *stanScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
Expand Down

0 comments on commit b2b57f3

Please sign in to comment.