Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): first_response_latencies and connectivity_error_count metrics #10616

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ import (
_ "google.golang.org/grpc/balancer/rls"
)

const prodAddr = "bigtable.googleapis.com:443"
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"
const featureFlagsHeaderKey = "bigtable-features"
const (
prodAddr = "bigtable.googleapis.com:443"
mtlsProdAddr = "bigtable.mtls.googleapis.com:443"
featureFlagsHeaderKey = "bigtable-features"
methodNameReadRows = "ReadRows"
)

// Client is a client for reading and writing data to tables in an instance.
//
Expand Down Expand Up @@ -375,7 +378,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
var prevRowKey string
attrMap := make(map[string]interface{})
err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err = gaxInvokeWithRecorder(ctx, mt, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
req := &btpb.ReadRowsRequest{
AppProfileId: t.c.appProfile,
}
Expand Down Expand Up @@ -421,6 +424,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
for {
proto.Reset(res)
err := stream.RecvMsg(res)
mt.currOp.setFirstRespTime(time.Now())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when multiple responses are received?

what happens when there are no rows returned?

is the behavior consistent with java?

Does this work across retries?

Copy link
Contributor Author

@bhshkh bhshkh Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when multiple responses are received?
Updated the code to set only on the first response.

if err == io.EOF {
*trailerMD = stream.Trailer()
break
Expand Down Expand Up @@ -1577,6 +1581,13 @@ func recordOperationCompletion(mt *builtinMetricsTracer) {
// graph will be less confusing
mt.instrumentRetryCount.Add(mt.ctx, mt.currOp.attemptCount-1, metric.WithAttributes(retryCntAttrs...))
}

// Record first_reponse_latencies
firstRespLatAttrs, _ := mt.toOtelMetricAttrs(metricNameRetryCount)
if mt.method == methodNameReadRows {
elapsedTimeMs = convertToMs(mt.currOp.firstRespTime.Sub(mt.currOp.startTime))
mt.instrumentFirstRespLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(firstRespLatAttrs...))
}
}

// gaxInvokeWithRecorder:
Expand Down Expand Up @@ -1609,7 +1620,8 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method

// Get location attributes from metadata and set it in tracer
// Ignore get location error since the metric can still be recorded with rest of the attributes
clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD)
clusterID, zoneID, locationErr := extractLocation(attemptHeaderMD, attempTrailerMD)
mt.currOp.currAttempt.setLocationErr(locationErr)
mt.currOp.currAttempt.setClusterID(clusterID)
mt.currOp.currAttempt.setZoneID(zoneID)

Expand Down Expand Up @@ -1640,9 +1652,15 @@ func recordAttemptCompletion(mt *builtinMetricsTracer) {
attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies)
mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributes(attemptLatAttrs...))

// Record server_latencies
// Record server_latencies and connectivity_error_count
connErrCountAttrs, _ := mt.toOtelMetricAttrs(metricNameConnErrCount)
serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies)
if mt.currOp.currAttempt.serverLatencyErr == nil {
mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributes(serverLatAttrs...))
}
if mt.currOp.currAttempt.serverLatencyErr == nil && mt.currOp.currAttempt.locationErr == nil {
mt.instrumentConnErrCount.Add(mt.ctx, 0, metric.WithAttributes(connErrCountAttrs...))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need to add a zero?

Copy link
Contributor Author

@bhshkh bhshkh Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {
mt.instrumentConnErrCount.Add(mt.ctx, 1, metric.WithAttributes(connErrCountAttrs...))
}
}
15 changes: 10 additions & 5 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,11 +912,14 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
t.Fatalf("Apply: %v", err)
}
}
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
return true
}, RowFilter(ColumnFilter("col")))
if err != nil {
t.Fatalf("ReadRows: %v", err)

for i := 0; i < 10; i++ {
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
return true
}, RowFilter(ColumnFilter("col")))
if err != nil {
t.Fatalf("ReadRows: %v", err)
}
}

// Validate that metrics are exported
Expand All @@ -937,6 +940,8 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
metricNameOperationLatencies,
metricNameAttemptLatencies,
metricNameServerLatencies,
metricNameFirstRespLatencies,
metricNameConnErrCount,
}

// Try for 5m with 10s sleep between retries
Expand Down
2 changes: 2 additions & 0 deletions bigtable/metric_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func extractServerLatency(headerMD metadata.MD, trailerMD metadata.MD) (float64,
}

// Obtain cluster and zone from response metadata
// Check both headers and trailers because in different environments the metadata could
// be returned in headers or trailers
func extractLocation(headerMD metadata.MD, trailerMD metadata.MD) (string, string, error) {
var locationMetadata []string

Expand Down
61 changes: 59 additions & 2 deletions bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"log"
"os"
"sync"
"time"

"cloud.google.com/go/bigtable/internal"
Expand Down Expand Up @@ -59,7 +60,9 @@ const (
metricNameOperationLatencies = "operation_latencies"
metricNameAttemptLatencies = "attempt_latencies"
metricNameServerLatencies = "server_latencies"
metricNameFirstRespLatencies = "first_response_latencies"
metricNameRetryCount = "retry_count"
metricNameConnErrCount = "connectivity_error_count"

// Metric units
metricUnitMS = "ms"
Expand Down Expand Up @@ -102,12 +105,24 @@ var (
},
recordedPerAttempt: true,
},
metricNameFirstRespLatencies: {
additionalAttrs: []string{
metricLabelKeyStatus,
},
recordedPerAttempt: false,
},
metricNameRetryCount: {
additionalAttrs: []string{
metricLabelKeyStatus,
},
recordedPerAttempt: true,
},
metricNameConnErrCount: {
additionalAttrs: []string{
metricLabelKeyStatus,
},
recordedPerAttempt: true,
},
}

