Skip to content

Commit

Permalink
sql: initialize with a limited number of table leases
Browse files Browse the repository at this point in the history
This will prevent initial requests hitting a server from
blocking on lease acquisition.

related to cockroachdb#23510

Release note: Fixed slowness caused by table lease acquisition
at startup.
  • Loading branch information
vivekmenezes committed Aug 27, 2018
1 parent e7d570e commit 69028fd
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<tr><td><code>sql.metrics.statement_details.dump_to_logs</code></td><td>boolean</td><td><code>false</code></td><td>dump collected statement statistics to node logs when periodically cleared</td></tr>
<tr><td><code>sql.metrics.statement_details.enabled</code></td><td>boolean</td><td><code>true</code></td><td>collect per-statement query statistics</td></tr>
<tr><td><code>sql.metrics.statement_details.threshold</code></td><td>duration</td><td><code>0s</code></td><td>minimum execution time to cause statistics to be collected</td></tr>
<tr><td><code>sql.tablecache.lease.init_limit</code></td><td>integer</td><td><code>1000</code></td><td>maximum number of table leases to be created at startup</td></tr>
<tr><td><code>sql.tablecache.lease.refresh_limit</code></td><td>integer</td><td><code>50</code></td><td>maximum number of tables to periodically refresh leases for</td></tr>
<tr><td><code>sql.trace.log_statement_execute</code></td><td>boolean</td><td><code>false</code></td><td>set to true to enable logging of executed statements</td></tr>
<tr><td><code>sql.trace.session_eventlog.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to enable session tracing</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

s.leaseMgr.SetExecCfg(&execCfg)
s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)
s.leaseMgr.InitTableLeases()
s.leaseMgr.PeriodicallyRefreshSomeLeases()

s.node.InitLogger(&execCfg)
Expand Down
80 changes: 80 additions & 0 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,10 @@ type LeaseManagerTestingKnobs struct {
// A callback called after the leases are refreshed as a result of a gossip update.
TestingLeasesRefreshedEvent func(config.SystemConfig)

// LeaseInitCompletionEvent is called after the lease initialization
// is complete.
LeaseInitCompletionEvent func()

LeaseStoreTestingKnobs LeaseStoreTestingKnobs
}

Expand Down Expand Up @@ -1752,3 +1756,79 @@ func (m *LeaseManager) refreshSomeLeases(ctx context.Context) {
}
wg.Wait()
}

// initTableLeasesLimit is the upper-limit on the number of table leases
// that are created at startup.
var initTableLeasesLimit = settings.RegisterIntSetting(
"sql.tablecache.lease.init_limit",
"maximum number of table leases to be created at startup",
1000,
)

// InitTableLeases initializes the table cache with some table leases
// so that when traffic hits the node, the traffic is not blocked on
// lease acquisition.
// TODO(vivek): Figure out how to guarantee that traffic will not be
// sent to this node before this method has acquired all leases.
func (m *LeaseManager) InitTableLeases() {
limit := initTableLeasesLimit.Get(&m.execCfg.Settings.SV)
if limit <= 0 {
return
}

// Run async task so that startup is not blocked on this.
if err := m.stopper.RunAsyncTask(
context.Background(), "init table leases", func(ctx context.Context) {
if fn := m.testingKnobs.LeaseInitCompletionEvent; fn != nil {
defer fn()
}

// The internal executor cannot function without the
// version initialization.
retryOptions := base.DefaultRetryOptions()
retryOptions.Closer = m.stopper.ShouldQuiesce()
for r := retry.Start(retryOptions); r.Next(); {
if m.execCfg.Settings.Version.HasBeenInitialized() {
break
}
}

// Get a list of a limited number of table ids in descending
// order so that newer ids are given preference.
res, _, err := m.LeaseStore.execCfg.InternalExecutor.Query(
ctx, "list-tables", nil, /*txn*/
`SELECT id FROM system.namespace WHERE id > $1 ORDER BY id DESC LIMIT $2`,
keys.MaxReservedDescID,
limit,
)
if err != nil {
log.Error(ctx, errors.Wrapf(err, "init list-tables failed"))
return
}
log.VEventf(ctx, 2, "init leasing %d tables", len(res))
// Limit the number of concurrent lease acquisitions.
sem := make(chan struct{}, 20)
var wg sync.WaitGroup
for _, cols := range res {
wg.Add(1)
id := sqlbase.ID(tree.MustBeDInt(cols[0]))
if err := m.stopper.RunLimitedAsyncTask(
ctx, fmt.Sprintf("init table:%d lease", id), sem, true /*wait*/, func(ctx context.Context) {
defer wg.Done()
table, _, err := m.Acquire(ctx, m.execCfg.Clock.Now(), id)
if err != nil {
return
}
if err := m.Release(table); err != nil {
log.Warning(ctx, errors.Wrapf(err, "failed to release table %d", id))
}
}); err != nil {
log.Error(context.TODO(), errors.Wrapf(err, "didnt init table lease"))
wg.Done()
}
}
wg.Wait()
}); err != nil {
log.Error(context.TODO(), errors.Wrapf(err, "init table leases failed"))
}
}
73 changes: 73 additions & 0 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1677,3 +1677,76 @@ CREATE TABLE t.test2 ();
t.Fatalf("expected lease acquisition to not block, but blockCount is: %d", blockCount)
}
}

// This test makes sure leases are created at startup.
func TestInitTableLeases(testingT *testing.T) {
defer leaktest.AfterTest(testingT)()

var testAcquiredCount int32

var initNotify chan struct{}
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
// We want to track when leases get acquired and renewed.
LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, _ error) {
if table.ID > keys.MaxReservedDescID {
atomic.AddInt32(&testAcquiredCount, 1)
}
},
},
LeaseInitCompletionEvent: func() {
if notify := initNotify; notify != nil {
initNotify = nil
close(notify)
}

},
},
}

ctx := context.Background()
t := newLeaseTest(testingT, params)
defer t.cleanup()

if _, err := t.db.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test1 (k CHAR PRIMARY KEY, v CHAR);
CREATE TABLE t.test2 ();
`); err != nil {
t.Fatal(err)
}

test2Desc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test2")
dbID := test2Desc.ParentID

initNotify = make(chan struct{})
notify := initNotify
t.node(1).InitTableLeases()
<-notify

atomic.StoreInt32(&testAcquiredCount, 0)

// Acquire a lease on test1 by name.
ts1, _, err := t.node(1).AcquireByName(ctx, t.server.Clock().Now(), dbID, "test1")
if err != nil {
t.Fatal(err)
} else if err := t.release(1, ts1); err != nil {
t.Fatal(err)
}

// Acquire a lease on test2 by ID.
ts2, _, err := t.node(1).Acquire(ctx, t.server.Clock().Now(), test2Desc.ID)
if err != nil {
t.Fatal(err)
} else if err := t.release(1, ts2); err != nil {
t.Fatal(err)
}

// Ensure that no new leases were acquired.
if count := atomic.LoadInt32(&testAcquiredCount); count != 0 {
t.Fatalf("expected 0 leases to be acquired, but acquired %d times",
count)
}
}

0 comments on commit 69028fd

Please sign in to comment.