Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/services/vrf/listener_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (lsn *listenerV1) Start() error {
func (lsn *listenerV1) extractConfirmedLogs() []request {
lsn.reqsMu.Lock()
defer lsn.reqsMu.Unlock()
updateQueueSize(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v1, len(lsn.reqs))
var toProcess, toKeep []request
for i := 0; i < len(lsn.reqs); i++ {
if lsn.reqs[i].confirmedAtBlock <= lsn.getLatestHead() {
Expand Down Expand Up @@ -311,6 +312,7 @@ func (lsn *listenerV1) getConfirmedAt(req *solidity_vrf_coordinator_interface.VR
"blockHash", req.Raw.BlockHash,
"reqID", hex.EncodeToString(req.RequestID[:]),
"newConfs", newConfs)
incDupeReqs(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v1)
}
return req.Raw.BlockNumber + newConfs
}
Expand Down Expand Up @@ -397,6 +399,7 @@ func (lsn *listenerV1) ProcessRequest(req *solidity_vrf_coordinator_interface.VR
"keyHash", hex.EncodeToString(req.KeyHash[:]),
"reqTxHash", req.Raw.TxHash,
)
incProcessedReqs(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v1)
}
}
}
Expand All @@ -415,6 +418,7 @@ func (lsn *listenerV1) HandleLog(lb log.Broadcast) {
wasOverCapacity := lsn.reqLogs.Deliver(lb)
if wasOverCapacity {
lsn.l.Error("l mailbox is over capacity - dropped the oldest l")
incDroppedReqs(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v1, reasonMailboxSize)
}
}

Expand Down
11 changes: 8 additions & 3 deletions core/services/vrf/listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (lsn *listenerV2) getLatestHead() uint64 {
func (lsn *listenerV2) getAndRemoveConfirmedLogsBySub(latestHead uint64) map[uint64][]pendingRequest {
lsn.reqsMu.Lock()
defer lsn.reqsMu.Unlock()
updateQueueSize(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v2, uniqueReqs(lsn.reqs))
var toProcess = make(map[uint64][]pendingRequest)
var toKeep []pendingRequest
for i := 0; i < len(lsn.reqs); i++ {
Expand Down Expand Up @@ -330,6 +331,7 @@ func (lsn *listenerV2) processRequestsPerSub(
rlog.Infow("Request too old, dropping it")
lsn.markLogAsConsumed(req.lb)
processed[vrfRequest.RequestId.String()] = struct{}{}
incDroppedReqs(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v2, reasonAge)
continue
}

Expand Down Expand Up @@ -395,6 +397,7 @@ func (lsn *listenerV2) processRequestsPerSub(
// And loop to attempt to enqueue another fulfillment
startBalanceNoReserveLink = startBalanceNoReserveLink.Sub(startBalanceNoReserveLink, maxLink)
processed[vrfRequest.RequestId.String()] = struct{}{}
incProcessedReqs(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v2)
}
// Remove all the confirmed logs
var toKeep []pendingRequest
Expand All @@ -412,7 +415,7 @@ func (lsn *listenerV2) processRequestsPerSub(
"total reqs", len(reqs),
"total processed", len(processed),
"total remaining", len(toKeep),
"total unique", len(toRequestSet(reqs)),
"total unique", uniqueReqs(reqs),
)
}

Expand Down Expand Up @@ -576,6 +579,7 @@ func (lsn *listenerV2) getConfirmedAt(req *vrf_coordinator_v2.VRFCoordinatorV2Ra
"blockHash", req.Raw.BlockHash,
"reqID", req.RequestId.String(),
"newConfs", newConfs)
incDupeReqs(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v2)
}
return req.Raw.BlockNumber + newConfs
}
Expand Down Expand Up @@ -647,6 +651,7 @@ func (lsn *listenerV2) HandleLog(lb log.Broadcast) {
wasOverCapacity := lsn.reqLogs.Deliver(lb)
if wasOverCapacity {
lsn.l.Error("Log mailbox is over capacity - dropped the oldest log")
incDroppedReqs(lsn.job.Name.ValueOrZero(), lsn.job.ExternalJobID, v2, reasonMailboxSize)
}
}

Expand All @@ -655,12 +660,12 @@ func (lsn *listenerV2) JobID() int32 {
return lsn.job.ID
}

func toRequestSet(reqs []pendingRequest) map[string]struct{} {
func uniqueReqs(reqs []pendingRequest) int {
s := map[string]struct{}{}
for _, r := range reqs {
s[r.req.RequestId.String()] = struct{}{}
}
return s
return len(s)
}

// GasProofVerification is an upper limit on the gas used for verifying the VRF proof on-chain.
Expand Down
67 changes: 67 additions & 0 deletions core/services/vrf/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package vrf

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
uuid "github.com/satori/go.uuid"
)

// version describes a VRF version.
type version string

const (
v1 version = "v1"
v2 version = "v2"
)

// dropReason describes a reason why a VRF request is dropped from the queue.
type dropReason string

const (
// reasonMailboxSize describes when a VRF request is dropped due to the log mailbox being
// over capacity.
reasonMailboxSize dropReason = "mailbox_size"

// reasonAge describes when a VRF request is dropped due to its age.
reasonAge dropReason = "age"
)

var (
metricQueueSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "vrf_request_queue_size",
Help: "The number of VRF requests currently in the in-memory queue.",
}, []string{"job_name", "external_job_id", "vrf_version"})

metricProcessedReqs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "vrf_processed_request_count",
Help: "The number of VRF requests processed.",
}, []string{"job_name", "external_job_id", "vrf_version"})

metricDroppedRequests = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "vrf_dropped_request_count",
Help: "The number of VRF requests dropped due to reasons such as expiry or mailbox size.",
}, []string{"job_name", "external_job_id", "vrf_version", "drop_reason"})

metricDupeRequests = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "vrf_duplicate_requests",
Help: "The number of times the VRF listener receives duplicate requests, which could indicate a reorg.",
}, []string{"job_name", "external_job_id", "vrf_version"})
)

func updateQueueSize(jobName string, extJobID uuid.UUID, vrfVersion version, size int) {
metricQueueSize.WithLabelValues(jobName, extJobID.String(), string(vrfVersion)).
Set(float64(size))
}

func incProcessedReqs(jobName string, extJobID uuid.UUID, vrfVersion version) {
metricProcessedReqs.WithLabelValues(jobName, extJobID.String(), string(vrfVersion)).Inc()
}

func incDroppedReqs(jobName string, extJobID uuid.UUID, vrfVersion version, reason dropReason) {
metricDroppedRequests.WithLabelValues(
jobName, extJobID.String(), string(vrfVersion), string(reason)).Inc()
}

func incDupeReqs(jobName string, extJobID uuid.UUID, vrfVersion version) {
metricDupeRequests.WithLabelValues(jobName, extJobID.String(), string(vrfVersion)).Inc()
}