Skip to content

Commit

Permalink
sql,stmtdiagnostics: remove some version gates
Browse files Browse the repository at this point in the history
This commit addresses several TODOs with my name on them about removing
the version gates, mostly around the conditional statement diagnostics
introduced in 22.1 cycle.

Release note: None
  • Loading branch information
yuzefovich committed May 19, 2022
1 parent 73e7263 commit 0770254
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 122 deletions.
62 changes: 15 additions & 47 deletions pkg/cli/clisqlclient/statement_diag.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package clisqlclient
import (
"context"
"database/sql/driver"
"fmt"
"io"
"os"
"time"
Expand Down Expand Up @@ -100,51 +99,22 @@ func StmtDiagListOutstandingRequests(
return result, nil
}

// TODO(yuzefovich): remove this in 22.2.
func isAtLeast22dot1ClusterVersion(ctx context.Context, conn Conn) (bool, error) {
// Check whether the upgrade to add the conditional diagnostics columns to
// the statement_diagnostics_requests system table has already been run.
row, err := conn.QueryRow(ctx, `
SELECT
count(*)
FROM
[SHOW COLUMNS FROM system.statement_diagnostics_requests]
WHERE
column_name = 'min_execution_latency';`)
if err != nil {
return false, err
}
c, ok := row[0].(int64)
if !ok {
return false, nil
}
return c == 1, nil
}

func stmtDiagListOutstandingRequestsInternal(
ctx context.Context, conn Conn,
) ([]StmtDiagActivationRequest, error) {
var extraColumns string
atLeast22dot1, err := isAtLeast22dot1ClusterVersion(ctx, conn)
if err != nil {
return nil, err
}
if atLeast22dot1 {
// Converting an INTERVAL to a number of milliseconds within that
// interval is a pain - we extract the number of seconds and multiply it
// by 1000, then we extract the number of milliseconds and add that up
// to the previous result; however, we have now double counted the
// seconds field, so we have to remove that times 1000.
getMilliseconds := `EXTRACT(epoch FROM min_execution_latency)::INT8 * 1000 +
// Converting an INTERVAL to a number of milliseconds within that interval
// is a pain - we extract the number of seconds and multiply it by 1000,
// then we extract the number of milliseconds and add that up to the
// previous result; however, we have now double counted the seconds field,
// so we have to remove that times 1000.
getMilliseconds := `EXTRACT(epoch FROM min_execution_latency)::INT8 * 1000 +
EXTRACT(millisecond FROM min_execution_latency)::INT8 -
EXTRACT(second FROM min_execution_latency)::INT8 * 1000`
extraColumns = ", " + getMilliseconds + ", expires_at"
}
rows, err := conn.Query(ctx,
fmt.Sprintf(`SELECT id, statement_fingerprint, requested_at%s
FROM system.statement_diagnostics_requests
WHERE NOT completed
ORDER BY requested_at DESC`, extraColumns),
"SELECT id, statement_fingerprint, requested_at, "+getMilliseconds+`, expires_at
FROM system.statement_diagnostics_requests
WHERE NOT completed
ORDER BY requested_at DESC`,
)
if err != nil {
return nil, err
Expand All @@ -159,13 +129,11 @@ func stmtDiagListOutstandingRequestsInternal(
}
var minExecutionLatency time.Duration
var expiresAt time.Time
if atLeast22dot1 {
if ms, ok := vals[3].(int64); ok {
minExecutionLatency = time.Millisecond * time.Duration(ms)
}
if e, ok := vals[4].(time.Time); ok {
expiresAt = e
}
if ms, ok := vals[3].(int64); ok {
minExecutionLatency = time.Millisecond * time.Duration(ms)
}
if e, ok := vals[4].(time.Time); ok {
expiresAt = e
}
info := StmtDiagActivationRequest{
ID: vals[0].(int64),
Expand Down
36 changes: 13 additions & 23 deletions pkg/server/statement_diagnostics_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ package server

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -131,27 +129,21 @@ func (s *statusServer) StatementDiagnosticsRequests(

var err error

// TODO(yuzefovich): remove this version gating in 22.2.
var extraColumns string
if s.admin.server.st.Version.IsActive(ctx, clusterversion.AlterSystemStmtDiagReqs) {
extraColumns = `,
min_execution_latency,
expires_at`
}

// TODO(davidh): Add pagination to this request.
it, err := s.internalExecutor.QueryIteratorEx(ctx, "stmt-diag-get-all", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
},
fmt.Sprintf(`SELECT
`SELECT
id,
statement_fingerprint,
completed,
statement_diagnostics_id,
requested_at%s
requested_at,
min_execution_latency,
expires_at
FROM
system.statement_diagnostics_requests`, extraColumns))
system.statement_diagnostics_requests`)
if err != nil {
return nil, err
}
Expand All @@ -175,16 +167,14 @@ func (s *statusServer) StatementDiagnosticsRequests(
if requestedAt, ok := row[4].(*tree.DTimestampTZ); ok {
req.RequestedAt = requestedAt.Time
}
if extraColumns != "" {
if minExecutionLatency, ok := row[5].(*tree.DInterval); ok {
req.MinExecutionLatency = time.Duration(minExecutionLatency.Duration.Nanos())
}
if expiresAt, ok := row[6].(*tree.DTimestampTZ); ok {
req.ExpiresAt = expiresAt.Time
// Don't return already expired requests.
if req.ExpiresAt.Before(timeutil.Now()) {
continue
}
if minExecutionLatency, ok := row[5].(*tree.DInterval); ok {
req.MinExecutionLatency = time.Duration(minExecutionLatency.Duration.Nanos())
}
if expiresAt, ok := row[6].(*tree.DTimestampTZ); ok {
req.ExpiresAt = expiresAt.Time
// Don't return already expired requests.
if req.ExpiresAt.Before(timeutil.Now()) {
continue
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/row/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/row",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/row/kv_batch_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package row
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand All @@ -24,13 +23,11 @@ import (

// CanUseStreamer returns whether the kvstreamer.Streamer API should be used.
func CanUseStreamer(ctx context.Context, settings *cluster.Settings) bool {
// TODO(yuzefovich): remove the version gate in 22.2 cycle.
return settings.Version.IsActive(ctx, clusterversion.ScanWholeRows) &&
useStreamerEnabled.Get(&settings.SV)
return useStreamerEnabled.Get(&settings.SV)
}

// useStreamerEnabled determines whether the Streamer API should be used.
// TODO(yuzefovich): remove this in 22.2.
// TODO(yuzefovich): remove this in 23.1.
var useStreamerEnabled = settings.RegisterBoolSetting(
settings.TenantReadOnly,
"sql.distsql.use_streamer.enabled",
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/stmtdiagnostics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/kv",
"//pkg/roachpb",
Expand Down
59 changes: 14 additions & 45 deletions pkg/sql/stmtdiagnostics/statement_diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -195,11 +194,6 @@ func (r *Registry) poll(ctx context.Context) {
}
}

// TODO(yuzefovich): remove this in 22.2.
func (r *Registry) isMinExecutionLatencySupported(ctx context.Context) bool {
return r.st.Version.IsActive(ctx, clusterversion.AlterSystemStmtDiagReqs)
}

// RequestID is the ID of a diagnostics request, corresponding to the id
// column in statement_diagnostics_requests.
// A zero ID is invalid.
Expand Down Expand Up @@ -285,29 +279,19 @@ func (r *Registry) insertRequestInternal(
return 0, err
}

if !r.isMinExecutionLatencySupported(ctx) {
if minExecutionLatency != 0 || expiresAfter != 0 {
return 0, errors.New(
"conditional statement diagnostics are only supported " +
"after 22.1 version migrations have completed",
)
}
}

var reqID RequestID
var expiresAt time.Time
err = r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Check if there's already a pending request for this fingerprint.
var extraConditions string
if r.isMinExecutionLatencySupported(ctx) {
extraConditions = " AND (expires_at IS NULL OR expires_at > now())"
}
row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-pending", txn,
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
},
fmt.Sprintf("SELECT count(1) FROM system.statement_diagnostics_requests "+
"WHERE completed = false AND statement_fingerprint = $1%s", extraConditions),
`SELECT count(1) FROM system.statement_diagnostics_requests
WHERE
completed = false AND
statement_fingerprint = $1 AND
(expires_at IS NULL OR expires_at > now())`,
stmtFingerprint)
if err != nil {
return err
Expand Down Expand Up @@ -386,14 +370,6 @@ func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error {
return err
}

if !r.isMinExecutionLatencySupported(ctx) {
// If conditional diagnostics are not supported for this cluster yet,
// then we cannot cancel the request.
return errors.New(
"statement diagnostics can only be canceled after 22.1 version migrations have completed",
)
}

row, err := r.ie.QueryRowEx(ctx, "stmt-diag-cancel-request", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
Expand Down Expand Up @@ -636,25 +612,20 @@ func (r *Registry) InsertStatementDiagnostics(
// updates r.mu.requests accordingly.
func (r *Registry) pollRequests(ctx context.Context) error {
var rows []tree.Datums
isMinExecutionLatencySupported := r.isMinExecutionLatencySupported(ctx)
// Loop until we run the query without straddling an epoch increment.
for {
r.mu.Lock()
epoch := r.mu.epoch
r.mu.Unlock()

var extraColumns string
var extraConditions string
if isMinExecutionLatencySupported {
extraColumns = ", min_execution_latency, expires_at"
extraConditions = " AND (expires_at IS NULL OR expires_at > now())"
}
it, err := r.ie.QueryIteratorEx(ctx, "stmt-diag-poll", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
},
fmt.Sprintf("SELECT id, statement_fingerprint%s FROM system.statement_diagnostics_requests "+
"WHERE completed = false%s", extraColumns, extraConditions))
`SELECT id, statement_fingerprint, min_execution_latency, expires_at
FROM system.statement_diagnostics_requests
WHERE completed = false AND (expires_at IS NULL OR expires_at > now())`,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -686,13 +657,11 @@ func (r *Registry) pollRequests(ctx context.Context) error {
stmtFingerprint := string(*row[1].(*tree.DString))
var minExecutionLatency time.Duration
var expiresAt time.Time
if isMinExecutionLatencySupported {
if minExecLatency, ok := row[2].(*tree.DInterval); ok {
minExecutionLatency = time.Duration(minExecLatency.Nanos())
}
if e, ok := row[3].(*tree.DTimestampTZ); ok {
expiresAt = e.Time
}
if minExecLatency, ok := row[2].(*tree.DInterval); ok {
minExecutionLatency = time.Duration(minExecLatency.Nanos())
}
if e, ok := row[3].(*tree.DTimestampTZ); ok {
expiresAt = e.Time
}
ids.Add(int(id))
r.addRequestInternalLocked(ctx, id, stmtFingerprint, minExecutionLatency, expiresAt)
Expand Down

0 comments on commit 0770254

Please sign in to comment.