Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(bigtable): reduce latency #11032

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 81 additions & 34 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Client struct {
project, instance string
appProfile string
metricsTracerFactory *builtinMetricsTracerFactory
workers *pool
}

// ClientConfig has configurations for the client.
Expand Down Expand Up @@ -120,7 +121,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
metricsProvider := config.MetricsProvider
if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit metrics when emulator is being used
metricsProvider = NoopMetricsProvider{}
// metricsProvider = NoopMetricsProvider{}
}

// Create a OpenTelemetry metrics configuration
Expand All @@ -136,6 +137,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
instance: instance,
appProfile: config.AppProfile,
metricsTracerFactory: metricsTracerFactory,
workers: newPool(defaultPoolSize),
}, nil
}

Expand Down Expand Up @@ -363,18 +365,26 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
defer func() { trace.EndSpan(ctx, err) }()

mt := t.newBuiltinMetricsTracer(ctx, true)
defer recordOperationCompletion(mt)
// defer recordOperationCompletion(mt)
t.c.workers.addJob(&job{
mt: *mt,
handler: recordOperationCompletion,
})

err = t.readRows(ctx, arg, f, mt, opts...)
mt.currOp.setEndTime(time.Now())
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.currOp.setStatus(statusCode.String())

fmt.Println(time.Now(), " Returning response to user")
return statusErr
}

