Skip to content

Commit

Permalink
database/sql: optimize connection request pool
Browse files Browse the repository at this point in the history
This replaces a map used as a set with a slice.

We were using a surprising amount of CPU in this code, making mapiters
to pull out a random element of the map. Instead, just rand.IntN to pick
a random element of the slice.

It also adds a benchmark:

                     │    before    │                after                │
                     │    sec/op    │   sec/op     vs base                │
    ConnRequestSet-8   1818.0n ± 0%   452.4n ± 0%  -75.12% (p=0.000 n=10)

(whether random is a good policy is a bigger question, but this
 optimizes the current policy without changing behavior)

Updates #66361

Change-Id: I3d456a819cc720c2d18e1befffd2657e5f50f1e7
Reviewed-on: https://go-review.googlesource.com/c/go/+/572119
Reviewed-by: Cherry Mui <cherryyz@google.com>
Reviewed-by: Ian Lance Taylor <iant@google.com>
Reviewed-by: Emmanuel Odeke <emmanuel@orijtech.com>
Reviewed-by: David Chase <drchase@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Brad Fitzpatrick <bradfitz@golang.org>
  • Loading branch information
bradfitz authored and gopherbot committed Mar 18, 2024
1 parent 1ed85ee commit 190d0d3
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 38 deletions.
163 changes: 128 additions & 35 deletions src/database/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"math/rand/v2"
"reflect"
"runtime"
"sort"
Expand Down Expand Up @@ -497,9 +498,8 @@ type DB struct {

mu sync.Mutex // protects following fields
freeConn []*driverConn // free connections ordered by returnedAt oldest to newest
connRequests map[uint64]chan connRequest
nextRequest uint64 // Next key to use in connRequests.
numOpen int // number of opened and pending open connections
connRequests connRequestSet
numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
Expand Down Expand Up @@ -814,11 +814,10 @@ func (t dsnConnector) Driver() driver.Driver {
func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
stop: cancel,
}

go db.connectionOpener(ctx)
Expand Down Expand Up @@ -922,9 +921,7 @@ func (db *DB) Close() error {
}
db.freeConn = nil
db.closed = true
for _, req := range db.connRequests {
close(req)
}
db.connRequests.CloseAndRemoveAll()
db.mu.Unlock()
for _, fn := range fns {
err1 := fn()
Expand Down Expand Up @@ -1223,7 +1220,7 @@ func (db *DB) Stats() DBStats {
// If there are connRequests and the connection limit hasn't been reached,
// then tell the connectionOpener to open new connections.
func (db *DB) maybeOpenNewConnections() {
numRequests := len(db.connRequests)
numRequests := db.connRequests.Len()
if db.maxOpen > 0 {
numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
Expand Down Expand Up @@ -1297,14 +1294,6 @@ type connRequest struct {

var errDBClosed = errors.New("sql: database is closed")

// nextRequestKeyLocked returns the next connection request key.
// It is assumed that nextRequest will not overflow.
func (db *DB) nextRequestKeyLocked() uint64 {
next := db.nextRequest
db.nextRequest++
return next
}

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
Expand Down Expand Up @@ -1352,8 +1341,7 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
delHandle := db.connRequests.Add(req)
db.waitCount++
db.mu.Unlock()

Expand All @@ -1365,16 +1353,26 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
// Remove the connection request and ensure no value has been sent
// on it after removing.
db.mu.Lock()
delete(db.connRequests, reqKey)
deleted := db.connRequests.Delete(delHandle)
db.mu.Unlock()

db.waitDuration.Add(int64(time.Since(waitStart)))

select {
default:
case ret, ok := <-req:
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
// If we failed to delete it, that means something else
// grabbed it and is about to send on it.
if !deleted {
// TODO(bradfitz): rather than this best effort select, we
// should probably start a goroutine to read from req. This best
// effort select existed before the change to check 'deleted'.
// But if we know for sure it wasn't deleted and a sender is
// outstanding, we should probably block on req (in a new
// goroutine) to get the connection back.
select {
default:
case ret, ok := <-req:
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
}
return nil, ctx.Err()
Expand Down Expand Up @@ -1530,13 +1528,7 @@ func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if req, ok := db.connRequests.TakeRandom(); ok {
if err == nil {
dc.inUse = true
}
Expand Down Expand Up @@ -3529,3 +3521,104 @@ func withLock(lk sync.Locker, fn func()) {
defer lk.Unlock() // in case fn panics
fn()
}

// connRequestSet is a set of chan connRequest that's
// optimized for:
//
// - adding an element
// - removing an element (only by the caller who added it)
// - taking (get + delete) a random element
//
// We previously used a map for this but the take of a random element
// was expensive, making mapiters. This type avoids a map entirely
// and just uses a slice.
type connRequestSet struct {
// s are the elements in the set.
s []connRequestAndIndex
}

type connRequestAndIndex struct {
// req is the element in the set.
req chan connRequest

// curIdx points to the current location of this element in
// connRequestSet.s. It gets set to -1 upon removal.
curIdx *int
}

// CloseAndRemoveAll closes all channels in the set
// and clears the set.
func (s *connRequestSet) CloseAndRemoveAll() {
for _, v := range s.s {
close(v.req)
}
s.s = nil
}

// Len returns the length of the set.
func (s *connRequestSet) Len() int { return len(s.s) }

// connRequestDelHandle is an opaque handle to delete an
// item from calling Add.
type connRequestDelHandle struct {
idx *int // pointer to index; or -1 if not in slice
}

// Add adds v to the set of waiting requests.
// The returned connRequestDelHandle can be used to remove the item from
// the set.
func (s *connRequestSet) Add(v chan connRequest) connRequestDelHandle {
idx := len(s.s)
// TODO(bradfitz): for simplicity, this always allocates a new int-sized
// allocation to store the index. But generally the set will be small and
// under a scannable-threshold. As an optimization, we could permit the *int
// to be nil when the set is small and should be scanned. This works even if
// the set grows over the threshold with delete handles outstanding because
// an element can only move to a lower index. So if it starts with a nil
// position, it'll always be in a low index and thus scannable. But that
// can be done in a follow-up change.
idxPtr := &idx
s.s = append(s.s, connRequestAndIndex{v, idxPtr})
return connRequestDelHandle{idxPtr}
}

// Delete removes an element from the set.
//
// It reports whether the element was deleted. (It can return false if a caller
// of TakeRandom took it meanwhile, or upon the second call to Delete)
func (s *connRequestSet) Delete(h connRequestDelHandle) bool {
idx := *h.idx
if idx < 0 {
return false
}
s.deleteIndex(idx)
return true
}

func (s *connRequestSet) deleteIndex(idx int) {
// Mark item as deleted.
*(s.s[idx].curIdx) = -1
// Copy last element, updating its position
// to its new home.
if idx < len(s.s)-1 {
last := s.s[len(s.s)-1]
*last.curIdx = idx
s.s[idx] = last
}
// Zero out last element (for GC) before shrinking the slice.
s.s[len(s.s)-1] = connRequestAndIndex{}
s.s = s.s[:len(s.s)-1]
}

// TakeRandom returns and removes a random element from s
// and reports whether there was one to take. (It returns ok=false
// if the set is empty.)
func (s *connRequestSet) TakeRandom() (v chan connRequest, ok bool) {
if len(s.s) == 0 {
return nil, false
}
pick := rand.IntN(len(s.s))
e := s.s[pick]
s.deleteIndex(pick)
return e.req, true
}
104 changes: 103 additions & 1 deletion src/database/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"math/rand"
"reflect"
"runtime"
"slices"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -2983,7 +2984,7 @@ func TestConnExpiresFreshOutOfPool(t *testing.T) {
return
}
db.mu.Lock()
ct := len(db.connRequests)
ct := db.connRequests.Len()
db.mu.Unlock()
if ct > 0 {
return
Expand Down Expand Up @@ -4803,3 +4804,104 @@ func BenchmarkGrabConn(b *testing.B) {
release(nil)
}
}

func TestConnRequestSet(t *testing.T) {
var s connRequestSet
wantLen := func(want int) {
t.Helper()
if got := s.Len(); got != want {
t.Errorf("Len = %d; want %d", got, want)
}
if want == 0 && !t.Failed() {
if _, ok := s.TakeRandom(); ok {
t.Fatalf("TakeRandom returned result when empty")
}
}
}
reset := func() { s = connRequestSet{} }

t.Run("add-delete", func(t *testing.T) {
reset()
wantLen(0)
dh := s.Add(nil)
wantLen(1)
if !s.Delete(dh) {
t.Fatal("failed to delete")
}
wantLen(0)
if s.Delete(dh) {
t.Error("delete worked twice")
}
wantLen(0)
})
t.Run("take-before-delete", func(t *testing.T) {
reset()
ch1 := make(chan connRequest)
dh := s.Add(ch1)
wantLen(1)
if got, ok := s.TakeRandom(); !ok || got != ch1 {
t.Fatalf("wrong take; ok=%v", ok)
}
wantLen(0)
if s.Delete(dh) {
t.Error("unexpected delete after take")
}
})
t.Run("get-take-many", func(t *testing.T) {
reset()
m := map[chan connRequest]bool{}
const N = 100
var inOrder, backOut []chan connRequest
for range N {
c := make(chan connRequest)
m[c] = true
s.Add(c)
inOrder = append(inOrder, c)
}
if s.Len() != N {
t.Fatalf("Len = %v; want %v", s.Len(), N)
}
for s.Len() > 0 {
c, ok := s.TakeRandom()
if !ok {
t.Fatal("failed to take when non-empty")
}
if !m[c] {
t.Fatal("returned item not in remaining set")
}
delete(m, c)
backOut = append(backOut, c)
}
if len(m) > 0 {
t.Error("items remain in expected map")
}
if slices.Equal(inOrder, backOut) { // N! chance of flaking; N=100 is fine
t.Error("wasn't random")
}
})
}

func BenchmarkConnRequestSet(b *testing.B) {
var s connRequestSet
for range b.N {
for range 16 {
s.Add(nil)
}
for range 8 {
if _, ok := s.TakeRandom(); !ok {
b.Fatal("want ok")
}
}
for range 8 {
s.Add(nil)
}
for range 16 {
if _, ok := s.TakeRandom(); !ok {
b.Fatal("want ok")
}
}
if _, ok := s.TakeRandom(); ok {
b.Fatal("unexpected ok")
}
}
}
5 changes: 3 additions & 2 deletions src/go/build/deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ var depsRules = `
# databases
FMT
< database/sql/internal
< database/sql/driver
< database/sql;
< database/sql/driver;
database/sql/driver, math/rand/v2 < database/sql;
# images
FMT, compress/lzw, compress/zlib
Expand Down

0 comments on commit 190d0d3

Please sign in to comment.