Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
98368: multiregionccl,server: use cached sqlliveness.Reader, deflake ColdStartLatencyTest r=ajwerner a=ajwerner

#### multitenantccl: deflake ColdStartLatencyTest
This test was flaky due to the closed timestamp sometimes not leading far
enough for global tables due to overload, and due to a cached liveness reader
not being used in distsql. The former was fixed in previous commits. The
latter is fixed here.

Fixes: #96334


#### sql: use CachedReader for uses with sqlinstance and the sql builtins
The CachedReader won't block, which in multi-region clusters is good. It will
mean that in some cases, it'll state that a session is alive when it most
certainly is not. Currently, nobody needs synchronous semantics.

This is a major part of fixing the TestColdStartLatency as sometimes
distsql planning would block. That's not acceptable -- the idea that
query physical planning can need to wait for a cross-region RPC is
unacceptable.

#### sqlliveness: re-arrange APIs to clarify when the API was blocking

By "default" the implementation of Reader was blocking and there was a method
to get a handle to a non-blocking CachedReader(). This asymmetry does not aid
understandability. The non-blocking reader came later, but it is generally the
more desirable interface.

Release note: None

98519: storage: More CheckSSTConflicts fixes r=erikgrinaker a=itsbilal

A few additional fixes around CheckSSTConflicts, stats calculations, and Next()ing logic, caught by kvnemesis. Hopefully the last of its kind.

Also re-enable kvnemesis testing for range keys in AddSSTable, reverting #98475.

Fixes #94141.
Fixes #98473.
Informs #94876.

Epic: none

Release note: None

98567: backupccl: use correct version gate for restore checkpointing r=adityamaru a=msbutler

PR #97862 introduced a subtle bug which allowed the new restore checkpointing policy to take effect before the 23_1 migrations occurred. This patch ensures the new policy only takes effect after all migrations occur.

Release note: None

Epic: None

Co-authored-by: ajwerner <awerner32@gmail.com>
Co-authored-by: Bilal Akhtar <bilal@cockroachlabs.com>
Co-authored-by: Michael Butler <butler@cockroachlabs.com>
  • Loading branch information
4 people committed Mar 14, 2023
4 parents 0205bfe + 358f86d + 9ec7760 + 46e55ac commit ca5ae38
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 56 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1811,7 +1811,7 @@ func TestBackupRestoreResume(t *testing.T) {
},
},
// Required because restore checkpointing is version gated.
clusterversion.ByKey(clusterversion.V23_1Start),
clusterversion.ByKey(clusterversion.V23_1),
)
// If the restore properly took the (incorrect) low-water mark into account,
// the first half of the table will be missing.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func restore(
return emptyRowCount, err
}

on231 := clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion)
on231 := clusterversion.ByKey(clusterversion.V23_1).LessEq(job.Payload().CreationClusterVersion)
progressTracker, err := makeProgressTracker(
dataToRestore.getSpans(),
job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint,
Expand Down
11 changes: 10 additions & 1 deletion pkg/ccl/multiregionccl/cold_start_latency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
func TestColdStartLatency(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 96334)
skip.UnderRace(t, "too slow")
skip.UnderStress(t, "too slow")
defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")()
Expand Down Expand Up @@ -146,7 +145,17 @@ func TestColdStartLatency(t *testing.T) {
}
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(1))