func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
var prevRowKey string
attrMap := make(map[string]interface{})
err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
var stream btpb.Bigtable_ReadRowsClient
err = t.c.gaxInvokeWithRecorder(ctx, mt, "ReadRows", &stream, func(ctx context.Context, headerMD, _ *metadata.MD, streamAddr *btpb.Bigtable_ReadRowsClient, _ gax.CallSettings) error {
req := &btpb.ReadRowsRequest{
AppProfileId: t.c.appProfile,
}
Expand All @@ -401,7 +411,8 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
defer cancel()

startTime := time.Now()
stream, err := t.c.client.ReadRows(ctx, req)
stream, err = t.c.client.ReadRows(ctx, req)
streamAddr = &stream
if err != nil {
return err
}
Expand All @@ -421,11 +432,13 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
proto.Reset(res)
err := stream.RecvMsg(res)
if err == io.EOF {
*trailerMD = stream.Trailer()
// Waiting for trailers
// *trailerMD = stream.Trailer()
break
}
if err != nil {
*trailerMD = stream.Trailer()
// Waiting for trailers
// *trailerMD = stream.Trailer()
// Reset arg for next Invoke call.
if arg == nil {
// Should be lowest possible key value, an empty byte array
Expand Down Expand Up @@ -462,7 +475,8 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
for {
proto.Reset(res)
if err := stream.RecvMsg(res); err != nil {
*trailerMD = stream.Trailer()
// Waiting for trailers
// *trailerMD = stream.Trailer()
// The stream has ended. We don't return an error
// because the caller has intentionally interrupted the scan.
return nil
Expand Down Expand Up @@ -1005,7 +1019,7 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
defer func() { trace.EndSpan(ctx, err) }()
mt := t.newBuiltinMetricsTracer(ctx, false)
defer recordOperationCompletion(mt)
defer recordOperationCompletion(mt, nil, nil)

err = t.apply(ctx, mt, row, m, opts...)
statusCode, statusErr := convertToGrpcStatusErr(err)
Expand Down Expand Up @@ -1036,7 +1050,7 @@ func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string,
callOptions = retryOptions
}
var res *btpb.MutateRowResponse
err := gaxInvokeWithRecorder(ctx, mt, "MutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err := t.c.gaxInvokeWithRecorder(ctx, mt, "MutateRow", nil, func(ctx context.Context, headerMD, trailerMD *metadata.MD, streamAddr *btpb.Bigtable_ReadRowsClient, _ gax.CallSettings) error {
var err error
res, err = t.c.client.MutateRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
return err
Expand Down Expand Up @@ -1073,7 +1087,7 @@ func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string,
callOptions = retryOptions
}
var cmRes *btpb.CheckAndMutateRowResponse
err = gaxInvokeWithRecorder(ctx, mt, "CheckAndMutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err = t.c.gaxInvokeWithRecorder(ctx, mt, "CheckAndMutateRow", nil, func(ctx context.Context, headerMD, trailerMD *metadata.MD, streamAddr *btpb.Bigtable_ReadRowsClient, _ gax.CallSettings) error {
var err error
cmRes, err = t.c.client.CheckAndMutateRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
return err
Expand Down Expand Up @@ -1265,9 +1279,9 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio
func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...ApplyOption) (err error) {
attrMap := make(map[string]interface{})
mt := t.newBuiltinMetricsTracer(ctx, true)
defer recordOperationCompletion(mt)
defer recordOperationCompletion(mt, nil, nil)

err = gaxInvokeWithRecorder(ctx, mt, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err = t.c.gaxInvokeWithRecorder(ctx, mt, "MutateRows", nil, func(ctx context.Context, headerMD, trailerMD *metadata.MD, streamAddr *btpb.Bigtable_ReadRowsClient, _ gax.CallSettings) error {
attrMap["rowCount"] = len(group)
trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
err := t.doApplyBulk(ctx, group, headerMD, trailerMD, opts...)
Expand Down Expand Up @@ -1413,7 +1427,7 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod
ctx = mergeOutgoingMetadata(ctx, t.md)

mt := t.newBuiltinMetricsTracer(ctx, false)
defer recordOperationCompletion(mt)
defer recordOperationCompletion(mt, nil, nil)

updatedRow, err := t.applyReadModifyWrite(ctx, mt, row, m)
statusCode, statusErr := convertToGrpcStatusErr(err)
Expand All @@ -1434,7 +1448,7 @@ func (t *Table) applyReadModifyWrite(ctx context.Context, mt *builtinMetricsTrac
}

var r Row
err := gaxInvokeWithRecorder(ctx, mt, "ReadModifyWriteRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err := t.c.gaxInvokeWithRecorder(ctx, mt, "ReadModifyWriteRow", nil, func(ctx context.Context, headerMD, trailerMD *metadata.MD, streamAddr *btpb.Bigtable_ReadRowsClient, _ gax.CallSettings) error {
res, err := t.c.client.ReadModifyWriteRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
if err != nil {
return err
Expand Down Expand Up @@ -1494,7 +1508,7 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
ctx = mergeOutgoingMetadata(ctx, t.md)

mt := t.newBuiltinMetricsTracer(ctx, true)
defer recordOperationCompletion(mt)
defer recordOperationCompletion(mt, nil, nil)

rowKeys, err := t.sampleRowKeys(ctx, mt)
statusCode, statusErr := convertToGrpcStatusErr(err)
Expand All @@ -1504,7 +1518,7 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {

func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]string, error) {
var sampledRowKeys []string
err := gaxInvokeWithRecorder(ctx, mt, "SampleRowKeys", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err := t.c.gaxInvokeWithRecorder(ctx, mt, "SampleRowKeys", nil, func(ctx context.Context, headerMD, trailerMD *metadata.MD, streamAddr *btpb.Bigtable_ReadRowsClient, _ gax.CallSettings) error {
sampledRowKeys = nil
req := &btpb.SampleRowKeysRequest{
AppProfileId: t.c.appProfile,
Expand Down Expand Up @@ -1557,19 +1571,27 @@ func (t *Table) newBuiltinMetricsTracer(ctx context.Context, isStreaming bool) *
// recordOperationCompletion records as many operation specific metrics as it can
// Ignores error seen while creating metric attributes since metric can still
// be recorded with rest of the attributes
func recordOperationCompletion(mt *builtinMetricsTracer) {
func recordOperationCompletion(mt *builtinMetricsTracer, _ *btpb.Bigtable_ReadRowsClient, _ metadata.MD) {
fmt.Println("Entering recordOperationCompletion")
if !mt.builtInEnabled {
fmt.Println("!mt.builtInEnabled")
return
}

fmt.Println("Waiting for completed")
<-mt.currOp.completed
fmt.Println("completed")
// Calculate elapsed time
elapsedTimeMs := convertToMs(time.Since(mt.currOp.startTime))
// elapsedTimeMs := convertToMs(time.Since(mt.currOp.startTime))
elapsedTimeMs := convertToMs(mt.currOp.endTime.Sub(mt.currOp.startTime))

// Record operation_latencies
fmt.Println(time.Now(), " recordOperationCompletion Recording operation_latencies")
opLatAttrs, _ := mt.toOtelMetricAttrs(metricNameOperationLatencies)
mt.instrumentOperationLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(opLatAttrs...))

// Record retry_count
fmt.Println(time.Now(), " recordOperationCompletion Recording retry_count")
retryCntAttrs, _ := mt.toOtelMetricAttrs(metricNameRetryCount)
if mt.currOp.attemptCount > 1 {
// Only record when retry count is greater than 0 so the retry
Expand All @@ -1584,17 +1606,17 @@ func recordOperationCompletion(mt *builtinMetricsTracer) {
// - does not return errors seen while recording the metrics
//
// - then, calls gax.Invoke with 'callWrapper' as an argument
func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method string,
f func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error, opts ...gax.CallOption) error {
func (c *Client) gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method string, stream *btpb.Bigtable_ReadRowsClient,
f func(ctx context.Context, headerMD, trailerMD *metadata.MD, stream *btpb.Bigtable_ReadRowsClient, _ gax.CallSettings) error, opts ...gax.CallOption) error {
attemptHeaderMD := metadata.New(nil)
attempTrailerMD := metadata.New(nil)
mt.method = method

var callWrapper func(context.Context, gax.CallSettings) error
if !mt.builtInEnabled {
callWrapper = func(ctx context.Context, callSettings gax.CallSettings) error {
// f makes calls to CBT service
return f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings)
// f makes calls to Cloud Bigtable service
return f(ctx, &attemptHeaderMD, &attempTrailerMD, stream, callSettings)
}
} else {
callWrapper = func(ctx context.Context, callSettings gax.CallSettings) error {
Expand All @@ -1607,25 +1629,33 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method
mt.currOp.currAttempt.setStartTime(time.Now())

// f makes calls to CBT service
err := f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings)
// TODO: attempTrailerMD not needed in f. Remove
err := f(ctx, &attemptHeaderMD, &attempTrailerMD, stream, callSettings)

mt.currOp.currAttempt.setEndTime(time.Now())

// Set attempt status
statusCode, _ := convertToGrpcStatusErr(err)
mt.currOp.currAttempt.setStatus(statusCode.String())

// Set server latency in tracer
// serverLatency, serverLatencyErr := extractServerLatency(attemptHeaderMD, attempTrailerMD)
// mt.currOp.currAttempt.setServerLatencyErr(serverLatencyErr)
// mt.currOp.currAttempt.setServerLatency(serverLatency)

// Get location attributes from metadata and set it in tracer
// Ignore get location error since the metric can still be recorded with rest of the attributes
clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD)
mt.currOp.currAttempt.setClusterID(clusterID)
mt.currOp.currAttempt.setZoneID(zoneID)

// Set server latency in tracer
serverLatency, serverLatencyErr := extractServerLatency(attemptHeaderMD, attempTrailerMD)
mt.currOp.currAttempt.setServerLatencyErr(serverLatencyErr)
mt.currOp.currAttempt.setServerLatency(serverLatency)
// clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD)
// mt.currOp.currAttempt.setClusterID(clusterID)
// mt.currOp.currAttempt.setZoneID(zoneID)

// Record attempt specific metrics
recordAttemptCompletion(mt)
// recordAttemptCompletion(mt)
c.workers.addJob(&job{
stream: stream,
handler: recordAttemptCompletion,
mt: *mt,
})
return err
}
}
Expand All @@ -1635,19 +1665,36 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method
// recordAttemptCompletion records as many attempt specific metrics as it can
// Ignore errors seen while creating metric attributes since metric can still
// be recorded with rest of the attributes
func recordAttemptCompletion(mt *builtinMetricsTracer) {
func recordAttemptCompletion(mt *builtinMetricsTracer, streamAddr *btpb.Bigtable_ReadRowsClient, attemptHeaderMD metadata.MD) {
fmt.Println("Entering recordAttemptCompletion")
stream := *streamAddr
fmt.Println("Waiting on trailers")
attempTrailerMD := stream.Trailer()
fmt.Println("Trailers received")

clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD)
mt.currOp.currAttempt.setClusterID(clusterID)
mt.currOp.currAttempt.setZoneID(zoneID)
fmt.Println("clusterID: ", clusterID, ", zoneID: ", zoneID)
serverLatency, serverLatencyErr := extractServerLatency(attemptHeaderMD, attempTrailerMD)
mt.currOp.currAttempt.setServerLatencyErr(serverLatencyErr)
mt.currOp.currAttempt.setServerLatency(serverLatency)

if !mt.builtInEnabled {
return
}

// Calculate elapsed time
elapsedTime := convertToMs(time.Since(mt.currOp.currAttempt.startTime))
// elapsedTime := convertToMs(time.Since(mt.currOp.currAttempt.startTime))
elapsedTime := convertToMs(mt.currOp.endTime.Sub(mt.currOp.startTime))

// Record attempt_latencies
fmt.Println(time.Now(), " recordAttemptCompletion Recording attempt_latencies")
attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies)
mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributes(attemptLatAttrs...))

// Record server_latencies
fmt.Println(time.Now(), " recordAttemptCompletion Recording server_latencies")
serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies)
if mt.currOp.currAttempt.serverLatencyErr == nil {
mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributes(serverLatAttrs...))
Expand Down
26 changes: 25 additions & 1 deletion bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,31 @@ func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter, s
rrr.Chunks[len(rrr.Chunks)-1].RowStatus = &btpb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true}
}

return stream.Send(rrr)
fmt.Println(time.Now(), " Server Sending response on stream")
sendErr := stream.Send(rrr)

go func() {
locationMDKey := "x-goog-ext-425905942-bin"
serverTimingMDKey := "server-timing"
clusterID2 := "cluster-id-2"
zoneID1 := "zone-id-1"
testTrailers, _ := proto.Marshal(&btpb.ResponseParams{
ClusterId: &clusterID2,
ZoneId: &zoneID1,
})
time.Sleep(10 * time.Second)
fmt.Println(time.Now(), " Server sending trailer on stream")
stream.SetTrailer(metadata.MD{
locationMDKey: []string{string(testTrailers)},
serverTimingMDKey: []string{"gfet4t7; dur=5678"},
})
// sendErr := stream.SendMsg(nil)

}()

fmt.Println(" Returing sendErr")
fmt.Println(time.Now(), " Returing sendErr")
return sendErr
}

// filterRow modifies a row with the given filter. Returns true if at least one cell from the row matches,
Expand Down
10 changes: 7 additions & 3 deletions bigtable/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
module cloud.google.com/go/bigtable

go 1.21
go 1.22.7

toolchain go1.23.0

replace google.golang.org/grpc => /usr/local/google/home/bahaaiman/Documents/cfdb-workspace-01/bigtable-client-docker-image/grpc-go

require (
cloud.google.com/go v0.115.1
Expand Down Expand Up @@ -29,13 +33,13 @@ require (
)

require (
cel.dev/expr v0.16.0 // indirect
cel.dev/expr v0.16.1 // indirect
cloud.google.com/go/auth v0.9.3 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20240822171458-6449f94b4d59 // indirect
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect
github.com/envoyproxy/go-control-plane v0.13.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand Down
Loading