Skip to content

Commit f813ebd

Browse files
author
Chris Stockton
committed
feat: support percentage based db limits with reload support
**Summary** Introduce a context aware DB dial path, a new `ConnPercentage` knob to cap Auth's share of Postgres connections, and background wiring to apply pool changes on config reloads. **Storage / DB** - Add `DialContext(ctx, *conf.GlobalConfiguration)` and keep `Dial(...)` as a thin wrapper. `serve` now passes its cancelable context so startup can't hang indefinitely. - `Connection` now keeps a handle to the underlying `*sql.DB` (via `popConnToStd`) when available. - New helpers: - `newConnectionDetails` and `applyDBDriver` to build `pop.ConnectionDetails` and derive driver when omitted. - `Connection.Copy()` to retain `sqldb` reference and updated locations that copy (`WithContext, Transaction)`. - Runtime tuning API: `(*Connection).ApplyConfig(ctx, cfg, le)` computes and applies connection limits to the underlying `*sql.DB`. - Fixed limits come from `MaxPoolSize`, `MaxIdlePoolSize`, `ConnMaxLifetime`, `ConnMaxIdleTime`. - If `ConnPercentage` is set (1-100), compute limits from `SHOW max_connections`, prefer percentage over fixed pool sizes, and set idle = open. - Retains previous behavior when `ConnPercentage` is `0` - No-op (and error) if `*sql.DB` is unavailable. **API worker** - `apiworker.New` now accepts the DB connection. - Split worker into three goroutines (via `errgroup`): - `configNotifier` fans out reload signals, - `templateWorker` refreshes template cache, - `dbWorker` applies DB connection limits on boot and each reload. **Serve** - Use `storage.DialContext(ctx, cfg)` and then `db = db.WithContext(ctx)` so the DB handle participates in request/trace context and shutdown. **Observability** - Add `observability.NewLogEntry(*logrus.Entry)` to construct chi middleware log entries. - Structured logs around applying DB limits. **Configuration knobs** (`GOTRUE_DB_*`) - `GOTRUE_DB_CONN_PERCENTAGE` (int, clamped to `[0,100]`): - `0` (default) disables percentage-based sizing. - `1-100` reserves that % of `max_connections` for the Auth server. **Tests** - `internal/storage/dial_test.go` - `DialContext` happy path and invalid driver/URL error path. - Reflection bridge to `*sql.DB` (`popConnToStd`) including `WithContext`-wrapped connection behavior. - `ApplyConfig` end-to-end: verify pool sizing and stats reflect limits. - Percentage math and precedence vs fixed pools across edge cases. - `internal/conf/configuration_test.go` - Validation clamps `ConnPercentage` to `[0,100]`.
1 parent 0bd1c28 commit f813ebd

File tree

7 files changed

+774
-41
lines changed

7 files changed

+774
-41
lines changed

cmd/serve_cmd.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ func serve(ctx context.Context) {
4444
logrus.WithError(err).Fatal("unable to load config")
4545
}
4646

47-
db, err := storage.Dial(config)
47+
// Include serve ctx which carries cancelation signals so DialContext does
48+
// not hang indefinitely at startup.
49+
db, err := storage.DialContext(ctx, config)
4850
if err != nil {
4951
logrus.Fatalf("error opening database: %+v", err)
5052
}
@@ -53,6 +55,10 @@ func serve(ctx context.Context) {
5355
baseCtx, baseCancel := context.WithCancel(context.Background())
5456
defer baseCancel()
5557

58+
// Add the base context to the db, this is so during the shutdown sequence
59+
// the DB will be available while connections drain.
60+
db = db.WithContext(ctx)
61+
5662
var wg sync.WaitGroup
5763
defer wg.Wait() // Do not return to caller until this goroutine is done.
5864

@@ -79,7 +85,7 @@ func serve(ctx context.Context) {
7985
log := logrus.WithField("component", "api")
8086

8187
wrkLog := logrus.WithField("component", "apiworker")
82-
wrk := apiworker.New(config, mrCache, wrkLog)
88+
wrk := apiworker.New(config, mrCache, db, wrkLog)
8389
wg.Add(1)
8490
go func() {
8591
defer wg.Done()

internal/api/apiworker/apiworker.go

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ import (
99
"github.com/sirupsen/logrus"
1010
"github.com/supabase/auth/internal/conf"
1111
"github.com/supabase/auth/internal/mailer/templatemailer"
12+
"github.com/supabase/auth/internal/storage"
13+
"golang.org/x/sync/errgroup"
1214
)
1315

1416
// Worker is a simple background worker for async tasks.
1517
type Worker struct {
1618
le *logrus.Entry
1719
tc *templatemailer.Cache
20+
db *storage.Connection
1821

1922
// Notifies worker the cfg has been updated.
2023
cfgCh chan struct{}
@@ -31,12 +34,14 @@ type Worker struct {
3134
func New(
3235
cfg *conf.GlobalConfiguration,
3336
tc *templatemailer.Cache,
37+
db *storage.Connection,
3438
le *logrus.Entry,
3539
) *Worker {
3640
return &Worker{
3741
le: le,
3842
cfg: cfg,
3943
tc: tc,
44+
db: db,
4045
cfgCh: make(chan struct{}, 1),
4146
}
4247
}
@@ -63,14 +68,85 @@ func (o *Worker) ReloadConfig(cfg *conf.GlobalConfiguration) {
6368
}
6469
}
6570

66-
// Work will periodically reload the templates in the background as long as the
67-
// system remains active.
71+
// Work will run background workers.
6872
func (o *Worker) Work(ctx context.Context) error {
6973
if ok := o.workMu.TryLock(); !ok {
7074
return errors.New("apiworker: concurrent calls to Work are invalid")
7175
}
7276
defer o.workMu.Unlock()
7377

78+
var (
79+
eg errgroup.Group
80+
notifyTpl = make(chan struct{}, 1)
81+
notifyDb = make(chan struct{}, 1)
82+
)
83+
eg.Go(func() error {
84+
return o.configNotifier(ctx, notifyTpl, notifyDb)
85+
})
86+
eg.Go(func() error {
87+
return o.templateWorker(ctx, notifyTpl)
88+
})
89+
eg.Go(func() error {
90+
return o.dbWorker(ctx, notifyDb)
91+
})
92+
return eg.Wait()
93+
}
94+
95+
func (o *Worker) configNotifier(
96+
ctx context.Context,
97+
notifyCh ...chan<- struct{},
98+
) error {
99+
le := o.le.WithFields(logrus.Fields{
100+
"worker_type": "apiworker_config_notifier",
101+
})
102+
le.Info("apiworker: config notifier started")
103+
defer le.Info("apiworker: config notifier exited")
104+
105+
for {
106+
select {
107+
case <-ctx.Done():
108+
return ctx.Err()
109+
case <-o.cfgCh:
110+
111+
// When we get a config update, notify each worker to wake up
112+
for _, ch := range notifyCh {
113+
select {
114+
case ch <- struct{}{}:
115+
default:
116+
}
117+
}
118+
}
119+
}
120+
}
121+
122+
func (o *Worker) dbWorker(ctx context.Context, cfgCh <-chan struct{}) error {
123+
le := o.le.WithFields(logrus.Fields{
124+
"worker_type": "apiworker_db_worker",
125+
})
126+
le.Info("apiworker: db worker started")
127+
defer le.Info("apiworker: db worker exited")
128+
129+
if err := o.db.ApplyConfig(ctx, o.getConfig(), le); err != nil {
130+
le.WithError(err).Error(
131+
"failure applying config connection limits to db")
132+
}
133+
134+
for {
135+
select {
136+
case <-ctx.Done():
137+
return ctx.Err()
138+
case <-cfgCh:
139+
if err := o.db.ApplyConfig(ctx, o.getConfig(), le); err != nil {
140+
le.WithError(err).Error(
141+
"failure applying config connection limits to db")
142+
}
143+
}
144+
}
145+
}
146+
147+
// templateWorker will periodically reload the templates in the background as
148+
// long as the system remains active.
149+
func (o *Worker) templateWorker(ctx context.Context, cfgCh <-chan struct{}) error {
74150
le := o.le.WithFields(logrus.Fields{
75151
"worker_type": "apiworker_template_cache",
76152
})
@@ -91,7 +167,7 @@ func (o *Worker) Work(ctx context.Context) error {
91167
select {
92168
case <-ctx.Done():
93169
return ctx.Err()
94-
case <-o.cfgCh:
170+
case <-cfgCh:
95171
tr.Reset(ival())
96172
case <-tr.C:
97173
}

internal/conf/configuration.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ type DBConfiguration struct {
9898
Driver string `json:"driver" required:"true"`
9999
URL string `json:"url" envconfig:"DATABASE_URL" required:"true"`
100100
Namespace string `json:"namespace" envconfig:"DB_NAMESPACE" default:"auth"`
101+
102+
// Percentage of DB conns the auth server may use in
103+
// integer form i.e.: [1, 100] -> [1%, 100%]
104+
ConnPercentage int `json:"conn_percentage" split_words:"true"`
105+
101106
// MaxPoolSize defaults to 0 (unlimited).
102107
MaxPoolSize int `json:"max_pool_size" split_words:"true"`
103108
MaxIdlePoolSize int `json:"max_idle_pool_size" split_words:"true"`
@@ -109,6 +114,7 @@ type DBConfiguration struct {
109114
}
110115

111116
func (c *DBConfiguration) Validate() error {
117+
c.ConnPercentage = min(max(c.ConnPercentage, 0), 100)
112118
return nil
113119
}
114120

internal/conf/configuration_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,30 @@ func TestGlobal(t *testing.T) {
262262
err := populateGlobal(cfg)
263263
require.NoError(t, err)
264264
}
265+
266+
// ConnPercentage
267+
{
268+
tests := []struct {
269+
from int
270+
exp int
271+
}{
272+
{-2, 0},
273+
{-1, 0},
274+
{0, 0},
275+
{1, 1},
276+
{25, 25},
277+
{99, 99},
278+
{100, 100},
279+
{101, 100},
280+
{102, 100},
281+
}
282+
for _, test := range tests {
283+
cfg := &DBConfiguration{ConnPercentage: test.from}
284+
err := cfg.Validate()
285+
require.NoError(t, err)
286+
require.Equal(t, test.exp, cfg.ConnPercentage)
287+
}
288+
}
265289
}
266290

267291
func TestPasswordRequiredCharactersDecode(t *testing.T) {

internal/observability/request-logger.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ type logEntry struct {
7373
Entry *logrus.Entry
7474
}
7575

76+
// NewLogEntry returns a new chimiddleware.LogEntry from a *logrus.Entry.
77+
func NewLogEntry(le *logrus.Entry) chimiddleware.LogEntry {
78+
return &logEntry{le}
79+
}
80+
7681
func (e *logEntry) Write(status, bytes int, header http.Header, elapsed time.Duration, extra interface{}) {
7782
fields := logrus.Fields{
7883
"status": status,

0 commit comments

Comments
 (0)