// Shorten the closed timestamp target duration so that span configs
// propagate more rapidly.
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`)
// Lengthen the lead time for the global tables to prevent overload from
// resulting in delays in propagating closed timestamps and, ultimately,
// forcing requests to be redirected to the leaseholder. Without this
// change, the test is sometimes flaky because the latency budget allocated
// to closed timestamp propagation proves to be insufficient. This value is
// very cautious, and makes this already slow test even slower.
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50 ms'")
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '1500ms'`)
tdb.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '500ms'`)

applyGlobalTables := func(t *testing.T, db *gosql.DB, isTenant bool) {
Expand Down
7 changes: 1 addition & 6 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,7 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation {
probTombstone := 0.2 // probability to write a tombstone
asWrites := rng.Float64() < 0.2 // IngestAsWrites

if true {
// TODO(erikgrinaker): Disable range keys for now since CheckSSTConflicts
// computes incorrect MVCC stats. See:
// https://github.com/cockroachdb/cockroach/issues/98473
numRangeKeys = 0
} else if r := rng.Float64(); r < 0.8 {
if r := rng.Float64(); r < 0.8 {
// 80% probability of only point keys.
numRangeKeys = 0
} else if r < 0.9 {
Expand Down
16 changes: 15 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,20 @@ func TestEvalAddSSTable(t *testing.T) {
sst: kvs{rangeKV("a", "l", 8, "")},
expect: kvs{rangeKV("a", "c", 8, ""), rangeKV("c", "d", 8, ""), rangeKV("c", "d", 6, ""), rangeKV("d", "j", 8, ""), rangeKV("j", "k", 8, ""), rangeKV("j", "k", 5, ""), rangeKV("k", "l", 8, "")},
},
"DisallowConflicts correctly accounts for complex fragment cases 5": {
noConflict: true,
reqTS: 10,
data: kvs{pointKV("cc", 7, ""), pointKV("cc", 6, ""), pointKV("cc", 5, "foo"), pointKV("cc", 4, ""), pointKV("cc", 3, "bar"), pointKV("cc", 2, "barfoo"), rangeKV("ab", "g", 1, "")},
sst: kvs{pointKV("aa", 8, "foo"), pointKV("aaa", 8, ""), pointKV("ac", 8, "foo"), rangeKV("b", "c", 8, ""), pointKV("ca", 8, "foo"), pointKV("cb", 8, "foo"), pointKV("cc", 8, "foo"), rangeKV("d", "e", 8, ""), pointKV("e", 8, "foobar")},
expect: kvs{pointKV("aa", 8, "foo"), pointKV("aaa", 8, ""), rangeKV("ab", "b", 1, ""), pointKV("ac", 8, "foo"), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 1, ""), rangeKV("c", "d", 1, ""), pointKV("ca", 8, "foo"), pointKV("cb", 8, "foo"), pointKV("cc", 8, "foo"), pointKV("cc", 7, ""), pointKV("cc", 6, ""), pointKV("cc", 5, "foo"), pointKV("cc", 4, ""), pointKV("cc", 3, "bar"), pointKV("cc", 2, "barfoo"), rangeKV("d", "e", 8, ""), rangeKV("d", "e", 1, ""), rangeKV("e", "g", 1, ""), pointKV("e", 8, "foobar")},
},
"DisallowConflicts handles existing point key above existing range tombstone": {
noConflict: true,
reqTS: 10,
data: kvs{pointKV("c", 7, ""), rangeKV("a", "g", 6, ""), pointKV("h", 7, "")},
sst: kvs{rangeKV("b", "d", 8, ""), rangeKV("f", "j", 8, "")},
expect: kvs{rangeKV("a", "b", 6, ""), rangeKV("b", "d", 8, ""), rangeKV("b", "d", 6, ""), pointKV("c", 7, ""), rangeKV("d", "f", 6, ""), rangeKV("f", "g", 8, ""), rangeKV("f", "g", 6, ""), rangeKV("g", "j", 8, ""), pointKV("h", 7, "")},
},
"DisallowConflicts accounts for point key already deleted in engine": {
noConflict: true,
reqTS: 10,
Expand Down Expand Up @@ -1024,7 +1038,7 @@ func TestEvalAddSSTable(t *testing.T) {
noConflict: true,
data: kvs{rangeKV("a", "b", 7, "")},
sst: kvs{rangeKV("a", "b", 7, "")},
expectErr: "ingested range key collides with an existing one",
expectErr: &kvpb.WriteTooOldError{},
},
}
testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) {
Expand Down
12 changes: 7 additions & 5 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,13 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
)

cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings, cfg.clock, cfg.rangeFeedFactory)
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings,
cfg.clock, cfg.rangeFeedFactory,
)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider,
cfg.stopper)
cfg.sqlInstanceStorage, cfg.sqlLivenessProvider.CachedReader(),
cfg.stopper,
)

// We can't use the nodeDialer as the podNodeDialer unless we
// are serving the system tenant despite the fact that we've
Expand Down Expand Up @@ -787,7 +789,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
RowMetrics: &rowMetrics,
InternalRowMetrics: &internalRowMetrics,

SQLLivenessReader: cfg.sqlLivenessProvider,
SQLLivenessReader: cfg.sqlLivenessProvider.CachedReader(),
JobRegistry: jobRegistry,
Gossip: cfg.gossip,
PodNodeDialer: cfg.podNodeDialer,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,7 +1244,7 @@ type ExecutorConfig struct {
MetricsRecorder nodeStatusGenerator
SessionRegistry *SessionRegistry
ClosedSessionCache *ClosedSessionCache
SQLLiveness sqlliveness.Liveness
SQLLiveness sqlliveness.Provider
JobRegistry *jobs.Registry
VirtualSchemas *VirtualSchemaHolder
DistSQLPlanner *DistSQLPlanner
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) {
evalCtx.Settings = execCfg.Settings
evalCtx.Codec = execCfg.Codec
evalCtx.Tracer = execCfg.AmbientCtx.Tracer
evalCtx.SQLLivenessReader = execCfg.SQLLiveness
if execCfg.SQLLiveness != nil { // nil in some tests
evalCtx.SQLLivenessReader = execCfg.SQLLiveness.CachedReader()
}
evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc
evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc
evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs
Expand Down
35 changes: 22 additions & 13 deletions pkg/sql/sqlliveness/slstorage/slstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var CacheSize = settings.RegisterIntSetting(
1024)

// Storage deals with reading and writing session records. It implements the
// sqlliveness.Reader interface, and the slinstace.Writer interface.
// sqlliveness.Storage interface, and the slinstance.Writer interface.
type Storage struct {
settings *cluster.Settings
settingsWatcher *settingswatcher.SettingsWatcher
Expand Down Expand Up @@ -110,7 +110,7 @@ type Storage struct {
}
}

var _ sqlliveness.Reader = &Storage{}
var _ sqlliveness.StorageReader = &Storage{}

// NewTestingStorage constructs a new storage with control for the database
// in which the `sqlliveness` table should exist.
Expand Down Expand Up @@ -186,13 +186,6 @@ func (s *Storage) Start(ctx context.Context) {
s.mu.started = true
}

// IsAlive determines whether a given session is alive. If this method returns
// true, the session may no longer be alive, but if it returns false, the
// session definitely is not alive.
func (s *Storage) IsAlive(ctx context.Context, sid sqlliveness.SessionID) (alive bool, err error) {
return s.isAlive(ctx, sid, sync)
}

type readType byte

const (
Expand Down Expand Up @@ -580,15 +573,31 @@ func (s *Storage) Update(
// currently known state of the session, but will trigger an asynchronous
// refresh of the state of the session if it is not known.
func (s *Storage) CachedReader() sqlliveness.Reader {
return (*cachedStorage)(s)
return (*cachedReader)(s)
}

// BlockingReader returns an implementation of sqlliveness.Reader which
// will cache results of previous reads but will synchronously block to
// determine the status of a session which it does not know about or thinks
// might be expired.
func (s *Storage) BlockingReader() sqlliveness.Reader {
return (*blockingReader)(s)
}

// blockingReader is a view of Storage whose IsAlive method performs its
// lookup using the sync read type.
type blockingReader Storage

// IsAlive reports whether the session identified by sid is alive. It
// delegates to Storage.isAlive with the sync read type.
func (s *blockingReader) IsAlive(
ctx context.Context, sid sqlliveness.SessionID,
) (alive bool, err error) {
return (*Storage)(s).isAlive(ctx, sid, sync)
}

// cachedStorage implements the sqlliveness.Reader interface, and the
// cachedReader implements the sqlliveness.Reader interface, and the
// slinstance.Writer interface, but does not read from the underlying store
// synchronously during IsAlive.
type cachedStorage Storage
type cachedReader Storage

func (s *cachedStorage) IsAlive(
func (s *cachedReader) IsAlive(
ctx context.Context, sid sqlliveness.SessionID,
) (alive bool, err error) {
return (*Storage)(s).isAlive(ctx, sid, async)
Expand Down
34 changes: 19 additions & 15 deletions pkg/sql/sqlliveness/slstorage/slstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func testStorage(t *testing.T) {
clock, _, _, stopper, storage := setup(t)
storage.Start(ctx)
defer stopper.Stop(ctx)
reader := storage.BlockingReader()

exp := clock.Now().Add(time.Second.Nanoseconds(), 0)
id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4())
Expand All @@ -100,14 +101,14 @@ func testStorage(t *testing.T) {
require.Equal(t, int64(1), metrics.WriteSuccesses.Count())
}
{
isAlive, err := storage.IsAlive(ctx, id)
isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count())
require.Equal(t, int64(0), metrics.IsAliveCacheHits.Count())
}
{
isAlive, err := storage.IsAlive(ctx, id)
isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count())
Expand All @@ -119,6 +120,7 @@ func testStorage(t *testing.T) {
defer stopper.Stop(ctx)
slstorage.GCJitter.Override(ctx, &settings.SV, 0)
storage.Start(ctx)
reader := storage.BlockingReader()
metrics := storage.Metrics()

// GC will run some time after startup.
Expand Down Expand Up @@ -148,10 +150,10 @@ func testStorage(t *testing.T) {

// Verify they are alive.
{
isAlive1, err := storage.IsAlive(ctx, id1)
isAlive1, err := reader.IsAlive(ctx, id1)
require.NoError(t, err)
require.True(t, isAlive1)
isAlive2, err := storage.IsAlive(ctx, id2)
isAlive2, err := reader.IsAlive(ctx, id2)
require.NoError(t, err)
require.True(t, isAlive2)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
Expand All @@ -168,7 +170,7 @@ func testStorage(t *testing.T) {

// Ensure that the cached value is still in use for id2.
{
isAlive, err := storage.IsAlive(ctx, id2)
isAlive, err := reader.IsAlive(ctx, id2)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
Expand All @@ -194,15 +196,15 @@ func testStorage(t *testing.T) {

// Ensure that we now see the id1 as dead. That fact will be cached.
{
isAlive, err := storage.IsAlive(ctx, id1)
isAlive, err := reader.IsAlive(ctx, id1)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
require.Equal(t, int64(2), metrics.IsAliveCacheHits.Count())
}
// Ensure that the fact that it's dead is cached.
{
isAlive, err := storage.IsAlive(ctx, id1)
isAlive, err := reader.IsAlive(ctx, id1)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
Expand All @@ -218,15 +220,15 @@ func testStorage(t *testing.T) {

// Ensure that we now see the id2 as alive.
{
isAlive, err := storage.IsAlive(ctx, id2)
isAlive, err := reader.IsAlive(ctx, id2)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(3), metrics.IsAliveCacheMisses.Count())
require.Equal(t, int64(3), metrics.IsAliveCacheHits.Count())
}
// Ensure that the fact that it's still alive is cached.
{
isAlive, err := storage.IsAlive(ctx, id1)
isAlive, err := reader.IsAlive(ctx, id1)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(3), metrics.IsAliveCacheMisses.Count())
Expand All @@ -237,6 +239,7 @@ func testStorage(t *testing.T) {
clock, timeSource, _, stopper, storage := setup(t)
defer stopper.Stop(ctx)
storage.Start(ctx)
reader := storage.BlockingReader()

exp := clock.Now().Add(time.Second.Nanoseconds(), 0)
id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4())
Expand All @@ -248,7 +251,7 @@ func testStorage(t *testing.T) {
require.Equal(t, int64(1), metrics.WriteSuccesses.Count())
}
{
isAlive, err := storage.IsAlive(ctx, id)
isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.True(t, isAlive)
require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count())
Expand All @@ -259,7 +262,7 @@ func testStorage(t *testing.T) {
timeSource.Advance(time.Second + time.Nanosecond)
// Ensure that we discover it is no longer alive.
{
isAlive, err := storage.IsAlive(ctx, id)
isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
Expand All @@ -268,7 +271,7 @@ func testStorage(t *testing.T) {
}
// Ensure that the fact that it is no longer alive is cached.
{
isAlive, err := storage.IsAlive(ctx, id)
isAlive, err := reader.IsAlive(ctx, id)
require.NoError(t, err)
require.False(t, isAlive)
require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count())
Expand Down Expand Up @@ -348,6 +351,7 @@ func testConcurrentAccessesAndEvictions(t *testing.T) {
storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings,
s.SettingsWatcher().(*settingswatcher.SettingsWatcher), table, timeSource.NewTimer)
storage.Start(ctx)
reader := storage.BlockingReader()

const (
runsPerWorker = 100
Expand Down Expand Up @@ -446,7 +450,7 @@ func testConcurrentAccessesAndEvictions(t *testing.T) {
for i := 0; i < runsPerWorker; i++ {
time.Sleep(time.Microsecond)
i, id := pickSession()
isAlive, err := storage.IsAlive(ctx, id)
isAlive, err := reader.IsAlive(ctx, id)
assert.NoError(t, err)
checkIsAlive(t, i, isAlive)
}
Expand Down Expand Up @@ -603,7 +607,7 @@ func testConcurrentAccessSynchronization(t *testing.T) {
// Now launch another, synchronous reader, which will join
// the single-flight.
g.Go(func() (err error) {
alive, err = storage.IsAlive(ctx, sid)
alive, err = storage.BlockingReader().IsAlive(ctx, sid)
return err
})
// Sleep some tiny amount of time to hopefully allow the other
Expand Down Expand Up @@ -648,7 +652,7 @@ func testConcurrentAccessSynchronization(t *testing.T) {
// Now launch another, synchronous reader, which will join
// the single-flight.
g.Go(func() (err error) {
alive, err = storage.IsAlive(toCancel, sid)
alive, err = storage.BlockingReader().IsAlive(toCancel, sid)
return err
})

Expand Down
Loading

0 comments on commit ca5ae38

Please sign in to comment.