Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
83303: sql/seqexpr: Remove dep from seqexpr to builtin r=Xiang-Gu a=Xiang-Gu

`seqexpr` package is a low level package that manipulates
sequence expressions and we do not want it to depend on
the "heavy" package `builtins`.

To break this dependency, we had two commits for this PR:

1. Created a new package `builtinconstants` and moved 
    all constants used in the builtin package there.

2. We plumbed in the only function call from the `builtins`
    package in `seqexpr` to remove the dependency from
    seqexpr to builtins. Finally, we added a bazel rule in
    seqexpr/BUILD.bazel to disallow dependency to builtins

Release note: None

83451: sql: rename `oldest_query_start` column in sessions table r=xinhaoz a=xinhaoz

Closes #80676

Previously, the `oldest_query_start` column in the cluster
and node sessions tables was misleadingly named. The column
implies that it contains the time at which the oldest query
in a session was started. This column is actually the time
at which the session's currently active query started.

This commit renames `oldest_query_start` to `active_query_start`
to more accurately represent the column data.

Release note (sql change): `oldest_query_start` in the
`crdb_internal.cluster_sessions` and `crdb_internal.node_sessions`
has been renamed to `active_query_start`, as this column contains
the time at which the currently active query was started, not the
time at which the session's first query was started.

83517: sql/schemachanger/scexec: remove debug log line r=ajwerner a=ajwerner

This just merged by mistake.

Release note: None

83562: rowexec: fix recent bug of using nil context r=yuzefovich a=yuzefovich

In e7e724e we moved the creation of
a monitor for the streamer's disk usage into the constructor of the join
reader but forgot to update the context used during that operation. The
thing is that the context on the processors is only set in `Start`
meaning it is `nil` in the construct of the processor. This commit fixes
the issue.

Fixes: #83367

Release note: None

83594: kvstreamer: add more observability r=yuzefovich a=yuzefovich

This commit adds the following statistics about the streamer:
- number of Enqueue calls
- number of enqueued requests
- number of single-range enqueue requests
- number of issued BatchRequests (which is also exposed on EXPLAIN ANALYZE)
- number of resume BatchRequests
- number of resume single-range requests
- number of spilled results
- number of empty batch responses
- number of dropped batch responses
- the final average response size.

This information provides more visibility into how the streamer behaved.
It is only added to the verbose logs / tracing since these are pretty
low level details that would only pollute the output of EXPLAIN or
something like that.

Fixes: #82156.

Release note: None

83602: sql/syntheticprivilege: remove BUILD.bazel cruft r=ajwerner a=ajwerner

This was added as part of the rename.

Release note: None

