-
Notifications
You must be signed in to change notification settings - Fork 769
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #183 from roidelapluie/pth
Implement heartbeat metrics
- Loading branch information
Showing
5 changed files
with
168 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
// Scrape heartbeat data. | ||
|
||
package collector | ||
|
||
import ( | ||
"database/sql" | ||
"fmt" | ||
"strconv" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
) | ||
|
||
const ( | ||
// heartbeat is the Metric subsystem we use. | ||
heartbeat = "heartbeat" | ||
// heartbeatQuery is the query used to fetch the stored and current | ||
// timestamps. %s will be replaced by the database and table name. | ||
// The second column allows gets the server timestamp at the exact same | ||
// time the query is run. | ||
heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(NOW(6)), server_id from `%s`.`%s`" | ||
) | ||
|
||
// Metric descriptors. | ||
var ( | ||
HeartbeatStoredDesc = prometheus.NewDesc( | ||
prometheus.BuildFQName(namespace, heartbeat, "stored_timestamp_seconds"), | ||
"Timestamp stored in the heartbeat table.", | ||
[]string{"server_id"}, nil, | ||
) | ||
HeartbeatNowDesc = prometheus.NewDesc( | ||
prometheus.BuildFQName(namespace, heartbeat, "now_timestamp_seconds"), | ||
"Timestamp of the current server.", | ||
[]string{"server_id"}, nil, | ||
) | ||
) | ||
|
||
// ScrapeHeartbeat scrapes from the heartbeat table. | ||
// This is mainly targeting pt-heartbeat, but will work with any heartbeat | ||
// implementation that writes to a table with two columns: | ||
// CREATE TABLE heartbeat ( | ||
// ts varchar(26) NOT NULL, | ||
// server_id int unsigned NOT NULL PRIMARY KEY, | ||
// ); | ||
func ScrapeHeartbeat(db *sql.DB, ch chan<- prometheus.Metric, collectDatabase, collectTable *string) error { | ||
query := fmt.Sprintf(heartbeatQuery, *collectDatabase, *collectTable) | ||
heartbeatRows, err := db.Query(query) | ||
if err != nil { | ||
return err | ||
} | ||
defer heartbeatRows.Close() | ||
|
||
var ( | ||
now, ts sql.RawBytes | ||
serverId int | ||
) | ||
|
||
for heartbeatRows.Next() { | ||
if err := heartbeatRows.Scan(&ts, &now, &serverId); err != nil { | ||
return err | ||
} | ||
|
||
tsFloatVal, err := strconv.ParseFloat(string(ts), 64) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
nowFloatVal, err := strconv.ParseFloat(string(now), 64) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
serverId := strconv.Itoa(serverId) | ||
|
||
ch <- prometheus.MustNewConstMetric( | ||
HeartbeatNowDesc, | ||
prometheus.GaugeValue, | ||
nowFloatVal, | ||
serverId, | ||
) | ||
ch <- prometheus.MustNewConstMetric( | ||
HeartbeatStoredDesc, | ||
prometheus.GaugeValue, | ||
tsFloatVal, | ||
serverId, | ||
) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package collector | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
dto "github.com/prometheus/client_model/go" | ||
"github.com/smartystreets/goconvey/convey" | ||
"gopkg.in/DATA-DOG/go-sqlmock.v1" | ||
) | ||
|
||
func TestScrapeHeartbeat(t *testing.T) { | ||
db, mock, err := sqlmock.New() | ||
if err != nil { | ||
t.Fatalf("error opening a stub database connection: %s", err) | ||
} | ||
defer db.Close() | ||
|
||
columns := []string{"UNIX_TIMESTAMP(ts)", "UNIX_TIMESTAMP(NOW(6))", "server_id"} | ||
rows := sqlmock.NewRows(columns). | ||
AddRow("1487597613.001320", "1487598113.448042", 1) | ||
mock.ExpectQuery(sanitizeQuery("SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(NOW(6)), server_id from `heartbeat`.`heartbeat`")).WillReturnRows(rows) | ||
|
||
ch := make(chan prometheus.Metric) | ||
go func() { | ||
database := "heartbeat" | ||
table := "heartbeat" | ||
if err = ScrapeHeartbeat(db, ch, &database, &table); err != nil { | ||
t.Errorf("error calling function on test: %s", err) | ||
} | ||
close(ch) | ||
}() | ||
|
||
counterExpected := []MetricResult{ | ||
{labels: labelMap{"server_id": "1"}, value: 1487598113.448042, metricType: dto.MetricType_GAUGE}, | ||
{labels: labelMap{"server_id": "1"}, value: 1487597613.00132, metricType: dto.MetricType_GAUGE}, | ||
} | ||
convey.Convey("Metrics comparison", t, func() { | ||
for _, expect := range counterExpected { | ||
got := readMetric(<-ch) | ||
convey.So(got, convey.ShouldResemble, expect) | ||
} | ||
}) | ||
|
||
// Ensure all SQL queries were executed | ||
if err := mock.ExpectationsWereMet(); err != nil { | ||
t.Errorf("there were unfulfilled expections: %s", err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters