Skip to content

Commit

Permalink
feat(bigtable): Built-in client side metrics (googleapis#10046)
Browse files Browse the repository at this point in the history
  • Loading branch information
bhshkh authored Jul 25, 2024
1 parent cec4f45 commit a747f0a
Show file tree
Hide file tree
Showing 16 changed files with 2,370 additions and 92 deletions.
339 changes: 280 additions & 59 deletions bigtable/bigtable.go

Large diffs are not rendered by default.

19 changes: 11 additions & 8 deletions bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"google.golang.org/grpc"
)

var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}}

func TestPrefix(t *testing.T) {
for _, test := range []struct {
prefix, succ string
Expand Down Expand Up @@ -253,8 +255,9 @@ func TestApplyErrors(t *testing.T) {
ctx := context.Background()
table := &Table{
c: &Client{
project: "P",
instance: "I",
project: "P",
instance: "I",
metricsTracerFactory: &builtinMetricsTracerFactory{},
},
table: "t",
}
Expand Down Expand Up @@ -581,9 +584,9 @@ func TestReadRowsInvalidRowSet(t *testing.T) {
if err := adminClient.CreateTable(ctx, testEnv.config.Table); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
t.Fatalf("NewClientWithConfig failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
Expand Down Expand Up @@ -657,9 +660,9 @@ func TestReadRowsRequestStats(t *testing.T) {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}

client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
t.Fatalf("NewClientWithConfig failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
Expand Down Expand Up @@ -785,9 +788,9 @@ func TestMutateRowsWithAggregates(t *testing.T) {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}

client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
t.Fatalf("NewClientWithConfig failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
Expand Down
2 changes: 1 addition & 1 deletion bigtable/bttest/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func ExampleNewServer() {
log.Fatalln(err)
}

client, err := bigtable.NewClient(ctx, proj, instance, option.WithGRPCConn(conn))
client, err := bigtable.NewClientWithConfig(ctx, proj, instance, bigtable.ClientConfig{MetricsProvider: bigtable.NoopMetricsProvider{}}, option.WithGRPCConn(conn))
if err != nil {
log.Fatalln(err)
}
Expand Down
2 changes: 1 addition & 1 deletion bigtable/conformance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestConformance(t *testing.T) {
t.Fatal(err)
}
defer conn.Close()
c, err := NewClient(ctx, "some-project", "some-instance", option.WithGRPCConn(conn))
c, err := NewClientWithConfig(ctx, "some-project", "some-instance", disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatal(err)
}
Expand Down
24 changes: 21 additions & 3 deletions bigtable/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type IntegrationEnv interface {
// NewInstanceAdminClient will return nil if instance administration is unsupported in this environment
NewInstanceAdminClient() (*InstanceAdminClient, error)
NewClient() (*Client, error)
NewClientWithConfig(ClientConfig) (*Client, error)
Close()
Peer() *peer.Peer
}
Expand Down Expand Up @@ -240,6 +241,15 @@ func (e *EmulatedEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {

// NewClient builds a new connected data client for this environment
func (e *EmulatedEnv) NewClient() (*Client, error) {
return e.newEmulatedClient(ClientConfig{})
}

// NewClient builds a new connected data client with provided config for this environment
func (e *EmulatedEnv) NewClientWithConfig(config ClientConfig) (*Client, error) {
return e.newEmulatedClient(config)
}

func (e *EmulatedEnv) newEmulatedClient(config ClientConfig) (*Client, error) {
o, err := btopt.DefaultClientOptions(e.server.Addr, e.server.Addr, Scope, clientUserAgent)
if err != nil {
return nil, err
Expand All @@ -263,7 +273,7 @@ func (e *EmulatedEnv) NewClient() (*Client, error) {
if err != nil {
return nil, err
}
return NewClient(ctx, e.config.Project, e.config.Instance, option.WithGRPCConn(conn))
return NewClientWithConfig(ctx, e.config.Project, e.config.Instance, config, option.WithGRPCConn(conn))
}

// ProdEnv encapsulates the state necessary to connect to the external Bigtable service
Expand Down Expand Up @@ -334,6 +344,15 @@ func (e *ProdEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {

// NewClient builds a connected data client for this environment
func (e *ProdEnv) NewClient() (*Client, error) {
return e.newProdClient(ClientConfig{})
}

// NewClientWithConfig builds a connected data client with provided config for this environment
func (e *ProdEnv) NewClientWithConfig(config ClientConfig) (*Client, error) {
return e.newProdClient(config)
}

func (e *ProdEnv) newProdClient(config ClientConfig) (*Client, error) {
clientOpts := headersInterceptor.CallOptions()
if endpoint := e.config.DataEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
Expand All @@ -343,6 +362,5 @@ func (e *ProdEnv) NewClient() (*Client, error) {
// For DirectPath tests, we need to add an interceptor to check the peer IP.
clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(e.peerInfo))))
}

return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
return NewClientWithConfig(context.Background(), e.config.Project, e.config.Instance, config, clientOpts...)
}
15 changes: 10 additions & 5 deletions bigtable/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ require (
github.com/google/go-cmp v0.6.0
github.com/googleapis/cloud-bigtable-clients-test v0.0.2
github.com/googleapis/gax-go/v2 v2.13.0
go.opentelemetry.io/otel v1.24.0 // Use older version compatible with Go 1.20
go.opentelemetry.io/otel/metric v1.24.0 // Use older version compatible with Go 1.20
go.opentelemetry.io/otel/sdk v1.24.0 // Use older version compatible with Go 1.20
go.opentelemetry.io/otel/sdk/metric v1.24.0 // Use older version compatible with Go 1.20
google.golang.org/api v0.189.0
google.golang.org/genproto v0.0.0-20240722135656-d784300faade
google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade
Expand All @@ -18,6 +22,12 @@ require (
rsc.io/binaryregexp v0.2.0
)

require (
cloud.google.com/go/monitoring v1.20.1
github.com/google/uuid v1.6.0
google.golang.org/genproto/googleapis/api v0.0.0-20240722135656-d784300faade
)

require (
cel.dev/expr v0.15.0 // indirect
cloud.google.com/go/auth v0.7.2 // indirect
Expand All @@ -34,14 +44,10 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/net v0.27.0 // indirect
Expand All @@ -50,5 +56,4 @@ require (
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240722135656-d784300faade // indirect
)
4 changes: 4 additions & 0 deletions bigtable/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ cloud.google.com/go/iam v1.1.10 h1:ZSAr64oEhQSClwBL670MsJAW5/RLiC6kfw3Bqmd5ZDI=
cloud.google.com/go/iam v1.1.10/go.mod h1:iEgMq62sg8zx446GCaijmA2Miwg5o3UbO+nI47WHJps=
cloud.google.com/go/longrunning v0.5.9 h1:haH9pAuXdPAMqHvzX0zlWQigXT7B0+CL4/2nXXdBo5k=
cloud.google.com/go/longrunning v0.5.9/go.mod h1:HD+0l9/OOW0za6UWdKJtXoFAX/BGg/3Wj8p10NeWF7c=
cloud.google.com/go/monitoring v1.20.1 h1:XmM6uk4+mI2ZhWdI2n/2GNhJdpeQN+1VdG2UWEDhX48=
cloud.google.com/go/monitoring v1.20.1/go.mod h1:FYSe/brgfuaXiEzOQFhTjsEsJv+WePyK71X7Y8qo6uQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=
Expand Down Expand Up @@ -100,6 +102,8 @@ go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGX
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8=
go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
101 changes: 100 additions & 1 deletion bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"cloud.google.com/go/internal/optional"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"github.com/google/go-cmp/cmp"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
Expand All @@ -46,6 +48,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand Down Expand Up @@ -276,6 +279,7 @@ func TestIntegration_ReadRowList(t *testing.T) {
t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
}
}

func TestIntegration_ReadRowListReverse(t *testing.T) {
ctx := context.Background()
_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
Expand Down Expand Up @@ -749,6 +753,101 @@ func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
wg.Wait()
}

func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
ctx := context.Background()

// Reduce sampling period for faster test runs
origSamplePeriod := defaultSamplePeriod
defaultSamplePeriod = time.Minute
defer func() {
defaultSamplePeriod = origSamplePeriod
}()

// record start time
testStartTime := time.Now()
tsListStart := &timestamppb.Timestamp{
Seconds: testStartTime.Unix(),
Nanos: int32(testStartTime.Nanosecond()),
}

testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()

if testing.Short() || !testEnv.Config().UseProd {
t.Skip("Skip long running tests in short mode or non-prod environments")
}

columnFamilyName := "export"
if err := adminClient.CreateColumnFamily(ctx, tableName, columnFamilyName); err != nil {
t.Fatalf("Creating column family: %v", err)
}

for i := 0; i < 10; i++ {
mut := NewMutation()
mut.Set(columnFamilyName, "col", 1000, []byte("test"))
if err := table.Apply(ctx, fmt.Sprintf("row-%v", i), mut); err != nil {
t.Fatalf("Apply: %v", err)
}
}
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
return true
}, RowFilter(ColumnFilter("col")))
if err != nil {
t.Fatalf("ReadRows: %v", err)
}

// Validate that metrics are exported
elapsedTime := time.Since(testStartTime)
if elapsedTime < 2*defaultSamplePeriod {
// Ensure at least 2 datapoints are recorded
time.Sleep(2*defaultSamplePeriod - elapsedTime)
}

// Sleep some more
time.Sleep(30 * time.Second)

monitoringClient, err := monitoring.NewMetricClient(ctx)
if err != nil {
t.Errorf("Failed to create metric client: %v", err)
}
metricNamesValidate := []string{
metricNameOperationLatencies,
metricNameAttemptLatencies,
metricNameServerLatencies,
}

// Try for 5m with 10s sleep between retries
testutil.Retry(t, 10, 30*time.Second, func(r *testutil.R) {
for _, metricName := range metricNamesValidate {
timeListEnd := time.Now()
tsListEnd := &timestamppb.Timestamp{
Seconds: timeListEnd.Unix(),
Nanos: int32(timeListEnd.Nanosecond()),
}

// ListTimeSeries can list only one metric type at a time.
// So, call ListTimeSeries with different metric names
iter := monitoringClient.ListTimeSeries(ctx, &monitoringpb.ListTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", testEnv.Config().Project),
Interval: &monitoringpb.TimeInterval{
StartTime: tsListStart,
EndTime: tsListEnd,
},
Filter: fmt.Sprintf("metric.type = starts_with(\"bigtable.googleapis.com/client/%v\")", metricName),
})

// Assert at least 1 datapoint was exported
_, err := iter.Next()
if err != nil {
r.Errorf("%v not exported\n", metricName)
}
}
})
}

func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
ctx := context.Background()
testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
Expand All @@ -757,7 +856,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
}
defer cleanup()

if !testEnv.Config().UseProd {
if testing.Short() {
t.Skip("Skip long running tests in short mode")
}

Expand Down
3 changes: 2 additions & 1 deletion bigtable/internal/testproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ func (s *goTestProxyServer) CreateClient(ctx context.Context, req *pb.CreateClie
}

config := bigtable.ClientConfig{
AppProfile: req.AppProfileId,
AppProfile: req.AppProfileId,
MetricsProvider: bigtable.NoopMetricsProvider{},
}
c, err := bigtable.NewClientWithConfig(ctx, req.ProjectId, req.InstanceId, config, option.WithGRPCConn(conn))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion bigtable/internal/testproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func populateTable(bts *bttest.Server) error {
}
}

dataClient, err := bigtable.NewClient(ctx, "client", "instance",
dataClient, err := bigtable.NewClientWithConfig(ctx, "client", "instance", bigtable.ClientConfig{MetricsProvider: bigtable.NoopMetricsProvider{}},
option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock()))
if err != nil {
return fmt.Errorf("testproxy setup: can't create Bigtable client: %v", err)
Expand Down
Loading

0 comments on commit a747f0a

Please sign in to comment.