Skip to content

Commit

Permalink
add docappender.blocked.add counter
Browse files Browse the repository at this point in the history
This monotonic counter increments every time Appender.Add
detects that bulkItems channel length and capacity are
equal before sending the created BulkIndexerItem to the
channel.

As bulkItems is a buffered channel, monitoring length and
capacity allow us to detect possible blocking events when
sending to the channel due to exhausted capacity.

As Appender.Add could be called from multiple goroutines,
we are not guaranteed that every time this metric is
incremented the channel send will block, but should give us
a reasonably approximated picture.
  • Loading branch information
endorama committed Dec 10, 2024
1 parent d7a82ed commit 21b4580
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
5 changes: 5 additions & 0 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Appender struct {
availableBulkRequests int64
activeCreated int64
activeDestroyed int64
blockedAdd int64

scalingInfo atomic.Value

Expand Down Expand Up @@ -278,6 +279,10 @@ func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo)
Index: index,
Body: document,
}
if len(a.bulkItems) == cap(a.bulkItems) {
a.addCount(1, &a.blockedAdd, a.metrics.blockedAdd)
}

select {
case <-ctx.Done():
return ctx.Err()
Expand Down
6 changes: 6 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type metrics struct {
availableBulkRequests metric.Int64UpDownCounter
activeCreated metric.Int64Counter
activeDestroyed metric.Int64Counter
blockedAdd metric.Int64Counter
}

type histogramMetric struct {
Expand Down Expand Up @@ -132,6 +133,11 @@ func newMetrics(cfg Config) (metrics, error) {
description: "The number of times an active indexer was destroyed.",
p: &ms.activeDestroyed,
},
{
name: "docappender.blocked.add",
description: "The number of times Add could block due to exhausted capacity in bulkItems channel",
p: &ms.blockedAdd,
},
}
for _, m := range counters {
err := newInt64Counter(meter, m)
Expand Down

0 comments on commit 21b4580

Please sign in to comment.