Skip to content

Commit 7c9493f

Browse files
committed
Skip nulls and log a message
Signed-off-by: Felix Yuan <felix.yuan@reddit.com>
1 parent f727569 commit 7c9493f

File tree

2 files changed

+77
-144
lines changed

2 files changed

+77
-144
lines changed

collector/pg_stat_walreceiver.go

+77-61
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"fmt"
1919

2020
"github.com/go-kit/log"
21+
"github.com/go-kit/log/level"
2122
"github.com/prometheus/client_golang/prometheus"
2223
)
2324

@@ -158,132 +159,147 @@ func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *insta
158159
}
159160
}
160161
if !upstreamHost.Valid {
162+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream host is null")
161163
continue
162164
}
163165

164166
if !slotName.Valid {
167+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because slotname host is null")
165168
continue
166169
}
167170
labels := []string{upstreamHost.String, slotName.String}
171+
if !status.Valid {
172+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because status is null")
173+
continue
174+
}
168175

169-
statusMetric := -3.0
170-
if status.Valid {
171-
switch status.String {
172-
case "stopped":
173-
statusMetric = 0.0
174-
break
175-
case "starting":
176-
statusMetric = 1.0
177-
break
178-
case "streaming":
179-
statusMetric = 2.0
180-
break
181-
case "waiting":
182-
statusMetric = 3.0
183-
break
184-
case "restarting":
185-
statusMetric = 4.0
186-
break
187-
case "stopping":
188-
statusMetric = -1.0
189-
break
190-
default:
191-
statusMetric = -2.0
192-
break
193-
}
176+
var statusMetric float64
177+
switch status.String {
178+
case "stopped":
179+
statusMetric = 0.0
180+
break
181+
case "starting":
182+
statusMetric = 1.0
183+
break
184+
case "streaming":
185+
statusMetric = 2.0
186+
break
187+
case "waiting":
188+
statusMetric = 3.0
189+
break
190+
case "restarting":
191+
statusMetric = 4.0
192+
break
193+
case "stopping":
194+
statusMetric = -1.0
195+
break
196+
default:
197+
statusMetric = -2.0
198+
break
199+
}
200+
201+
if !receiveStartLsn.Valid {
202+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_lsn is null")
203+
continue
204+
}
205+
if !receiveStartTli.Valid {
206+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_tli is null")
207+
continue
208+
}
209+
if hasFlushedLSN && !flushedLsn.Valid {
210+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because flushed_lsn is null")
211+
continue
212+
}
213+
if !receivedTli.Valid {
214+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because received_tli is null")
215+
continue
216+
}
217+
if !lastMsgSendTime.Valid {
218+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_send_time is null")
219+
continue
220+
}
221+
if !lastMsgReceiptTime.Valid {
222+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_receipt_time is null")
223+
continue
194224
}
225+
if !latestEndLsn.Valid {
226+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_lsn is null")
227+
continue
228+
}
229+
if !latestEndTime.Valid {
230+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_time is null")
231+
continue
232+
}
233+
if !upstreamNode.Valid {
234+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null")
235+
continue
236+
}
237+
238+
receiveStartLsnMetric := float64(receiveStartLsn.Int64)
195239
ch <- prometheus.MustNewConstMetric(
196240
statWalReceiverStatus,
197241
prometheus.GaugeValue,
198242
statusMetric,
199243
labels...)
200244

201-
receiveStartLsnMetric := 0.0
202-
if receiveStartLsn.Valid {
203-
receiveStartLsnMetric = float64(receiveStartLsn.Int64)
204-
}
205245
ch <- prometheus.MustNewConstMetric(
206246
statWalReceiverReceiveStartLsn,
207247
prometheus.CounterValue,
208248
receiveStartLsnMetric,
209249
labels...)
210250

211-
receiveStartTliMetric := 0.0
212-
if receiveStartTli.Valid {
213-
receiveStartTliMetric = float64(receiveStartTli.Int64)
214-
}
251+
receiveStartTliMetric := float64(receiveStartTli.Int64)
215252
ch <- prometheus.MustNewConstMetric(
216253
statWalReceiverReceiveStartTli,
217254
prometheus.GaugeValue,
218255
receiveStartTliMetric,
219256
labels...)
220257

221258
if hasFlushedLSN {
222-
flushedLsnMetric := 0.0
223-
if flushedLsn.Valid {
224-
flushedLsnMetric = float64(flushedLsn.Int64)
225-
}
259+
flushedLsnMetric := float64(flushedLsn.Int64)
226260
ch <- prometheus.MustNewConstMetric(
227261
statWalReceiverFlushedLSN,
228262
prometheus.CounterValue,
229263
flushedLsnMetric,
230264
labels...)
231265
}
232266

233-
receivedTliMetric := 0.0
234-
if receivedTli.Valid {
235-
receivedTliMetric = float64(receivedTli.Int64)
236-
}
267+
receivedTliMetric := float64(receivedTli.Int64)
237268
ch <- prometheus.MustNewConstMetric(
238269
statWalReceiverReceivedTli,
239270
prometheus.GaugeValue,
240271
receivedTliMetric,
241272
labels...)
242273

243-
lastMsgSendTimeMetric := 0.0
244-
if lastMsgSendTime.Valid {
245-
lastMsgSendTimeMetric = float64(lastMsgSendTime.Float64)
246-
}
274+
lastMsgSendTimeMetric := float64(lastMsgSendTime.Float64)
247275
ch <- prometheus.MustNewConstMetric(
248276
statWalReceiverLastMsgSendTime,
249277
prometheus.CounterValue,
250278
lastMsgSendTimeMetric,
251279
labels...)
252280

253-
lastMsgReceiptTimeMetric := 0.0
254-
if lastMsgReceiptTime.Valid {
255-
lastMsgReceiptTimeMetric = float64(lastMsgReceiptTime.Float64)
256-
}
281+
lastMsgReceiptTimeMetric := float64(lastMsgReceiptTime.Float64)
257282
ch <- prometheus.MustNewConstMetric(
258283
statWalReceiverLastMsgReceiptTime,
259284
prometheus.CounterValue,
260285
lastMsgReceiptTimeMetric,
261286
labels...)
262287

263-
latestEndLsnMetric := 0.0
264-
if latestEndLsn.Valid {
265-
latestEndLsnMetric = float64(latestEndLsn.Int64)
266-
}
288+
latestEndLsnMetric := float64(latestEndLsn.Int64)
267289
ch <- prometheus.MustNewConstMetric(
268290
statWalReceiverLatestEndLsn,
269291
prometheus.CounterValue,
270292
latestEndLsnMetric,
271293
labels...)
272294

273-
latestEndTimeMetric := 0.0
274-
if latestEndTime.Valid {
275-
latestEndTimeMetric = float64(latestEndTime.Float64)
276-
}
295+
latestEndTimeMetric := float64(latestEndTime.Float64)
277296
ch <- prometheus.MustNewConstMetric(
278297
statWalReceiverLatestEndTime,
279298
prometheus.CounterValue,
280299
latestEndTimeMetric,
281300
labels...)
282301

283-
upstreamNodeMetric := 0.0
284-
if upstreamNode.Valid {
285-
upstreamNodeMetric = float64(upstreamNode.Int64)
286-
}
302+
upstreamNodeMetric := float64(upstreamNode.Int64)
287303
ch <- prometheus.MustNewConstMetric(
288304
statWalReceiverUpstreamNode,
289305
prometheus.GaugeValue,

collector/pg_stat_walreceiver_test.go

-83
Original file line numberDiff line numberDiff line change
@@ -186,86 +186,3 @@ func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) {
186186
}
187187

188188
}
189-
190-
func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) {
191-
db, mock, err := sqlmock.New()
192-
if err != nil {
193-
t.Fatalf("Error opening a stub db connection: %s", err)
194-
}
195-
defer db.Close()
196-
197-
inst := &instance{db: db}
198-
infoSchemaColumns := []string{
199-
"column_name",
200-
}
201-
202-
infoSchemaRows := sqlmock.NewRows(infoSchemaColumns).
203-
AddRow(
204-
"flushed_lsn",
205-
)
206-
207-
mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows)
208-
209-
columns := []string{
210-
"upstream_host",
211-
"slot_name",
212-
"status",
213-
"receive_start_lsn",
214-
"receive_start_tli",
215-
"flushed_lsn",
216-
"received_tli",
217-
"last_msg_send_time",
218-
"last_msg_receipt_time",
219-
"latest_end_lsn",
220-
"latest_end_time",
221-
"upstream_node",
222-
}
223-
rows := sqlmock.NewRows(columns).
224-
AddRow(
225-
"foo",
226-
"bar",
227-
nil,
228-
nil,
229-
nil,
230-
nil,
231-
nil,
232-
nil,
233-
nil,
234-
nil,
235-
nil,
236-
nil,
237-
)
238-
mock.ExpectQuery(sanitizeQuery(queryWithFlushedLSN)).WillReturnRows(rows)
239-
240-
ch := make(chan prometheus.Metric)
241-
go func() {
242-
defer close(ch)
243-
c := PGStatWalReceiverCollector{}
244-
245-
if err := c.Update(context.Background(), inst, ch); err != nil {
246-
t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err)
247-
}
248-
}()
249-
expected := []MetricResult{
250-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: -3.0, metricType: dto.MetricType_GAUGE},
251-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER},
252-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE},
253-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER},
254-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE},
255-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER},
256-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER},
257-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER},
258-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_COUNTER},
259-
{labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 0, metricType: dto.MetricType_GAUGE},
260-
}
261-
convey.Convey("Metrics comparison", t, func() {
262-
for _, expect := range expected {
263-
m := readMetric(<-ch)
264-
convey.So(expect, convey.ShouldResemble, m)
265-
}
266-
})
267-
if err := mock.ExpectationsWereMet(); err != nil {
268-
t.Errorf("there were unfulfilled exceptions: %s", err)
269-
}
270-
271-
}

0 commit comments

Comments
 (0)