Co-authored-by: Xiang Gu <xiang@cockroachlabs.com>
Co-authored-by: Xin Hao Zhang <xzhang@cockroachlabs.com>
Co-authored-by: Andrew Werner <awerner32@gmail.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
5 people committed Jun 30, 2022
7 parents 0019bbd + ba1901c + bfcebc9 + 063df01 + 86b54f5 + 7f6a0f5 + 2aa047a commit e461d15
Show file tree
Hide file tree
Showing 44 changed files with 550 additions and 405 deletions.
2 changes: 1 addition & 1 deletion pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ ALL_TESTS = [
"//pkg/sql/catalog/resolver:resolver_test",
"//pkg/sql/catalog/schemadesc:schemadesc_test",
"//pkg/sql/catalog/schemaexpr:schemaexpr_test",
"//pkg/sql/catalog/seqexpr:seqexpr_disallowed_imports_test",
"//pkg/sql/catalog/seqexpr:seqexpr_test",
"//pkg/sql/catalog/systemschema_test:systemschema_test_test",
"//pkg/sql/catalog/tabledesc:tabledesc_test",
Expand Down Expand Up @@ -422,7 +423,6 @@ ALL_TESTS = [
"//pkg/sql/stats:stats_test",
"//pkg/sql/stmtdiagnostics:stmtdiagnostics_test",
"//pkg/sql/syntheticprivilege:syntheticprivilege_test",
"//pkg/sql/syntheticprivilege:systemprivilege_test",
"//pkg/sql/tests:tests_test",
"//pkg/sql/ttl/ttljob:ttljob_test",
"//pkg/sql/types:types_disallowed_imports_test",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,12 @@ id node_id session_id start txn_string application_name num_stmts num_ret
query ITTTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0
----
node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start active_query_start kv_txn alloc_bytes max_alloc_bytes status session_end

query ITTTTTTTTTTTTTT colnames
SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0
----
node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end
node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start active_query_start kv_txn alloc_bytes max_alloc_bytes status session_end

query IIITTTI colnames
SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ go_library(
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/stop",
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvclient/kvstreamer/results_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ type resultsBuffer interface {
// It is assumed that the budget's mutex is already being held.
spill(_ context.Context, atLeastBytes int64, spillingPriority int) (bool, error)

// numSpilledResults returns the number of Results that have been spilled to
// disk by the resultsBuffer so far.
numSpilledResults() int

// error returns the first error encountered by the buffer.
error() error

Expand Down Expand Up @@ -269,6 +273,10 @@ func (b *outOfOrderResultsBuffer) spill(context.Context, int64, int) (bool, erro
return false, nil
}

func (b *outOfOrderResultsBuffer) numSpilledResults() int {
return 0
}

func (b *outOfOrderResultsBuffer) close(context.Context) {
// Note that only the client's goroutine can be blocked waiting for the
// results, and close() is called only by the same goroutine, so signaling
Expand Down Expand Up @@ -327,6 +335,9 @@ type inOrderResultsBuffer struct {

diskBuffer ResultDiskBuffer

// numSpilled tracks how many Results have been spilled to disk so far.
numSpilled int

// addCounter tracks the number of times add() has been called. See
// inOrderBufferedResult.addEpoch for why this is needed.
addCounter int
Expand Down Expand Up @@ -569,6 +580,7 @@ func (b *inOrderResultsBuffer) spill(
return false, err
}
r.spill(diskResultID)
b.numSpilled++
b.budget.releaseLocked(ctx, r.memoryTok.toRelease)
atLeastBytes -= r.memoryTok.toRelease
if atLeastBytes <= 0 {
Expand All @@ -582,6 +594,12 @@ func (b *inOrderResultsBuffer) spill(
return false, nil
}

func (b *inOrderResultsBuffer) numSpilledResults() int {
b.Lock()
defer b.Unlock()
return b.numSpilled
}

func (b *inOrderResultsBuffer) close(ctx context.Context) {
b.Lock()
defer b.Unlock()
Expand Down
71 changes: 68 additions & 3 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -226,9 +228,7 @@ type Streamer struct {
maxKeysPerRow int32
budget *budget

atomics struct {
batchRequestsIssued *int64
}
streamerStatistics

coordinator workerCoordinator
coordinatorStarted bool
Expand Down Expand Up @@ -276,6 +276,37 @@ type Streamer struct {
}
}

type streamerStatistics struct {
atomics struct {
batchRequestsIssued *int64
// resumeBatchRequests tracks the number of BatchRequests created for
// the ResumeSpans throughout the lifetime of the Streamer.
resumeBatchRequests int64
// resumeSingleRangeRequests tracks the number of single-range requests
// that were created for the ResumeSpans throughout the lifetime of the
// Streamer.
resumeSingleRangeRequests int64
// emptyBatchResponses tracks the number of BatchRequests that resulted
// in empty BatchResponses because they were issued with too low value
// of TargetBytes parameter.
emptyBatchResponses int64
// droppedBatchResponses tracks the number of the received
// BatchResponses that were dropped because the memory reservation
// during the budget reconciliation was denied (i.e. the original
// estimate was too low, and the budget has been used up by the time
// response came).
droppedBatchResponses int64
}
// enqueueCalls tracks the number of times Enqueue() has been called.
enqueueCalls int
// enqueuedRequests tracks the number of possibly-multi-range requests that
// have been Enqueue()'d into the Streamer.
enqueuedRequests int
// enqueuedSingleRangeRequests tracks the number of single-range
// sub-requests that were created during the truncation process in Enqueue()
enqueuedSingleRangeRequests int
}

// streamerConcurrencyLimit is an upper bound on the number of asynchronous
// requests that a single Streamer can have in flight. The default value for
// this setting is chosen arbitrarily as 1/8th of the default value for the
Expand Down Expand Up @@ -434,6 +465,9 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
return err
}

s.enqueueCalls++
s.enqueuedRequests += len(reqs)

// The minimal key range encompassing all requests contained within.
// Local addressing has already been resolved.
rs, err := keys.Range(reqs)
Expand Down Expand Up @@ -542,6 +576,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
}

requestsToServe = append(requestsToServe, r)
s.enqueuedSingleRangeRequests += len(singleRangeReqs)

// Determine next seek key, taking potentially sparse requests into
// consideration.
Expand Down Expand Up @@ -625,6 +660,7 @@ func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
// other calls on s are allowed after this.
func (s *Streamer) Close(ctx context.Context) {
if s.coordinatorStarted {
s.coordinator.logStatistics(ctx)
s.coordinatorCtxCancel()
s.mu.Lock()
s.mu.done = true
Expand Down Expand Up @@ -742,6 +778,31 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
}
}

// logStatistics logs some of the statistics about the Streamer. It should be
// called at the end of the Streamer's lifecycle.
// TODO(yuzefovich): at the moment, these statistics will be attached to the
// tracing span of the Streamer's user. Some time has been spent to figure it
// out but led to no success. This should be cleaned up.
func (w *workerCoordinator) logStatistics(ctx context.Context) {
avgResponseSize, _ := w.getAvgResponseSize()
log.VEventf(
ctx, 1,
"enqueueCalls=%d enqueuedRequests=%d enqueuedSingleRangeRequests=%d "+
"batchRequestsIssued=%d resumeBatchRequests=%d resumeSingleRangeRequests=%d "+
"numSpilledResults=%d emptyBatchResponses=%d droppedBatchResponses=%d avgResponseSize=%s",
w.s.enqueueCalls,
w.s.enqueuedRequests,
w.s.enqueuedSingleRangeRequests,
atomic.LoadInt64(w.s.atomics.batchRequestsIssued),
atomic.LoadInt64(&w.s.atomics.resumeBatchRequests),
atomic.LoadInt64(&w.s.atomics.resumeSingleRangeRequests),
w.s.results.numSpilledResults(),
atomic.LoadInt64(&w.s.atomics.emptyBatchResponses),
atomic.LoadInt64(&w.s.atomics.droppedBatchResponses),
humanizeutil.IBytes(avgResponseSize),
)
}

// waitForRequests blocks until there is at least one request to be served.
func (w *workerCoordinator) waitForRequests(ctx context.Context) error {
w.s.requestsToServe.Lock()
Expand Down Expand Up @@ -1129,6 +1190,7 @@ func (w *workerCoordinator) performRequestAsync(
// but not enough for that large row).
toConsume := -overaccountedTotal
if err := w.s.budget.consume(ctx, toConsume, headOfLine /* allowDebt */); err != nil {
atomic.AddInt64(&w.s.atomics.droppedBatchResponses, 1)
w.s.budget.release(ctx, targetBytes)
if !headOfLine {
// Since this is not the head of the line, we'll just
Expand Down Expand Up @@ -1413,6 +1475,7 @@ func (w *workerCoordinator) processSingleRangeResults(
)
} else {
// We received an empty response.
atomic.AddInt64(&w.s.atomics.emptyBatchResponses, 1)
if req.minTargetBytes != 0 {
// We previously have already received an empty response for this
// request, and minTargetBytes wasn't sufficient. Make sure that
Expand Down Expand Up @@ -1444,6 +1507,8 @@ func (w *workerCoordinator) processSingleRangeResults(
req.reqs[i] = roachpb.RequestUnion{}
}
w.s.requestsToServe.add(resumeReq)
atomic.AddInt64(&w.s.atomics.resumeBatchRequests, 1)
atomic.AddInt64(&w.s.atomics.resumeSingleRangeRequests, int64(numIncompleteRequests))
}

return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ go_library(
"//pkg/sql/scrub",
"//pkg/sql/sem/asof",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/builtins/builtinconstants",
"//pkg/sql/sem/cast",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/catid",
Expand Down
13 changes: 11 additions & 2 deletions pkg/sql/catalog/seqexpr/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//pkg/testutils/buildutil:buildutil.bzl", "disallowed_imports_test")

go_library(
name = "seqexpr",
Expand All @@ -8,7 +9,7 @@ go_library(
deps = [
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/builtins/builtinconstants",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
],
Expand All @@ -17,10 +18,18 @@ go_library(
go_test(
name = "seqexpr_test",
srcs = ["sequence_test.go"],
embed = [":seqexpr"],
deps = [
":seqexpr",
"//pkg/sql/parser",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
],
)

disallowed_imports_test(
"seqexpr",
[
"//pkg/sql/sem/builtins",
],
)
30 changes: 22 additions & 8 deletions pkg/sql/catalog/seqexpr/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)
Expand All @@ -42,7 +42,12 @@ func (si *SeqIdentifier) IsByID() bool {
// takes a sequence identifier as an arg (a sequence identifier can either be
// a sequence name or an ID), wrapped in the SeqIdentifier type.
// Returns the identifier of the sequence or nil if no sequence was found.
func GetSequenceFromFunc(funcExpr *tree.FuncExpr) (*SeqIdentifier, error) {
//
// `getBuiltinProperties` argument is commonly builtins.GetBuiltinProperties.
func GetSequenceFromFunc(
funcExpr *tree.FuncExpr,
getBuiltinProperties func(name string) (*tree.FunctionProperties, []tree.Overload),
) (*SeqIdentifier, error) {

// Resolve doesn't use the searchPath for resolving FunctionDefinitions
// so we can pass in an empty SearchPath.
Expand All @@ -51,7 +56,7 @@ func GetSequenceFromFunc(funcExpr *tree.FuncExpr) (*SeqIdentifier, error) {
return nil, err
}

fnProps, overloads := builtins.GetBuiltinProperties(def.Name)
fnProps, overloads := getBuiltinProperties(def.Name)
if fnProps != nil && fnProps.HasSequenceArguments {
found := false
for _, overload := range overloads {
Expand All @@ -68,7 +73,7 @@ func GetSequenceFromFunc(funcExpr *tree.FuncExpr) (*SeqIdentifier, error) {
for i := 0; i < overload.Types.Length(); i++ {
// Find the sequence name arg.
argName := argTypes[i].Name
if argName == builtins.SequenceNameArg {
if argName == builtinconstants.SequenceNameArg {
arg := funcExpr.Exprs[i]
if seqIdentifier := getSequenceIdentifier(arg); seqIdentifier != nil {
return seqIdentifier, nil
Expand Down Expand Up @@ -125,14 +130,19 @@ func getSequenceIdentifier(expr tree.Expr) *SeqIdentifier {
// a call to sequence function in the given expression or nil if no sequence
// identifiers are found. The identifier is wrapped in a SeqIdentifier.
// e.g. nextval('foo') => "foo"; nextval(123::regclass) => 123; <some other expression> => nil
func GetUsedSequences(defaultExpr tree.Expr) ([]SeqIdentifier, error) {
//
// `getBuiltinProperties` argument is commonly builtins.GetBuiltinProperties.
func GetUsedSequences(
defaultExpr tree.Expr,
getBuiltinProperties func(name string) (*tree.FunctionProperties, []tree.Overload),
) ([]SeqIdentifier, error) {
var seqIdentifiers []SeqIdentifier
_, err := tree.SimpleVisit(
defaultExpr,
func(expr tree.Expr) (recurse bool, newExpr tree.Expr, err error) {
switch t := expr.(type) {
case *tree.FuncExpr:
identifier, err := GetSequenceFromFunc(t)
identifier, err := GetSequenceFromFunc(t, getBuiltinProperties)
if err != nil {
return false, nil, err
}
Expand All @@ -152,13 +162,17 @@ func GetUsedSequences(defaultExpr tree.Expr) ([]SeqIdentifier, error) {
// ReplaceSequenceNamesWithIDs walks the given expression, and replaces
// any sequence names in the expression by their IDs instead.
// e.g. nextval('foo') => nextval(123::regclass)
//
// `getBuiltinProperties` argument is commonly builtins.GetBuiltinProperties.
func ReplaceSequenceNamesWithIDs(
defaultExpr tree.Expr, nameToID map[string]int64,
defaultExpr tree.Expr,
nameToID map[string]int64,
getBuiltinProperties func(name string) (*tree.FunctionProperties, []tree.Overload),
) (tree.Expr, error) {
replaceFn := func(expr tree.Expr) (recurse bool, newExpr tree.Expr, err error) {
switch t := expr.(type) {
case *tree.FuncExpr:
identifier, err := GetSequenceFromFunc(t)
identifier, err := GetSequenceFromFunc(t, getBuiltinProperties)
if err != nil {
return false, nil, err
}
Expand Down
Loading

0 comments on commit e461d15

Please sign in to comment.