Skip to content

Commit

Permalink
feat: return metric with an explicit timestamp from the configured co…
Browse files Browse the repository at this point in the history
…lumn (#429)

* feat: collect timestamp from the column
* docs: add info on the new field
  • Loading branch information
burningalchemist authored Jan 11, 2024
1 parent 711f5f1 commit d0b1974
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 1 deletion.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ metrics:
# Static metric value (optional). Useful in case we are interested in string data (key_labels) only. It's mutually
# exclusive with `values` field.
# static_value: 1
# Timestamp value (optional). Should point at the existing column containing valid timestamps to return a metric
# with an explicit timestamp.
# timestamp_value: CreatedAt
query: |
SELECT Market, max(UpdateTime) AS LastUpdateTime
FROM MarketPrices
Expand Down
1 change: 1 addition & 0 deletions config/metric_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type MetricConfig struct {

NoPreparedStatement bool `yaml:"no_prepared_statement,omitempty"` // do not prepare statement
StaticValue *float64 `yaml:"static_value,omitempty"`
TimestampValue string `yaml:"timestamp_value,omitempty"` // optional column name containing a valid timestamp value

valueType prometheus.ValueType // TypeString converted to prometheus.ValueType
query *QueryConfig // QueryConfig resolved from QueryRef or generated from Query
Expand Down
3 changes: 3 additions & 0 deletions documentation/sql_exporter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ collectors:
# Arbitrary key/value pair
env: dev
region: europe
# Optional timestamp_value to point at the existing timestamp column to return a metric with an explicit
# timestamp.
# timestamp_value: CreatedAt
# This query returns exactly one value per row, in the `counter` column.
values: [counter]
query: |
Expand Down
26 changes: 25 additions & 1 deletion metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"sort"
"time"

"github.com/burningalchemist/sql_exporter/config"
"github.com/burningalchemist/sql_exporter/errors"
Expand Down Expand Up @@ -85,7 +86,15 @@ func (mf MetricFamily) Collect(row map[string]any, ch chan<- Metric) {
}
value := row[v].(sql.NullFloat64)
if value.Valid {
ch <- NewMetric(&mf, value.Float64, labelValues...)
metric := NewMetric(&mf, value.Float64, labelValues...)
if mf.config.TimestampValue == "" {
ch <- metric
} else {
ts := row[mf.config.TimestampValue].(sql.NullTime)
if ts.Valid {
ch <- NewMetricWithTimestamp(ts.Time, metric)
}
}
}
}
if mf.config.StaticValue != nil {
Expand Down Expand Up @@ -286,3 +295,18 @@ func NewInvalidMetric(err errors.WithContext) Metric {
func (m invalidMetric) Desc() MetricDesc { return nil }

func (m invalidMetric) Write(*dto.Metric) errors.WithContext { return m.err }

type timestampedMetric struct {
Metric
t time.Time
}

func (m timestampedMetric) Write(pb *dto.Metric) errors.WithContext {
e := m.Metric.Write(pb)
pb.TimestampMs = proto.Int64(m.t.Unix()*1000 + int64(m.t.Nanosecond()/1000000))
return e
}

func NewMetricWithTimestamp(t time.Time, m Metric) Metric {
return timestampedMetric{Metric: m, t: t}
}
12 changes: 12 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
const (
columnTypeKey columnType = 1
columnTypeValue columnType = 2
columnTypeTime columnType = 3
)

// NewQuery returns a new Query that will populate the given metric families.
Expand All @@ -49,6 +50,9 @@ func NewQuery(logContext string, qc *config.QueryConfig, metricFamilies ...*Metr
return nil, err
}
}
if err := setColumnType(logContext, mf.config.TimestampValue, columnTypeTime, columnTypes); err != nil {
return nil, err
}
}

q := Query{
Expand Down Expand Up @@ -153,6 +157,9 @@ func (q *Query) scanDest(rows *sql.Rows) ([]any, errors.WithContext) {
case columnTypeValue:
dest = append(dest, new(sql.NullFloat64))
have[column] = true
case columnTypeTime:
dest = append(dest, new(sql.NullTime))
have[column] = true
default:
if column == "" {
klog.Infof("[%s] Unnamed column %d returned by query", q.logContext, i)
Expand Down Expand Up @@ -199,6 +206,11 @@ func (q *Query) scanRow(rows *sql.Rows, dest []any) (map[string]any, errors.With
klog.V(3).Infof("[%s] Key column %q is NULL", q.logContext, column)
}
result[column] = *dest[i].(*sql.NullString)
case columnTypeTime:
if !dest[i].(*sql.NullTime).Valid {
klog.V(3).Infof("[%s] Time column %q is invalid or NULL", q.logContext, column)
}
result[column] = *dest[i].(*sql.NullTime)
case columnTypeValue:
if !dest[i].(*sql.NullFloat64).Valid {
klog.V(3).Infof("[%s] Value column %q is NULL", q.logContext, column)
Expand Down

0 comments on commit d0b1974

Please sign in to comment.