sql,jobs: create SQL Stats Compaction Job and resumer #68434
Conversation
Force-pushed from 4b7ec28 to 8ebff66.
Force-pushed from ac53e65 to 2b8342d.
🎉
Reviewed 3 of 3 files at r1, 16 of 16 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @Azhng)
-- commits, line 10 at r2 (raw file):
nit: "barebones"
pkg/sql/compact_sql_stats.go, line 54 at r2 (raw file):
```go
return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
	if err != nil {
		if jobErr := r.Job.FractionProgressed(ctx, txn, func(ctx context.Context, _ jobspb.ProgressDetails) float32 {
```
Maybe a comment here, because it's a little unclear to me what this line is doing. Does the literal 0 in the callback mean anything, like are we checking that the job hasn't made any progress? Or are we just using `FractionProgressed` for its error-checking capabilities, but we don't actually care about progress?
pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go, line 59 at r2 (raw file):
```go
"sql.stats.persisted_rows.max",
"maximum number of rows of statement and transaction"+
	" statistics will be persisted in the system tables",
```
nit: "statistics that will be"
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 41 at r2 (raw file):
```go
// StatsCompactorKnobs can be used to tune the behavior of
// StatsCompactor.
type StatsCompactorKnobs struct {
```
If these are just for testing, maybe call them `StatsCompactorTestingKnobs`?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 69 at r2 (raw file):
```go
}
stmtStatsEntryCnt, txnStatsEntryCnt, err := c.getExistingStmtAndTxnStatsEntries(ctx)
```
Is "cnt" a common abbreviation for "count" around here? Maybe spell it out if not?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 87 at r2 (raw file):
```go
stmt := "SELECT count(*) FROM system.statement_statistics"
if c.TestingKnobs != nil && c.TestingKnobs.DisableFollowerRead {
	stmt = "SELECT count(*) FROM system.statement_statistics"
```
This is the same `stmt` as above, did you mean to make something different happen here?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 103 at r2 (raw file):
```go
stmtStatsEntryCnt = int64(tree.MustBeDInt(row[0]))
// stmt = "SELECT count(*) FROM system.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()"
```
Kill this comment, yeah?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 106 at r2 (raw file):
```go
stmt = "SELECT count(*) FROM system.transaction_statistics"
if c.TestingKnobs != nil && c.TestingKnobs.DisableFollowerRead {
	stmt = "SELECT count(*) FROM system.transaction_statistics"
```
Ditto, same `stmt` as above.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 138 at r2 (raw file):
SELECT %[2]s FROM %[1]s ORDER BY aggregated_ts
Maybe an explicit `ASC` here? I get nervous deleting things, lol.
pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go, line 53 at r2 (raw file):
```go
// CheckExistingCompactionJob checks for existing SQL Stats Compaction job
// that are either PAUCED, CANCELED, or RUNNING. If so, it returns a
```
PAUSED
pkg/sql/sqlstats/persistedsqlstats/compaction_test.go, line 54 at r2 (raw file):
```go
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
```
Just curious, what does this `stopper` stop? I don't see it used below. Does it look for magic stuff in the `ctx`?
pkg/sql/sqlstats/persistedsqlstats/compaction_test.go, line 100 at r2 (raw file):
```go
require.Equal(t, maxPersistedRowLimit, len(actualStmtFingerprints))
require.Equal(t, maxPersistedRowLimit, len(actualTxnFingerprints))
for fingerprintID := range actualTxnFingerprints {
```
super-nit: This code usually talks about statements, then transactions, so maybe move this for loop down?
pkg/util/jobutil/job_utils.go, line 28 at r1 (raw file):
```go
// started earlier than this one. If job is nil, CheckRunningJobsHelper just
// checks if there are any pending, running, or paused jobs.
func CheckRunningJobsHelper(
```
All makes sense, since this is a pure extraction. I'm thinking about names and wondering if this might be called `RunningJobsExist`?
pkg/util/jobutil/job_utils.go, line 30 at r1 (raw file):
```go
func CheckRunningJobsHelper(
	ctx context.Context, job *jobs.Job,
```
Would it make sense to just pass the `JobID` here? Or could it be integrated into the filter somehow?
pkg/util/jobutil/job_utils.go, line 33 at r1 (raw file):
```go
ie sqlutil.InternalExecutor, txn *kv.Txn, payloadFilter func(payload *jobspb.Payload) bool,
```
One more naming thought, I think this is a "payloadPredicate"?
Force-pushed from 7801f9e to 201d810.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @matthewtodd)
pkg/sql/compact_sql_stats.go, line 54 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Maybe a comment here, because it's a little unclear to me what this line is doing. Does the literal 0 in the callback mean anything, like are we checking that the job hasn't made any progress? Or are we just using `FractionProgressed` for its error-checking capabilities, but we don't actually care about progress?
Hmm, I put it here because I saw the automatic create stats job updates the job progress in its resumer. On second thought, since the compaction job is currently relatively simple and runs pretty fast, I don't think we need to update the job progress just for the sake of doing so.
I removed it for now, and we can reintroduce it once we start to actually implement the compaction logic.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 69 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Is "cnt" a common abbreviation for "count" around here? Maybe spell it out if not?
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 87 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
This is the same `stmt` as above, did you mean to make something different happen here?
Ah, the original `stmt` should have an `AS OF SYSTEM TIME` clause. Fixed.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 103 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Kill this comment, yeah?
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 106 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Ditto, same `stmt` as above.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 138 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Maybe an explicit `ASC` here? I get nervous deleting things, lol.
Ah sounds good 👍
pkg/sql/sqlstats/persistedsqlstats/compaction_test.go, line 54 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Just curious, what does this `stopper` stop? I don't see it used below. Does it look for magic stuff in the `ctx`?
Ah this is definitely redundant. Removed.
pkg/util/jobutil/job_utils.go, line 28 at r1 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
All makes sense, since this is a pure extraction. I'm thinking about names and wondering if this might be called `RunningJobsExist`?
Done.
pkg/util/jobutil/job_utils.go, line 30 at r1 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Would it make sense to just pass the `JobID` here? Or could it be integrated into the filter somehow?
The behavior of the function differs depending on whether `job` is nil. If `job` is nil, the function just checks whether there are any other running jobs that match the predicate. If `job` is not nil, the function checks whether the provided job's ID is the earliest-created running job.
Updated the comment to highlight that.
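A simplified sketch of the two modes described above (types, statuses, and names are reduced for illustration and are not the actual cockroach implementation):

```go
package main

import "fmt"

// jobRecord is a simplified stand-in for a row in the jobs table.
type jobRecord struct {
	id      int64  // IDs are assumed to be creation-ordered
	status  string // "pending", "running", "paused", etc.
	matches bool   // result of applying the payload predicate
}

// runningJobExists reports, for jobID == 0 (no job provided), whether any
// matching non-terminal job exists at all; for a real jobID, whether a
// matching job with a smaller (earlier-created) ID exists.
func runningJobExists(jobID int64, jobs []jobRecord) bool {
	for _, j := range jobs {
		if !j.matches {
			continue
		}
		if j.status != "pending" && j.status != "running" && j.status != "paused" {
			continue
		}
		if jobID == 0 || j.id < jobID {
			return true
		}
	}
	return false
}

func main() {
	jobs := []jobRecord{{id: 5, status: "running", matches: true}}
	fmt.Println(runningJobExists(0, jobs)) // true: some matching job exists
	fmt.Println(runningJobExists(3, jobs)) // false: none created earlier than 3
}
```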
Reviewed 18 of 18 files at r3, 10 of 16 files at r4, 6 of 6 files at r5, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @Azhng)
Nice job navigating the jobs boilerplate.
pkg/sql/compact_sql_stats.go (Outdated)
```go
type sqlStatsCompactionResumer struct {
	*jobs.Job
```
Is this embedding buying you anything?
```go
if c.TestingKnobs == nil {
	c.TestingKnobs = &StatsCompactorTestingKnobs{}
}
c.TestingKnobs.DisableFollowerRead = disallowFollowerRead
```
It's not often that you see non-testing code setting a `TestingKnob` value. Is the idea that your test is going to verify that the boolean made it through to here? If so, use a callback.
After reading more, I think it's that you're trying to tell the compactor to disallow follower reads. But, like, that knob is not going to change throughout the life of the cluster so this seems a little funky. Instead just inject the testing knobs in the appropriate places.
```go
// Init initializes the StatsCompactor. This method needs to be called before
// the DeleteOldestEntries method.
func (c *StatsCompactor) Init(
```
Why `Init` and not just `NewStatsCompactor` so you don't need to stress about whether or not it has been initialized?
```go
// [1]: table name
// [2]: primary key
// [3]: number of entries to remove
stmt := `
```
nit: const
```go
)
`

err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
```
I think this can all be `return c.db.Txn(ctx, ...`
```go
// on system tables that are yet to be created. This is not a scenario that is
// possible in real life.
```
I like the "real life" commentary but perhaps "outside of testing" would be better.
pkg/jobs/jobspb/jobs.proto (Outdated)
```proto
@@ -663,6 +663,14 @@ message MigrationProgress {
 }

 message SQLStatsCompactionDetails {
   bool disable_follower_read = 1;
```
Do we really need this in the job details? From my reading it only comes from a testing knob which should be shared by all of the instances of the job.
pkg/sql/compact_sql_stats.go (Outdated)
```go
if !ok {
	return errors.AssertionFailedf("invalid job detail payload for sql stats compaction")
}
r.compactionExecutor.Init(ie, db, jobDetails.DisableFollowerRead)
```
The code uses both `DisableFollowerReads` and `disallowFollerReads`. Can we pick one?
pkg/util/jobutil/job_utils.go
Outdated
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package jobutil |
I don't know that this warrants its own package. `jobs` already depends on all of this stuff. Seems fine to put this file in that package.
pkg/util/jobutil/job_utils.go (Outdated)
```go
// running, or paused jobs that matches payloadPredicate.
func RunningJobExists(
	ctx context.Context,
	job *jobs.Job,
```
Any reason not to just pass the job ID as opposed to the job struct?
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner and @matthewtodd)
pkg/jobs/jobspb/jobs.proto, line 667 at r5 (raw file):
Previously, ajwerner wrote…
Do we really need this in the job details? From my reading it only comes from a testing knob which should be shared by all of the instances of the job.
Removed. Added package-level variable `disableFollowerReadForTest` instead of passing it through protobuf.
pkg/sql/compact_sql_stats.go, line 25 at r5 (raw file):
Previously, ajwerner wrote…
Is this embedding buying you anything?
Not really. Removed the embedding.
pkg/sql/compact_sql_stats.go, line 52 at r5 (raw file):
Previously, ajwerner wrote…
The code uses both `DisableFollowerReads` and `disallowFollerReads`. Can we pick one?
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 46 at r5 (raw file):
Previously, ajwerner wrote…
I like the "real life" commentary but perhaps "outside of testing" would be better.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 55 at r5 (raw file):
Previously, ajwerner wrote…
Why `Init` and not just `NewStatsCompactor` so you don't need to stress about whether or not it has been initialized?
I was under the wrong impression that all the fields in the `Resumer` need to be initialized before `Resume()` is called.
Removed `Init` and replaced it with `NewStatsCompactor`.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 64 at r5 (raw file):
Previously, ajwerner wrote…
After reading more, I think it's that you're trying to tell the compactor to disallow follower reads. But, like, that knob is not going to change throughout the life of the cluster so this seems a little funky. Instead just inject the testing knobs in the appropriate places.
Replaced it with a package-level private variable. Since other parts of `persistedsqlstats` will eventually start using follower reads, a single package-level private variable can disable all the follower reads in the package.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 148 at r5 (raw file):
Previously, ajwerner wrote…
I think this can all be `return c.db.Txn(ctx, ...`
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go, line 79 at r5 (raw file):
Previously, ajwerner wrote…
total nit: I might write this
```go
if err == nil && exists {
	err = ErrConcurrentSQLStatsCompaction
}
return err
```
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_test.go, line 68 at r5 (raw file):
Previously, ajwerner wrote…
nit: you can pass the value to `Exec` like:
```go
firstSQLConn.Exec(t, "SET CLUSTER SETTING sql.stats.persisted_rows.max = $1", maxPersistedRowLimit)
```
TIL
pkg/util/jobutil/job_utils.go, line 11 at r5 (raw file):
Previously, ajwerner wrote…
I don't know that this warrants its own package.
jobs
already depends on all of this stuff. Seems fine to put this file in that package.
Done.
pkg/util/jobutil/job_utils.go, line 32 at r5 (raw file):
Previously, ajwerner wrote…
Any reason not to just pass the job ID as opposed to the job struct?
Replaced with job ID.
pkg/util/jobutil/job_utils.go, line 57 at r5 (raw file):
Previously, ajwerner wrote…
I think you want more than just this. Reverting, PauseRequested, CancelRequested. See `jobs.NonTerminalStatusTupleString`.
Done.
Reviewed 6 of 19 files at r6, 2 of 16 files at r7.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @Azhng, and @matthewtodd)
pkg/jobs/jobspb/jobs.proto, line 667 at r5 (raw file):
Previously, Azhng (Archer Zhang) wrote…
Removed. Added package-level variable
disableFollowerReadForTest
instead of passing it through protobuf.
huh, I don't think that's better. Don't you have access to testing knobs everywhere that you'd need it?
pkg/sql/create_stats.go, line 651 at r6 (raw file):
```go
}
return nil /* retErr */
```
you've removed `retErr`
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 34 at r7 (raw file):
```go
ie sqlutil.InternalExecutor
TestingKnobs *StatsCompactorTestingKnobs
```
this thing should get passed to NewStatsCompactor and then should exist in the top-level TestingKnobs
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 107 at r7 (raw file):
```go
if row.Len() != 1 {
	return 0 /* stmtStatsEntryCount */, 0 /* txnStatsEntryCount */,
		errors.AssertionFailedf("unexpected number of column returned")
```
not that it matters but, if you're going to use `AssertionFailedf` maybe report the number of columns?
pkg/jobs/utils.go, line 39 at r6 (raw file):
```sql
system.jobs WHERE status IN ($1, $2, $3, $4, $5, $6)
```
you can literally just do the following and lose the args:
```go
status IN ` + NonTerminalStatusTupleString + `
```
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @Azhng, and @matthewtodd)
pkg/jobs/jobspb/jobs.proto, line 667 at r5 (raw file):
Previously, ajwerner wrote…
huh, I don't think that's better. Don't you have access to testing knobs everywhere that you'd need it?
Since the `resumer` creates the `StatsCompactor`, and the `resumer`'s constructor is registered in `func init()`, I'm not entirely sure how I can inject testing knobs into the resumer? (It's privately scoped in the `sql` package because it depends on `JobExecContext`.) Any ideas here?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 107 at r7 (raw file):
Previously, ajwerner wrote…
not that it matters but, if you're going to use `AssertionFailedf` maybe report the number of columns?
Done.
pkg/jobs/utils.go, line 39 at r6 (raw file):
Previously, ajwerner wrote…
you can literally just do the following and lose the args:
```go
status IN ` + NonTerminalStatusTupleString + `
```
For some reason I thought `NonTerminalStatusTupleString` didn't have `StatusRunning` and `StatusPaused`. 🤦♂️
Reviewed 1 of 19 files at r6, 18 of 18 files at r8.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @Azhng)
pkg/sql/create_stats.go, line 635 at r8 (raw file):
```go
// just checks if there are any pending, running, or paused CreateStats jobs.
func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) error {
	var jobID jobspb.JobID
```
nit: s/var jobID jobspb.ID/jobID := jobspb.InvalidJobID
pkg/jobs/utils.go, line 25 at r8 (raw file):
```go
// RunningJobExists checks that whether there are any other jobs (matched by
// payloadPredicate callback) in the pending, running, or paused status that
// started earlier than the job with provided jobID.
```
Drive by comment -- briefly looking at this code, it seems like this is intended to work even when the jobID does not refer to a real jobID. Am I reading this right? Could you expand on your comment to talk about the case where jobID is InvalidJobID as well?
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @Azhng)
pkg/sql/create_stats.go, line 635 at r8 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
nit: s/var jobID jobspb.ID/jobID := jobspb.InvalidJobID
Done
pkg/jobs/utils.go, line 25 at r8 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Drive by comment -- briefly looking at this code, it seems like this is intended to work even when the jobID does not refer to a real jobID. Am I reading this right? Could you expand on your comment to talk about the case where jobID is InvalidJobID as well?
Added additional explanation for the `InvalidJobID` case.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner and @arulajmani)
pkg/jobs/jobspb/jobs.proto, line 667 at r5 (raw file):
Previously, Azhng (Archer Zhang) wrote…
Since the `resumer` creates the `StatsCompactor`, and the `resumer`'s constructor is registered in `func init()`, I'm not entirely sure how I can inject testing knobs into the resumer? (It's privately scoped in the `sql` package because it depends on `JobExecContext`.) Any ideas here?
Nevermind, I can just use the testing knobs that are already in the `ExecCfg()`. Updated.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 34 at r7 (raw file):
Previously, ajwerner wrote…
this thing should get passed to NewStatsCompactor and then should exist in the top-level TestingKnobs
Done.
@ajwerner RFAL
Reviewed 1 of 18 files at r16, 3 of 19 files at r17, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, @Azhng, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 79 at r17 (raw file):
```go
func (c *StatsCompactor) getExistingStmtAndTxnStatsCount(
	ctx context.Context,
) (stmtStatsEntryCount, txnStatsEntryCount int64, err error) {
```
nit: don't name this error, it can get you in trouble.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 80 at r17 (raw file):
```go
	ctx context.Context,
) (stmtStatsEntryCount, txnStatsEntryCount int64, err error) {
	getRowCount := func(ctx context.Context, opName string, stmt string) (int64, error) {
```
nit: you could pass in the pointer and simplify things a tad.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 106 at r17 (raw file):
Quoted 13 lines of code…
```go
stmt := "SELECT count(*) FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()"
if c.knobs.DisableFollowerRead {
	stmt = "SELECT count(*) FROM system.statement_statistics"
}
stmtStatsEntryCount, err = getRowCount(ctx, "scan-sql-stmt-stats-entries", stmt)
if err != nil {
	return 0 /* stmtStatsEntryCount */, 0 /* txnStatsEntryCount */, err
}
stmt = "SELECT count(*) FROM system.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()"
if c.knobs.DisableFollowerRead {
	stmt = "SELECT count(*) FROM system.transaction_statistics"
}
txnStatsEntryCount, err = getRowCount(ctx, "scan-sql-txn-stats-entries", stmt)
```
nit: put these in separate blocks:
```go
{
	stmt := "SELECT count(*) FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()"
	if c.knobs.DisableFollowerRead {
		stmt = "SELECT count(*) FROM system.statement_statistics"
	}
	stmtStatsEntryCount, err = getRowCount(ctx, "scan-sql-stmt-stats-entries", stmt)
	if err != nil {
		return 0 /* stmtStatsEntryCount */, 0 /* txnStatsEntryCount */, err
	}
}
{
	stmt := "SELECT count(*) FROM system.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()"
	if c.knobs.DisableFollowerRead {
		stmt = "SELECT count(*) FROM system.transaction_statistics"
	}
	txnStatsEntryCount, err = getRowCount(ctx, "scan-sql-txn-stats-entries", stmt)
	if err != nil {
		return 0, 0, err
	}
}
```
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 113 at r17 (raw file):
```go
}
txnStatsEntryCount, err = getRowCount(ctx, "scan-sql-txn-stats-entries", stmt)
```
you weren't returning this error.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 118 at r17 (raw file):
```go
}

func (c *StatsCompactor) deleteOldestEntries(
```
I feel like the intention behind this pattern does not match reality. Namely, you're providing a number of rows to delete with the idea that you're going to not contend with foreground writes. The logic there would be sound if it weren't for the hash sharded index. The problem is that we're going to end up scanning the number of entries from each shard and we'll have to do a merge join to take the top k. I think what you want is to break this down and find the number of entries to delete on a per-shard basis. Somewhere you should export the shard count and then do this query for each shard specifying the shard column explicitly.
I encourage you to take a look at the query plan and see what the limit pushdown looks like.
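The per-shard rewrite suggested above can be sketched as building one bounded DELETE per shard, so the `ORDER BY ... LIMIT` can be pushed down into a single shard's index span rather than merging the oldest rows across every shard. The shard column name below is a placeholder for illustration, not the real system-schema identifier:

```go
package main

import "fmt"

// perShardDeleteStmt builds a DELETE constrained to a single shard value so
// the ORDER BY/LIMIT applies within that shard only, allowing the limit to
// be pushed down instead of requiring a cross-shard merge to find the top k.
func perShardDeleteStmt(table, shardCol string, shard int, limit int64) string {
	return fmt.Sprintf(
		"DELETE FROM %s WHERE %s = %d ORDER BY aggregated_ts ASC LIMIT %d",
		table, shardCol, shard, limit)
}

func main() {
	// "shard_col" is a hypothetical column name used for illustration.
	fmt.Println(perShardDeleteStmt("system.statement_statistics", "shard_col", 0, 100))
}
```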
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 140 at r17 (raw file):
```go
txn *kv.Txn, stmt string, curRowsCount int64) error {
```
nit: wrap this like:
```go
removeRows := func(
	ctx context.Context,
	opName string,
	txn *kv.Txn,
	stmt string,
	curRowsCount int64,
) error {
```
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 79 at r17 (raw file):
Previously, ajwerner wrote…
nit: don't name this error, it can get you in trouble.
Done. Is it because of the concern about shadowing variables?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 80 at r17 (raw file):
Previously, ajwerner wrote…
nit: you could pass in the pointer and simplify things a tad.
Hmm, I'm not exactly sure what the "pointer" is referring to here. Do you mind clarifying?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 106 at r17 (raw file):
Previously, ajwerner wrote…
[quoting the two `getRowCount` statements above]
nit: put these in separate blocks.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 113 at r17 (raw file):
Previously, ajwerner wrote…
you weren't returning this error.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 118 at r17 (raw file):
Previously, ajwerner wrote…
[the per-shard deletion comment quoted in full above]
I see. That makes sense. Changed the code to clean up on a per-shard basis.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 140 at r17 (raw file):
Previously, ajwerner wrote…
[the `removeRows` wrapping nit above]
Done.
Here's more feedback. Some of it would eliminate the need for other parts of it (like if you broke out the logic per shard, then you definitely don't need the array).
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, @Azhng, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 79 at r17 (raw file):
Previously, Azhng (Archer Zhang) wrote…
Done. Is it because of the concern for shadowing variables?
Partially, also so that you can get unused-variable checking. Like, consider the case below where you assigned to `err` but then returned `nil`. The linter didn't catch that.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 80 at r17 (raw file):
Previously, Azhng (Archer Zhang) wrote…
Hmm I'm not exactly sure what the "pointer" is referring to here. Do you mind clarify?
like, you could pass in a `*int64` and only return an error from this function.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 99 at r18 (raw file):
ctx context.Context, opName, tableName, hashColumnName string,
) ([]int64, error) {
	rowCountPerShard := make([]int64, systemschema.SQLStatsHashShardBucketCount)
you could use arrays everywhere given this is a constant. Consider:
const shards = systemschema.SQLStatsHashShardBucketCount
func (c *StatsCompactor) getRowCountPerShard(
ctx context.Context, opName, tableName, hashColumnName string,
) ([shards]int64, error) {
and then pass these arrays everywhere.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 116 at r18 (raw file):
}

stmt = fmt.Sprintf(stmt, tableName, hashColumnName)
nit: if you're going to be formatting this anyway, you could just add the follower read clause as another parameter.
stmt = `SELECT count(*) FROM %[1]s WHERE %[2]s = $1%[3]s`
followerReadClause := ` AS OF SYSTEM TIME follower_read_timestamp()`
if c.knobs.DisableFollowerRead {
followerReadClause = ""
}
stmt = fmt.Sprintf(stmt, tableName, hashColumnName, followerReadClause)
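The suggested formatting can be exercised as a standalone sketch (the helper name and parameters here are illustrative; in the patch this logic lives on StatsCompactor and reads the knob from c.knobs):

```go
package main

import "fmt"

// countStmt builds the per-shard row-count query, passing the optional
// follower-read clause as just another fmt.Sprintf argument rather than
// concatenating it separately.
func countStmt(tableName, hashColumnName string, disableFollowerRead bool) string {
	stmt := `SELECT count(*) FROM %[1]s WHERE %[2]s = $1%[3]s`
	followerReadClause := ` AS OF SYSTEM TIME follower_read_timestamp()`
	if disableFollowerRead {
		followerReadClause = ""
	}
	return fmt.Sprintf(stmt, tableName, hashColumnName, followerReadClause)
}

func main() {
	fmt.Println(countStmt("system.statement_statistics", "shard", false))
	fmt.Println(countStmt("system.statement_statistics", "shard", true))
}
```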
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 136 at r18 (raw file):
func (c *StatsCompactor) getExistingStmtAndTxnStatsCount(
	ctx context.Context,
) ([]int64, []int64, error) {
I'd name the counts, just _ for the error.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 188 at r18 (raw file):
} else {
	limitPerShard[shardIdx] = maxStatsEntryPerShard
}
there's a nice trick here I learned once. You can just keep subtracting and dividing and it will all work out nicely:
limitPerShard := make([]int64, systemschema.SQLStatsHashShardBucketCount)
maxPersistedRows := SQLStatsMaxPersistedRows.Get(&c.st.SV)
for i := 0; i < shards; i++ {
	limitPerShard[i] = maxPersistedRows / (shards - i)
	maxPersistedRows -= limitPerShard[i]
}
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 195 at r18 (raw file):
func (c *StatsCompactor) deleteOldestEntries(
	ctx context.Context, stmtStatsRowCountPerShard, txnStatsRowCountPerShard []int64,
how crazy would it be to pick a timestamp below which you'll be deleting stuff as opposed to picking what amounts to an offset?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 242 at r18 (raw file):
} return c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
I'm anxious about the size of this transaction. Large transactions in cockroach are known for having a bad time. Particularly large deletes which might overlap with foreground traffic. I would:
- break this transaction out per shard. I think that'll also simplify some of the logic in this file.
- Put a limit on the number of rows we're willing to delete in a single transaction and then add a loop.
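The limit-plus-loop suggestion can be sketched with the actual DELETE abstracted behind a callback (all names here are hypothetical, not the patch's API):

```go
package main

import "fmt"

// deleteInBatches issues small bounded deletes in a loop instead of one
// large transaction. deleteUpTo runs one transaction that deletes at
// most limit rows and reports how many it actually deleted.
func deleteInBatches(rowsToDelete, maxPerTxn int64, deleteUpTo func(limit int64) int64) int64 {
	var total int64
	for rowsToDelete > 0 {
		batch := rowsToDelete
		if batch > maxPerTxn {
			batch = maxPerTxn
		}
		deleted := deleteUpTo(batch)
		if deleted == 0 {
			break // nothing left matching the predicate
		}
		total += deleted
		rowsToDelete -= deleted
	}
	return total
}

func main() {
	// Simulate a shard with 300 stale rows and a 128-row-per-txn cap.
	remaining := int64(300)
	fakeDelete := func(limit int64) int64 {
		if limit > remaining {
			limit = remaining
		}
		remaining -= limit
		return limit
	}
	fmt.Println(deleteInBatches(300, 128, fakeDelete)) // 300, spread over three transactions
}
```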
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 99 at r18 (raw file):
Previously, ajwerner wrote…
you could use arrays everywhere given this is a constant. Consider:
const shards = systemschema.SQLStatsHashShardBucketCount

func (c *StatsCompactor) getRowCountPerShard(
	ctx context.Context, opName, tableName, hashColumnName string,
) ([shards]int64, error) {

and then pass these arrays everywhere.
Opted to refactor to implement the logic per shard.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 116 at r18 (raw file):
Previously, ajwerner wrote…
nit: if you're going to be formatting this anyway, you could just add the follower read clause as another parameter.
stmt = `SELECT count(*) FROM %[1]s WHERE %[2]s = $1%[3]s`
followerReadClause := ` AS OF SYSTEM TIME follower_read_timestamp()`
if c.knobs.DisableFollowerRead {
	followerReadClause = ""
}
stmt = fmt.Sprintf(stmt, tableName, hashColumnName, followerReadClause)
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 136 at r18 (raw file):
Previously, ajwerner wrote…
I'd name the counts, just _ for the error.
Opted to implement the logic per shard.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 188 at r18 (raw file):
Previously, ajwerner wrote…
there's a nice trick here I learned once. You can just keep subtracting and dividing and it will all work out nicely:
limitPerShard := make([]int64, systemschema.SQLStatsHashShardBucketCount)
maxPersistedRows := SQLStatsMaxPersistedRows.Get(&c.st.SV)
for i := 0; i < shards; i++ {
	limitPerShard[i] = maxPersistedRows / (shards - i)
	maxPersistedRows -= limitPerShard[i]
}
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 195 at r18 (raw file):
Previously, ajwerner wrote…
how crazy would it be to pick a timestamp below which you'll be deleting stuff as opposed to picking what amounts to an offset?
I don't quite follow here. Are we talking about the timestamp of the transaction?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 242 at r18 (raw file):
Previously, ajwerner wrote…
I'm anxious about the size of this transaction. Large transactions in cockroach are known for having a bad time. Particularly large deletes which might overlap with foreground traffic. I would:
- break this transaction out per shard. I think that'll also simplify some of the logic in this file.
- Put a limit on the number of rows we're willing to delete in a single transaction and then add a loop.
Done.
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, @Azhng, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 195 at r18 (raw file):
Previously, Azhng (Archer Zhang) wrote…
I don't quite follow here. Are we talking about the timestamp of the transaction ?
It's not important; what you have here is fine.
What I was thinking is that right now you go and find the number of rows in the shard and then you find the number you think should be in the shard and you delete until you think you've deleted enough. Another approach would be to find the aggregated_ts of the newest row you want to delete and use that to drive the deletions here.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 33 at r19 (raw file):
const maxDeleteRowsPerTxn = 128

// StatsCompactor is responsible for compacting older SQL Stats. It is
my last nit is that this thing doesn't really compact, it just deletes. Is compaction future work?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 89 at r19 (raw file):
deleteOldStatsStmt := c.getStatementForDeletingStaleRows(tableName, pkColumnNames, hashColumnName)

for shardIdx := int64(0); shardIdx < systemschema.SQLStatsHashShardBucketCount; shardIdx++ {
nit: for shard, limit := range limitPerShard, I don't think that the int64 nature of the shard is important for your function signatures.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 169 at r19 (raw file):
}

if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
	_, err := c.ie.ExecEx(ctx,
fun fact, if you pass a nil for the txn the internal executor will deal with the transaction management for you. I encourage you to do that here.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @ajwerner, @arulajmani, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 195 at r18 (raw file):
Previously, ajwerner wrote…
It's not important; what you have here is fine.
What I was thinking is that right now you go and find the number of rows in the shard and then you find the number you think should be in the shard and you delete until you think you've deleted enough. Another approach would be to find the aggregated_ts of the newest row you want to delete and use that to drive the deletions here.
I see. Interesting 🤔 I suppose we can try out that approach if the current approach becomes problematic.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 33 at r19 (raw file):
Previously, ajwerner wrote…
my last nit is that this thing doesn't really compact, it just deletes. Is compaction future work?
Yes, compaction is future work.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 89 at r19 (raw file):
Previously, ajwerner wrote…
nit: for shard, limit := range limitPerShard, I don't think that the int64 nature of the shard is important for your function signatures.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 169 at r19 (raw file):
Previously, ajwerner wrote…
fun fact, if you pass a nil for the txn the internal executor will deal with the transaction management for you. I encourage you to do that here.
Done.
c2b6973
to
9702ece
Compare
Reviewed 20 of 20 files at r23, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (and 2 stale) (waiting on @ajwerner and @arulajmani)
TFTR! bors r=ajwerner,maryliag
Merge conflict.
Release note: None
This commit introduces the SQL Stats Compaction job type and a barebones implementation of the SQL Stats compaction.

Release justification: category 4

Release note: None
bors r=ajwerner,maryliag
Build succeeded:
68401: sql: create sql stats compaction scheduled job r=miretskiy,maryliag a=Azhng

Depends on #68434, #69346

sql: create sql stats compaction scheduled job

This commit introduces the SQL Stats Compaction scheduled job, which runs on an hourly schedule and invokes the SQL Stats Compaction job created in the previous commit. This commit also introduces the `crdb_internal.schedule_sql_stats_compaction()` builtin as a way to manually create a SQL Stats compaction schedule. The `SHOW SCHEDULES` command's syntax is also extended to support `SHOW SCHEDULES FOR SQL STATISTICS`.

Release justification: category 4

Release note (sql change): introduce crdb_internal.schedule_sql_stats_compaction() to manually create a SQL Stats compaction schedule. Extend the SHOW SCHEDULES command to support SHOW SCHEDULES FOR SQL STATISTICS.

Co-authored-by: Azhng <archer.xn@gmail.com>
69046: kv: return error from checkNegotiateAndSendPreconditions r=nvanbenschoten a=nvanbenschoten

Instead of panicking. Discussed in #68969.

Release justification: change to new functionality

69273: sql: crdb_internal.reset_sql_stats() now resets persisted SQL Stats r=maryliag a=Azhng

Depends on #68401 and #68434

Previously, the crdb_internal.reset_sql_stats() builtin only reset cluster-wide in-memory SQL stats. This patch updates the builtin so it can reset persisted SQL stats as well.

Release justification: category 4

Release note (sql change): crdb_internal.reset_sql_stats() now resets persisted SQL Stats.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Azhng <archer.xn@gmail.com>
Previous PR #67090
Followed by #68401
First Commit
sql,util: refactor checking for running jobs to its own util package
Release note: None
Second Commit
sql,jobs: create SQL Stats Compaction Job and resumer
This commit introduces the SQL Stats Compaction job type
and a barebones implementation of the SQL Stats compaction.
Release justification: category 4
Release note: None