Data race in managedwriter.(*connection).lockingAppend() #9301

Closed
sstemmle opened this issue Jan 25, 2024 · 2 comments · Fixed by #9360
Labels
api: bigquery Issues related to the BigQuery API. priority: p3 Desirable enhancement or fix. May not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@sstemmle

Client

bigquery/storage (v1.58.0)

Environment

Any

Go Environment

$ go version
1.21.6
$ go env
GO111MODULE=''
GOARCH='amd64'
GOBIN=''
GOCACHE='/root/.cache/go-build'
GOENV='/root/.config/go/env'
GOEXE=''
GOEXPERIMENT=''
GOFLAGS=''
GOHOSTARCH='amd64'
GOHOSTOS='linux'
GOINSECURE=''
GOMODCACHE='/go/pkg/mod'
GONOPROXY=''
GONOSUMDB=''
GOOS='linux'
GOPATH='/go'
GOPRIVATE=''
GOPROXY='https://proxy.golang.org,direct'
GOROOT='/usr/local/go'
GOSUMDB='sum.golang.org'
GOTMPDIR=''
GOTOOLCHAIN='local'
GOTOOLDIR='/usr/local/go/pkg/tool/linux_amd64'
GOVCS=''
GOVERSION='go1.21.6'
GCCGO='gccgo'
GOAMD64='v1'
AR='ar'
CC='gcc'
CXX='g++'
CGO_ENABLED='1'
GOMOD='/dev/null'
GOWORK=''
CGO_CFLAGS='-O2 -g'
CGO_CPPFLAGS=''
CGO_CXXFLAGS='-O2 -g'
CGO_FFLAGS='-O2 -g'
CGO_LDFLAGS='-O2 -g'
PKG_CONFIG='pkg-config'
GOGCCFLAGS='-fPIC -m64 -pthread -Wl,--no-gc-sections -fmessage-length=0 -ffile-prefix-map=/tmp/go-build1646458025=/tmp/go-build -gno-record-gcc-switches'

Code

So far this has happened only once, in a larger code base. The snippet below is a simplified version of it. I was not able to reproduce the race with this snippet, but the stack trace should make clear what happened.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

const (
	projectID   = "project"
	dataset     = "dataset"
	tableName   = "test_table"
	keyfilePath = "keyfile.json"
)

func setupStream(msgDesc *descriptorpb.DescriptorProto) (*managedwriter.ManagedStream, func() error, error) {
	ctx := context.Background()
	client, err := managedwriter.NewClient(ctx, projectID, option.WithCredentialsFile(keyfilePath))
	if err != nil {
		return nil, nil, err
	}

	tableStr := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, dataset, tableName)

	managedStream, err := client.NewManagedStream(ctx, managedwriter.WithDestinationTable(tableStr),
		managedwriter.WithType(managedwriter.DefaultStream),
		managedwriter.WithSchemaDescriptor(msgDesc))
	if err != nil {
		return nil, client.Close, err
	}

	return managedStream, client.Close, nil
}

func main() {
	descrProto, msgDescr, err := getDescriptor()
	if err != nil {
		panic(err)
	}

	stream, cleanup, err := setupStream(descrProto)
	if err != nil {
		panic(err)
	}

	defer cleanup()

	nWriters := 5
	nMsg := 10000

	wg := sync.WaitGroup{}
	wg.Add(nWriters)

	for i := 0; i < nWriters; i++ {
		go func() {
			defer wg.Done()
			for j := 0; j < nMsg; j++ {
				msg, err := getMessage(msgDescr, int64(j))
				if err != nil {
					panic(err)
				}

				res, err := stream.AppendRows(context.Background(), [][]byte{msg})
				if err != nil {
					panic(err)
				}

				_, err = res.GetResult(context.Background())
				if err != nil {
					panic(err)
				}

				if j%100 == 0 {
					time.Sleep(5 * time.Second)
				}
			}
		}()
	}

	wg.Wait()
}

func getDescriptor() (*descriptorpb.DescriptorProto, protoreflect.MessageDescriptor, error) {
	descrProto := &descriptorpb.DescriptorProto{
		Name: p("record"),
		Field: []*descriptorpb.FieldDescriptorProto{
			{
				Name:     p("c"),
				Number:   p(int32(1)),
				Label:    p(descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL),
				Type:     p(descriptorpb.FieldDescriptorProto_TYPE_INT64),
				JsonName: p("c"),
			},
		},
	}

	fds := &descriptorpb.FileDescriptorSet{
		File: []*descriptorpb.FileDescriptorProto{
			{
				Name:        p("protos/record.proto"),
				Package:     p("main"),
				MessageType: []*descriptorpb.DescriptorProto{descrProto},
				Syntax:      p("proto3"),
			},
		},
	}

	files, err := protodesc.NewFiles(fds)
	if err != nil {
		return nil, nil, err
	}

	desc, err := files.FindDescriptorByName(protoreflect.FullName("main.record"))
	if err != nil {
		return nil, nil, err
	}

	msgDesc, ok := desc.(protoreflect.MessageDescriptor)
	if !ok {
		return nil, nil, errors.New("unable to assert into MessageDescriptor")
	}

	return descrProto, msgDesc, nil
}

func getMessage(msgDesc protoreflect.MessageDescriptor, c int64) ([]byte, error) {
	rqst := dynamicpb.NewMessage(msgDesc)
	fd := rqst.Descriptor().Fields().ByName(protoreflect.Name("c"))
	rqst.Set(fd, protoreflect.ValueOfInt64(c))

	data, err := proto.Marshal(rqst)
	if err != nil {
		return nil, err
	}

	return data, nil
}

func p[T any](t T) *T {
	p := new(T)
	*p = t
	return p
}

We ran this with Go's race detector enabled (the -race flag).

Expected behavior

No data race is detected.

Actual behavior

Go's race detector reports the following data race:

==================
WARNING: DATA RACE
Write at 0x00c00278a928 by goroutine 3552:
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).getStream()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:485 +0x76b
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).lockingAppend()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:404 +0x851
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).appendWithRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:210 +0x2ca
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).processRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:368 +0x1a8
  cloud.google.com/go/bigquery/storage/managedwriter.connRecvProcessor()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:537 +0x6d2
  cloud.google.com/go/bigquery/storage/managedwriter.(*connectionPool).openWithRetry.func1()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:172 +0xdc

Previous read at 0x00c00278a928 by goroutine 3550:
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).lockingAppend.func2()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:447 +0x5e
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).lockingAppend.func1()
      /.../cloud.google.com/go/bigquery/storage/managedwriter/connection.go:367 +0x81
  runtime.deferreturn()
      /usr/local/go/src/runtime/panic.go:477 +0x30
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).appendWithRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:210 +0x2ca
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).AppendRows.func1()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:322 +0xcb

Goroutine 3552 (running) created at:
  cloud.google.com/go/bigquery/storage/managedwriter.(*connectionPool).openWithRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:172 +0xc3d
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).getStream()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:493 +0x965
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).lockingAppend()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:404 +0x851
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).appendWithRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:210 +0x2ca
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).AppendRows.func1()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:322 +0xcb

Goroutine 3550 (finished) created at:
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).AppendRows()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:320 +0x7db

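The two conflicting accesses (the write in getStream() and the read in lockingAppend()) are at these lines of the v1.58.0 source: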
https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.58.0/bigquery/storage/managedwriter/connection.go#L485
https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.58.0/bigquery/storage/managedwriter/connection.go#L447

@sstemmle sstemmle added the triage me I really want to be triaged. label Jan 25, 2024
@quartzmo quartzmo added the api: bigquery Issues related to the BigQuery API. label Jan 25, 2024
@shollyman
Contributor

Thanks for the report, I'll take a look!

@noahdietz noahdietz added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p3 Desirable enhancement or fix. May not be included in next release. and removed triage me I really want to be triaged. labels Jan 30, 2024
@noahdietz
Contributor

I did some quick triage; please feel free to change the priority as needed.

shollyman added a commit to shollyman/google-cloud-go that referenced this issue Feb 3, 2024
This PR adds a non-assertive test which helps expose data
races by doing a lot of concurrent write operations on a single
ManagedStream instance.

As a byproduct, this cleans up two possible races. In the first,
a deferred function may incorrectly access a retained context.
We change this to grab a reference to the context in the defer
while we still hold the lock.

In the second, the retry mechanism uses math/rand, and retry
processing can lead to concurrent use of the random number source.
This PR adds a mutex guard to the source.

Fixes: googleapis#9301
4 participants