sql: introduce and use OptionalLiveness
This makes it optional to provide a NodeLiveness instance.

The only uses are in DistSQL planning and jobs. In the former,
it was used only to check whether peers were live; we can just
avoid that check since no remote DistSQL planning occurs in
multi-tenancy Phase 2. For jobs, it was used to decide on job
adoption and cancellation, but again, under multi-tenancy there is
at most one SQL server running for a given tenant, so all jobs should
be adopted unconditionally.

Release note: None
tbg committed May 28, 2020
1 parent e16ca57 commit 6547ad6
Showing 11 changed files with 170 additions and 85 deletions.
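As the commit message describes, SQL-side consumers now receive a wrapper that may or may not carry a real NodeLiveness. The sketch below illustrates the shape of that wrapper as used in the hunks that follow; the names MakeOptionalNodeLiveness, Optional, and the narrowed interface methods appear in the diff, but the nil-backed struct and the body of Optional are simplified assumptions, not the actual pkg/sql/sqlbase implementation.

// A minimal sketch of the optional-liveness wrapper, assuming a plain
// nil-check implementation; the real type may be structured differently.
package sqlbase

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// NodeLiveness is the narrow subset of kvserver liveness functionality
// needed on the SQL side (the jobs Registry and the DistSQLPlanner).
type NodeLiveness interface {
	Self() (kvserverpb.Liveness, error)
	GetLivenesses() []kvserverpb.Liveness
	IsLive(roachpb.NodeID) (bool, error)
}

// OptionalNodeLiveness carries a NodeLiveness that may be absent, as it is
// for SQL tenant servers that cannot reach KV node liveness.
type OptionalNodeLiveness struct {
	nl NodeLiveness // nil when no liveness instance was provided
}

// MakeOptionalNodeLiveness wraps nl, which may be nil.
func MakeOptionalNodeLiveness(nl NodeLiveness) OptionalNodeLiveness {
	return OptionalNodeLiveness{nl: nl}
}

// Optional returns the wrapped NodeLiveness, if any. The issue number
// documents the GitHub issue tracking removal of the caller's fallback path.
func (o OptionalNodeLiveness) Optional(issueNo int) (NodeLiveness, bool) {
	_ = issueNo // serves only as documentation in this sketch
	if o.nl == nil {
		return nil, false
	}
	return o.nl, true
}

Callers branch on the returned boolean: when no liveness is present (a tenant SQL server), the jobs Registry adopts everything and cancels nothing, and the DistSQLPlanner treats its single node as always live, as the hunks below show.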
6 changes: 6 additions & 0 deletions pkg/jobs/helpers.go
@@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// FakeNodeID is a dummy node ID for use in tests. It always stores 1.
@@ -87,6 +88,11 @@ func (nl *FakeNodeLiveness) GetLivenesses() (out []kvserverpb.Liveness) {
return out
}

// IsLive is unimplemented.
func (nl *FakeNodeLiveness) IsLive(roachpb.NodeID) (bool, error) {
return false, errors.New("FakeNodeLiveness.IsLive is unimplemented")
}

// FakeIncrementEpoch increments the epoch for the node with the specified ID.
func (nl *FakeNodeLiveness) FakeIncrementEpoch(id roachpb.NodeID) {
nl.mu.Lock()
70 changes: 41 additions & 29 deletions pkg/jobs/registry.go
@@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -65,13 +64,6 @@ var (
time.Hour*24*14)
)

// NodeLiveness is the subset of storage.NodeLiveness's interface needed
// by Registry.
type NodeLiveness interface {
Self() (kvserverpb.Liveness, error)
GetLivenesses() []kvserverpb.Liveness
}

// Registry creates Jobs and manages their leases and cancelation.
//
// Job information is stored in the `system.jobs` table. Each node will
@@ -100,7 +92,7 @@ type NodeLiveness interface {
type Registry struct {
ac log.AmbientContext
stopper *stop.Stopper
nl NodeLiveness
nl sqlbase.OptionalNodeLiveness
db *kv.DB
ex sqlutil.InternalExecutor
clock *hlc.Clock
@@ -179,7 +171,7 @@ func MakeRegistry(
ac log.AmbientContext,
stopper *stop.Stopper,
clock *hlc.Clock,
nl NodeLiveness,
nl sqlbase.OptionalNodeLiveness,
db *kv.DB,
ex sqlutil.InternalExecutor,
nodeID *base.SQLIDContainer,
@@ -524,35 +516,43 @@ func (r *Registry) Start(
return nil
}

func (r *Registry) maybeCancelJobs(ctx context.Context, nl NodeLiveness) {
func (r *Registry) maybeCancelJobs(ctx context.Context, nlw sqlbase.OptionalNodeLiveness) {
// Cancel all jobs if the stopper is quiescing.
select {
case <-r.stopper.ShouldQuiesce():
r.cancelAll(ctx)
return
default:
}

nl, ok := nlw.Optional(47892)
if !ok {
// At most one container is running on behalf of a SQL tenant, so it must be
// this one, and there's no point canceling anything.
//
// TODO(ajwerner): don't rely on this. Instead fix this issue:
// https://github.com/cockroachdb/cockroach/issues/47892
return
}
liveness, err := nl.Self()
if err != nil {
if nodeLivenessLogLimiter.ShouldLog() {
log.Warningf(ctx, "unable to get node liveness: %s", err)
}
// Conservatively assume our lease has expired. Abort all jobs.
r.mu.Lock()
defer r.mu.Unlock()
r.cancelAll(ctx)
return
}

r.mu.Lock()
defer r.mu.Unlock()
// If we haven't persisted a liveness record within the leniency
// interval, we'll cancel all of our jobs.
if !liveness.IsLive(r.lenientNow()) {
r.cancelAll(ctx)
r.mu.Lock()
defer r.mu.Unlock()
r.cancelAllLocked(ctx)
r.mu.epoch = liveness.Epoch
return
}

// Finally, we cancel all jobs if the stopper is quiescing.
select {
case <-r.stopper.ShouldQuiesce():
r.cancelAll(ctx)
default:
}
}

// isOrphaned tries to detect if there are no mutations left to be done for the
@@ -988,7 +988,7 @@ func (r *Registry) adoptionDisabled(ctx context.Context) bool {
}

func (r *Registry) maybeAdoptJob(
ctx context.Context, nl NodeLiveness, randomizeJobOrder bool,
ctx context.Context, nlw sqlbase.OptionalNodeLiveness, randomizeJobOrder bool,
) error {
const stmt = `
SELECT id, payload, progress IS NULL, status
@@ -1015,7 +1015,10 @@ WHERE status IN ($1, $2, $3, $4, $5) ORDER BY created DESC`
// the empty lease (Lease{}) is always considered expired.
0: {isLive: false},
}
{
// If no liveness is available, adopt all jobs. This is reasonable because this
// only affects SQL tenants, which have at most one SQL server running on their
// behalf at any given time.
if nl, ok := nlw.Optional(47892); ok {
// We subtract the leniency interval here to artificially
// widen the range of times over which the job registry will
// consider the node to be alive. We rely on the fact that
@@ -1111,9 +1114,10 @@ WHERE status IN ($1, $2, $3, $4, $5) ORDER BY created DESC`
_, runningOnNode := r.mu.jobs[*id]
r.mu.Unlock()

if notLeaseHolder := payload.Lease.NodeID != r.nodeID.DeprecatedNodeID(
multiTenancyIssueNo,
); notLeaseHolder {
// If we're running as a tenant (!ok), then we are the sole SQL server in
// charge of its jobs and ought to adopt all of them. Otherwise, look more
// closely at who is running the job and whether to adopt.
if nodeID, ok := r.nodeID.OptionalNodeID(); ok && nodeID != payload.Lease.NodeID {
// Another node holds the lease on the job, see if we should steal it.
if runningOnNode {
// If we are currently running a job that another node has the lease on,
@@ -1136,7 +1140,9 @@ WHERE status IN ($1, $2, $3, $4, $5) ORDER BY created DESC`
continue
}
}
// Below we know that this node holds the lease on the job.

// Below we know that this node holds the lease on the job, or that we want
// to adopt it anyway because the leaseholder seems dead.
job := &Job{id: id, registry: r}
resumeCtx, cancel := r.makeCtx()

@@ -1225,6 +1231,12 @@ func (r *Registry) newLease() *jobspb.Lease {
}

func (r *Registry) cancelAll(ctx context.Context) {
r.mu.Lock()
defer r.mu.Unlock()
r.cancelAllLocked(ctx)
}

func (r *Registry) cancelAllLocked(ctx context.Context) {
r.mu.AssertHeld()
for jobID, cancel := range r.mu.jobs {
log.Warningf(ctx, "job %d: canceling due to liveness failure", jobID)
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
@@ -88,7 +88,7 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
idContainer := base.NewSQLIDContainer(0, &c, true /* exposed */)
ac := log.AmbientContext{Tracer: tracing.NewTracer()}
r := jobs.MakeRegistry(
ac, s.Stopper(), clock, nodeLiveness, db, s.InternalExecutor().(sqlutil.InternalExecutor),
ac, s.Stopper(), clock, sqlbase.MakeOptionalNodeLiveness(nodeLiveness), db, s.InternalExecutor().(sqlutil.InternalExecutor),
idContainer, s.ClusterSettings(), base.DefaultHistogramWindowInterval(), jobs.FakePHS, "",
)
if err := r.Start(ctx, s.Stopper(), cancelInterval, adoptInterval); err != nil {
14 changes: 12 additions & 2 deletions pkg/jobs/registry_test.go
@@ -56,8 +56,18 @@ func TestRegistryCancelation(t *testing.T) {
mClock := hlc.NewManualClock(hlc.UnixNano())
clock := hlc.NewClock(mClock.UnixNano, time.Nanosecond)
registry := MakeRegistry(
log.AmbientContext{}, stopper, clock, nodeLiveness, db, nil /* ex */, base.TestingIDContainer, cluster.NoSettings,
histogramWindowInterval, FakePHS, "")
log.AmbientContext{},
stopper,
clock,
sqlbase.MakeOptionalNodeLiveness(nodeLiveness),
db,
nil, /* ex */
base.TestingIDContainer,
cluster.NoSettings,
histogramWindowInterval,
FakePHS,
"",
)

const cancelInterval = time.Nanosecond
const adoptInterval = time.Duration(math.MaxInt64)
3 changes: 2 additions & 1 deletion pkg/server/server.go
@@ -57,6 +57,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
_ "github.com/cockroachdb/cockroach/pkg/sql/gcjob" // register jobs declared outside of pkg/sql
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -533,7 +534,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
rpcContext: rpcContext,
distSender: distSender,
statusServer: serverpb.MakeOptionalStatusServer(sStatus),
nodeLiveness: nodeLiveness,
nodeLiveness: sqlbase.MakeOptionalNodeLiveness(nodeLiveness),
gossip: gossip.MakeExposedGossip(g),
nodeDialer: nodeDialer,
grpcServer: grpcServer.Server,
25 changes: 17 additions & 8 deletions pkg/server/server_sql.go
@@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
@@ -101,11 +102,8 @@ type sqlServerOptionalArgs struct {
distSender *kvcoord.DistSender
// statusServer gives access to the Status service.
statusServer serverpb.OptionalStatusServer
// Narrowed down version of *NodeLiveness.
nodeLiveness interface {
jobs.NodeLiveness // jobs uses this
IsLive(roachpb.NodeID) (bool, error) // DistSQLPlanner wants this
}
// Narrowed down version of *NodeLiveness. Used by jobs and DistSQLPlanner
nodeLiveness sqlbase.OptionalNodeLiveness
// Gossip is relied upon by distSQLCfg (execinfra.ServerConfig), the executor
// config, the DistSQL planner, the table statistics cache, the statements
// diagnostics registry, and the lease manager.
@@ -184,9 +182,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
jobRegistry := cfg.jobRegistry

{
regLiveness := jobs.NodeLiveness(cfg.nodeLiveness)
regLiveness := cfg.nodeLiveness
if testingLiveness := cfg.TestingKnobs.RegistryLiveness; testingLiveness != nil {
regLiveness = testingLiveness.(*jobs.FakeNodeLiveness)
regLiveness = sqlbase.MakeOptionalNodeLiveness(testingLiveness.(*jobs.FakeNodeLiveness))
}
*jobRegistry = *jobs.MakeRegistry(
cfg.AmbientCtx,
@@ -363,6 +361,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
NodeID: cfg.nodeIDContainer,
}

var isLive func(roachpb.NodeID) (bool, error)
if nl, ok := cfg.nodeLiveness.Optional(47900); ok {
isLive = nl.IsLive
} else {
// We're on a SQL tenant, so this is the only node DistSQL will ever
// schedule on - always returning true is fine.
isLive = func(roachpb.NodeID) (bool, error) {
return true, nil
}
}

*execCfg = sql.ExecutorConfig{
Settings: cfg.Settings,
NodeInfo: nodeInfo,
@@ -399,7 +408,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
cfg.distSender,
cfg.gossip,
cfg.stopper,
cfg.nodeLiveness,
isLive,
cfg.nodeDialer,
),

32 changes: 9 additions & 23 deletions pkg/server/testserver.go
@@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider"
@@ -414,22 +413,6 @@ func (ts *TestServer) Start(params base.TestServerArgs) error {
return ts.Server.Start(ctx)
}

type allErrorsFakeLiveness struct{}

var _ jobs.NodeLiveness = (*allErrorsFakeLiveness)(nil)

func (allErrorsFakeLiveness) Self() (kvserverpb.Liveness, error) {
return kvserverpb.Liveness{}, errors.New("fake liveness")

}
func (allErrorsFakeLiveness) GetLivenesses() []kvserverpb.Liveness {
return nil
}

func (allErrorsFakeLiveness) IsLive(roachpb.NodeID) (bool, error) {
return false, errors.New("fake liveness")
}

type dummyProtectedTSProvider struct {
protectedts.Provider
}
@@ -438,6 +421,11 @@ func (d dummyProtectedTSProvider) Protect(context.Context, *kv.Txn, *ptpb.Record
return errors.New("fake protectedts.Provider")
}

// TODO(asubiotto): Jobs don't play well with a weird node ID in a multitenant
// environment, so a node ID of 1 is used here to get tests to pass. Fixing
// this is tracked in https://github.com/cockroachdb/cockroach/issues/47892.
const fakeNodeID = roachpb.NodeID(1)

func testSQLServerArgs(ts *TestServer) sqlServerArgs {
stopper := ts.Stopper()
clusterName := ts.Cfg.ClusterName
@@ -544,10 +532,6 @@ func testSQLServerArgs(ts *TestServer) sqlServerArgs {

dummyRecorder := &status.MetricsRecorder{}

// TODO(asubiotto): Jobs don't play well with a weird node ID in a multitenant
// environment, so a node ID of 1 is used here to get tests to pass. Fixing
// this is tracked in https://github.com/cockroachdb/cockroach/issues/47892.
const fakeNodeID = roachpb.NodeID(1)
var c base.NodeIDContainer
c.Set(context.Background(), fakeNodeID)
const sqlInstanceID = base.SQLInstanceID(10001)
@@ -563,7 +547,7 @@ func testSQLServerArgs(ts *TestServer) sqlServerArgs {
rpcContext: rpcContext,
distSender: ds,
statusServer: noStatusServer,
nodeLiveness: allErrorsFakeLiveness{},
nodeLiveness: sqlbase.MakeOptionalNodeLiveness(nil),
gossip: gossip.MakeUnexposedGossip(g),
nodeDialer: nodeDialer,
grpcServer: dummyRPCServer,
@@ -620,7 +604,9 @@ func (ts *TestServer) StartTenant(params base.TestTenantArgs) (pgAddr string, _

// NB: this should no longer be necessary after #47902. Right now it keeps
// the tenant from crashing.
s.execCfg.DistSQLPlanner.SetNodeDesc(roachpb.NodeDescriptor{NodeID: -1})
//
// NB: this NodeID is actually used by the DistSQL planner.
s.execCfg.DistSQLPlanner.SetNodeDesc(roachpb.NodeDescriptor{NodeID: fakeNodeID})

connManager := netutil.MakeServer(
args.stopper,
12 changes: 2 additions & 10 deletions pkg/sql/conn_executor_internal_test.go
@@ -223,14 +223,6 @@ func mustParseOne(s string) parser.Statement {
return stmts[0]
}

type dummyLivenessProvider struct {
}

// IsLive implements the livenessProvider interface.
func (l dummyLivenessProvider) IsLive(roachpb.NodeID) (bool, error) {
return true, nil
}

// startConnExecutor start a goroutine running a connExecutor. This connExecutor
// is using a mocked KV that can't really do anything, so it can't run
// statements that need to "access the database". It can only execute things
@@ -282,8 +274,8 @@ func startConnExecutor(
nil, /* distSender */
gw,
stopper,
dummyLivenessProvider{}, /* liveness */
nil, /* nodeDialer */
func(roachpb.NodeID) (bool, error) { return true, nil }, // everybody is live
nil, /* nodeDialer */
),
QueryCache: querycache.New(0),
TestingKnobs: ExecutorTestingKnobs{},
(Diffs for the remaining changed files are not shown.)