// Generates unique client ID in the format go-<random UUID>@<hostname>
Expand Down Expand Up @@ -141,7 +156,10 @@ type builtinMetricsTracerFactory struct {
operationLatencies metric.Float64Histogram
serverLatencies metric.Float64Histogram
attemptLatencies metric.Float64Histogram
retryCount metric.Int64Counter
firstRespLatencies metric.Float64Histogram

retryCount metric.Int64Counter
connErrCount metric.Int64Counter
}

func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider) (*builtinMetricsTracerFactory, error) {
Expand Down Expand Up @@ -240,12 +258,30 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
return err
}

// Create first_response_latencies
tf.firstRespLatencies, err = meter.Float64Histogram(
metricNameFirstRespLatencies,
metric.WithDescription("Latency from operation start until the response headers were received. The publishing of the measurement will be delayed until the attempt response has been received."),
metric.WithUnit(metricUnitMS),
metric.WithExplicitBucketBoundaries(bucketBounds...),
)
if err != nil {
return err
}

// Create retry_count
tf.retryCount, err = meter.Int64Counter(
metricNameRetryCount,
metric.WithDescription("The number of additional RPCs sent after the initial attempt."),
metric.WithUnit(metricUnitCount),
)

// Create connectivity_error_count
tf.connErrCount, err = meter.Int64Counter(
metricNameConnErrCount,
metric.WithDescription("Number of requests that failed to reach the Google datacenter. (Requests without google response headers"),
metric.WithUnit(metricUnitCount),
)
return err
}

Expand All @@ -263,7 +299,9 @@ type builtinMetricsTracer struct {
instrumentOperationLatencies metric.Float64Histogram
instrumentServerLatencies metric.Float64Histogram
instrumentAttemptLatencies metric.Float64Histogram
instrumentFirstRespLatencies metric.Float64Histogram
instrumentRetryCount metric.Int64Counter
instrumentConnErrCount metric.Int64Counter

tableName string
method string
Expand All @@ -280,6 +318,10 @@ type opTracer struct {

startTime time.Time

// Only for ReadRows. Time when the response headers are received in a streaming RPC.
firstRespTime time.Time
firstRespTimeOnce sync.Once

// gRPC status code of last completed attempt
status string

Expand All @@ -290,6 +332,12 @@ func (o *opTracer) setStartTime(t time.Time) {
o.startTime = t
}

func (o *opTracer) setFirstRespTime(t time.Time) {
o.firstRespTimeOnce.Do(func() {
o.firstRespTime = t
})
}

func (o *opTracer) setStatus(status string) {
o.status = status
}
Expand All @@ -311,8 +359,11 @@ type attemptTracer struct {
// Server latency in ms
serverLatency float64

// Error seen while getting server latency from headers
// Error seen while getting server latency from headers / trailers
serverLatencyErr error

// Error seen while getting location (cluster and zone) from headers / trailers
locationErr error
}

func (a *attemptTracer) setStartTime(t time.Time) {
Expand All @@ -327,6 +378,10 @@ func (a *attemptTracer) setZoneID(zoneID string) {
a.zoneID = zoneID
}

func (a *attemptTracer) setLocationErr(err error) {
a.locationErr = err
}

func (a *attemptTracer) setStatus(status string) {
a.status = status
}
Expand Down Expand Up @@ -355,7 +410,9 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
instrumentOperationLatencies: tf.operationLatencies,
instrumentServerLatencies: tf.serverLatencies,
instrumentAttemptLatencies: tf.attemptLatencies,
instrumentFirstRespLatencies: tf.firstRespLatencies,
instrumentRetryCount: tf.retryCount,
instrumentConnErrCount: tf.connErrCount,

tableName: tableName,
isStreaming: isStreaming,
Expand Down
10 changes: 7 additions & 3 deletions bigtable/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
attribute.String(metricLabelKeyClientUID, clientUID),
attribute.String(metricLabelKeyClientName, clientName),
}
wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies}

wantMetricNames := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameConnErrCount, metricNameConnErrCount, metricNameFirstRespLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies}
wantMetricTypesGCM := []string{}
for _, wantMetricName := range wantMetricNamesStdout {
for _, wantMetricName := range wantMetricNames {
wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName)
}
sort.Strings(wantMetricTypesGCM)

// Reduce sampling period to reduce test run time
origSamplePeriod := defaultSamplePeriod
Expand Down Expand Up @@ -202,6 +204,8 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
gotNonNilInstruments := gotClient.metricsTracerFactory.operationLatencies != nil &&
gotClient.metricsTracerFactory.serverLatencies != nil &&
gotClient.metricsTracerFactory.attemptLatencies != nil &&
gotClient.metricsTracerFactory.firstRespLatencies != nil &&
gotClient.metricsTracerFactory.connErrCount != nil &&
gotClient.metricsTracerFactory.retryCount != nil
if test.wantBuiltinEnabled != gotNonNilInstruments {
t.Errorf("NonNilInstruments: got: %v, want: %v", gotNonNilInstruments, test.wantBuiltinEnabled)
Expand Down Expand Up @@ -261,7 +265,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
}
sort.Strings(gotMetricTypes)
if !testutil.Equal(gotMetricTypes, wantMetricTypesGCM) {
t.Errorf("Metric types missing in req. got: %v, want: %v", gotMetricTypes, wantMetricTypesGCM)
t.Errorf("Metric types missing in req. \ngot: %v, \nwant: %v\ndiff: %v", gotMetricTypes, wantMetricTypesGCM, testutil.Diff(gotMetricTypes, wantMetricTypesGCM))
}
}

Expand Down
Loading