diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 3c5d20b8f0ee..4386892176d4 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -625,7 +625,7 @@ func (s *Server) SetupConn( ex := s.newConnExecutor( ctx, sdMutIterator, stmtBuf, clientComm, memMetrics, &s.Metrics, - s.sqlStats.GetWriterForApplication(sd.ApplicationName), + s.sqlStats.GetApplicationStats(sd.ApplicationName), ) return ConnectionHandler{ex}, nil } @@ -733,7 +733,7 @@ func (s *Server) newConnExecutor( clientComm ClientComm, memMetrics MemoryMetrics, srvMetrics *Metrics, - statsWriter sqlstats.Writer, + applicationStats sqlstats.ApplicationStats, ) *connExecutor { // Create the various monitors. // The session monitors are started in activate(). @@ -811,11 +811,11 @@ func (s *Server) newConnExecutor( } ex.applicationName.Store(ex.sessionData().ApplicationName) - ex.statsWriter = statsWriter - ex.statsCollector = sslocal.NewStatsCollector(statsWriter, ex.phaseTimes) + ex.applicationStats = applicationStats + ex.statsCollector = sslocal.NewStatsCollector(applicationStats, ex.phaseTimes) ex.dataMutatorIterator.onApplicationNameChange = func(newName string) { ex.applicationName.Store(newName) - ex.statsWriter = ex.server.sqlStats.GetWriterForApplication(newName) + ex.applicationStats = ex.server.sqlStats.GetApplicationStats(newName) } ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionInit, timeutil.Now()) @@ -866,7 +866,7 @@ func (s *Server) newConnExecutorWithTxn( srvMetrics *Metrics, txn *kv.Txn, syntheticDescs []catalog.Descriptor, - statsWriter sqlstats.Writer, + applicationStats sqlstats.ApplicationStats, ) *connExecutor { ex := s.newConnExecutor( ctx, @@ -875,7 +875,7 @@ func (s *Server) newConnExecutorWithTxn( clientComm, memMetrics, srvMetrics, - statsWriter, + applicationStats, ) if txn.Type() == kv.LeafTxn { // If the txn is a leaf txn it is not allowed to perform mutations. For @@ -1274,10 +1274,10 @@ type connExecutor struct { // executor. dataMutatorIterator *sessionDataMutatorIterator - // statsWriter is a writer interface for recording per-application SQL usage - // statistics. It is maintained to represent statistics for the application - // currently identified by sessiondata.ApplicationName. - statsWriter sqlstats.Writer + // applicationStats records per-application SQL usage statistics. It is + // maintained to represent statistics for the application currently identified + // by sessiondata.ApplicationName. + applicationStats sqlstats.ApplicationStats // statsCollector is used to collect statistics about SQL statements and // transactions. @@ -2169,7 +2169,7 @@ func (ex *connExecutor) execCopyIn( // state machine, but the copyMachine manages its own transactions without // going through the state machine. ex.state.sqlTimestamp = txnTS - ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes) + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) ex.initPlanner(ctx, p) ex.resetPlanner(ctx, p, txn, stmtTS) } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index a2319a8417a4..33bad959e8cb 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -91,7 +91,7 @@ func (ex *connExecutor) execStmt( // Run observer statements in a separate code path; their execution does not // depend on the current transaction state. if _, ok := ast.(tree.ObserverStatement); ok { - ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes) + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) err := ex.runObserverStatement(ctx, ast, res) // Note that regardless of res.Err(), these observer statements don't // generate error events; transactions are always allowed to continue. @@ -363,7 +363,7 @@ func (ex *connExecutor) execStmtInOpenState( p := &ex.planner stmtTS := ex.server.cfg.Clock.PhysicalTime() - ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes) + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS) p.sessionDataMutatorIterator.paramStatusUpdater = res p.noticeSender = res @@ -1425,7 +1425,7 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode( rwMode = ex.readWriteModeWithSessionDefault(modes.ReadWriteMode) return rwMode, now, nil, nil } - ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes) + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) p := &ex.planner // NB: this use of p.txn is totally bogus. The planner's txn should diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index 11f9c18e104e..a2955e41d07a 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -161,7 +161,7 @@ func (ex *connExecutor) prepare( var flags planFlags prepare := func(ctx context.Context, txn *kv.Txn) (err error) { - ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes) + ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes) p := &ex.planner if origin != PreparedStatementOriginSQL { // If the PREPARE command was issued as a SQL statement, then we diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index bf3e19dab08d..24e4bb7fcb77 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -173,7 +173,7 @@ func (ie *InternalExecutor) initConnEx( // If this is already an "internal app", don't put more prefix. appStatsBucketName = sd.ApplicationName } - statsWriter := ie.s.sqlStats.GetWriterForApplication(appStatsBucketName) + applicationStats := ie.s.sqlStats.GetApplicationStats(appStatsBucketName) sds := sessiondata.NewStack(sd) sdMutIterator := ie.s.makeSessionDataMutatorIterator(sds, nil /* sessionDefaults */) @@ -186,7 +186,7 @@ func (ie *InternalExecutor) initConnEx( clientComm, ie.memMetrics, &ie.s.InternalMetrics, - statsWriter, + applicationStats, ) } else { ex = ie.s.newConnExecutorWithTxn( @@ -199,7 +199,7 @@ func (ie *InternalExecutor) initConnEx( &ie.s.InternalMetrics, txn, ie.syntheticDescriptors, - statsWriter, + applicationStats, ) } diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel index f4508c6e0dba..427d6f9f9816 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -5,6 +5,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "persistedsqlstats", srcs = [ + "appStats.go", "cluster_settings.go", "combined_iterator.go", "compaction_exec.go", @@ -16,7 +17,6 @@ go_library( "scheduled_job_monitor.go", "stmt_reader.go", "txn_reader.go", - "writer.go", ], embed = [":persistedsqlstats_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats", @@ -34,7 +34,6 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog/systemschema", - "//pkg/sql/execstats", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlstats", diff --git a/pkg/sql/sqlstats/persistedsqlstats/writer.go b/pkg/sql/sqlstats/persistedsqlstats/appStats.go similarity index 63% rename from pkg/sql/sqlstats/persistedsqlstats/writer.go rename to pkg/sql/sqlstats/persistedsqlstats/appStats.go index 04250beca5ba..e0e95cb928f0 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/writer.go +++ b/pkg/sql/sqlstats/persistedsqlstats/appStats.go @@ -14,62 +14,54 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/ssmemstorage" "github.com/cockroachdb/errors" ) -// StatsWriter is a sqlstats.Writer that wraps a in-memory node-local stats -// writer. StatsWriter signals the subsystem when it encounters memory pressure -// which will triggers the flush operation. -type StatsWriter struct { +// ApplicationStats is a sqlstats.ApplicationStats that wraps an in-memory +// node-local ApplicationStats. ApplicationStats signals the subsystem when it +// encounters memory pressure which will triggers the flush operation. +type ApplicationStats struct { // local in-memory storage. - memWriter sqlstats.Writer + sqlstats.ApplicationStats // Use to signal the stats writer is experiencing memory pressure. memoryPressureSignal chan struct{} } -var _ sqlstats.Writer = &StatsWriter{} +var _ sqlstats.ApplicationStats = &ApplicationStats{} -// RecordStatement implements sqlstats.Writer interface. -func (s *StatsWriter) RecordStatement( +// RecordStatement implements sqlstats.ApplicationStats interface. +func (s *ApplicationStats) RecordStatement( ctx context.Context, key roachpb.StatementStatisticsKey, value sqlstats.RecordedStmtStats, ) (roachpb.StmtFingerprintID, error) { var fingerprintID roachpb.StmtFingerprintID err := s.recordStatsOrSendMemoryPressureSignal(func() (err error) { - fingerprintID, err = s.memWriter.RecordStatement(ctx, key, value) + fingerprintID, err = s.ApplicationStats.RecordStatement(ctx, key, value) return err }) return fingerprintID, err } -// RecordStatementExecStats implements sqlstats.Writer interface. -func (s *StatsWriter) RecordStatementExecStats( - key roachpb.StatementStatisticsKey, stats execstats.QueryLevelStats, -) error { - return s.memWriter.RecordStatementExecStats(key, stats) -} - -// ShouldSaveLogicalPlanDesc implements sqlstats.Writer interface. -func (s *StatsWriter) ShouldSaveLogicalPlanDesc( +// ShouldSaveLogicalPlanDesc implements sqlstats.ApplicationStats interface. +func (s *ApplicationStats) ShouldSaveLogicalPlanDesc( fingerprint string, implicitTxn bool, database string, ) bool { - return s.memWriter.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database) + return s.ApplicationStats.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database) } -// RecordTransaction implements sqlstats.Writer interface and saves +// RecordTransaction implements sqlstats.ApplicationStats interface and saves // per-transaction statistics. -func (s *StatsWriter) RecordTransaction( +func (s *ApplicationStats) RecordTransaction( ctx context.Context, key roachpb.TransactionFingerprintID, value sqlstats.RecordedTxnStats, ) error { return s.recordStatsOrSendMemoryPressureSignal(func() error { - return s.memWriter.RecordTransaction(ctx, key, value) + return s.ApplicationStats.RecordTransaction(ctx, key, value) }) } -func (s *StatsWriter) recordStatsOrSendMemoryPressureSignal(fn func() error) error { +func (s *ApplicationStats) recordStatsOrSendMemoryPressureSignal(fn func() error) error { err := fn() if errors.Is(err, ssmemstorage.ErrFingerprintLimitReached) || errors.Is(err, ssmemstorage.ErrMemoryPressure) { select { diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index c0bb67f817eb..e37e0697fe53 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -77,7 +77,7 @@ type PersistedSQLStats struct { cfg *Config - // memoryPressureSignal is used by the persistedsqlstats.StatsWriter to signal + // memoryPressureSignal is used by the persistedsqlstats.ApplicationStats to signal // memory pressure during stats recording. A signal is emitted through this // channel either if the fingerprint limit or the memory limit has been // exceeded. @@ -199,11 +199,11 @@ func (s *PersistedSQLStats) jitterInterval(interval time.Duration) time.Duration return jitteredInterval } -// GetWriterForApplication implements sqlstats.Provider interface. -func (s *PersistedSQLStats) GetWriterForApplication(appName string) sqlstats.Writer { - writer := s.SQLStats.GetWriterForApplication(appName) - return &StatsWriter{ - memWriter: writer, +// GetApplicationStats implements sqlstats.Provider interface. +func (s *PersistedSQLStats) GetApplicationStats(appName string) sqlstats.ApplicationStats { + appStats := s.SQLStats.GetApplicationStats(appName) + return &ApplicationStats{ + ApplicationStats: appStats, memoryPressureSignal: s.memoryPressureSignal, } } diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go index c0f59e852bed..9c158bce0d34 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go @@ -101,8 +101,8 @@ func (s *SQLStats) periodicallyClearSQLStats( }) } -// GetWriterForApplication implements sqlstats.Provider interface. -func (s *SQLStats) GetWriterForApplication(appName string) sqlstats.Writer { +// GetApplicationStats implements sqlstats.Provider interface. +func (s *SQLStats) GetApplicationStats(appName string) sqlstats.ApplicationStats { s.mu.Lock() defer s.mu.Unlock() if a, ok := s.mu.apps[appName]; ok { @@ -173,7 +173,7 @@ func (s *SQLStats) TxnStatsIterator(options *sqlstats.IteratorOptions) *TxnStats // IterateAggregatedTransactionStats implements sqlstats.Provider interface. func (s *SQLStats) IterateAggregatedTransactionStats( - _ context.Context, + ctx context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.AggregatedTransactionVisitor, ) error { @@ -182,7 +182,7 @@ func (s *SQLStats) IterateAggregatedTransactionStats( for _, appName := range appNames { statsContainer := s.getStatsForApplication(appName) - err := statsContainer.IterateAggregatedTransactionStats(appName, visitor) + err := statsContainer.IterateAggregatedTransactionStats(ctx, options, visitor) if err != nil { return fmt.Errorf("sql stats iteration abort: %s", err) } diff --git a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go index e8d48ddded06..b79ed8adb540 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go @@ -15,8 +15,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" ) -type statsCollector struct { - sqlstats.Writer +// StatsCollector is used to collect statement and transaction statistics +// from connExecutor. +type StatsCollector struct { + sqlstats.ApplicationStats // phaseTimes tracks session-level phase times. phaseTimes *sessionphase.Times @@ -26,33 +28,33 @@ type statsCollector struct { previousPhaseTimes *sessionphase.Times } -var _ sqlstats.StatsCollector = &statsCollector{} +var _ sqlstats.ApplicationStats = &StatsCollector{} // NewStatsCollector returns an instance of sqlstats.StatsCollector. func NewStatsCollector( - writer sqlstats.Writer, phaseTime *sessionphase.Times, -) sqlstats.StatsCollector { - return &statsCollector{ - Writer: writer, - phaseTimes: phaseTime.Clone(), + appStats sqlstats.ApplicationStats, phaseTime *sessionphase.Times, +) *StatsCollector { + return &StatsCollector{ + ApplicationStats: appStats, + phaseTimes: phaseTime.Clone(), } } // PhaseTimes implements sqlstats.StatsCollector interface. -func (s *statsCollector) PhaseTimes() *sessionphase.Times { +func (s *StatsCollector) PhaseTimes() *sessionphase.Times { return s.phaseTimes } // PreviousPhaseTimes implements sqlstats.StatsCollector interface. -func (s *statsCollector) PreviousPhaseTimes() *sessionphase.Times { +func (s *StatsCollector) PreviousPhaseTimes() *sessionphase.Times { return s.previousPhaseTimes } // Reset implements sqlstats.StatsCollector interface. -func (s *statsCollector) Reset(writer sqlstats.Writer, phaseTime *sessionphase.Times) { +func (s *StatsCollector) Reset(appStats sqlstats.ApplicationStats, phaseTime *sessionphase.Times) { previousPhaseTime := s.phaseTimes - *s = statsCollector{ - Writer: writer, + *s = StatsCollector{ + ApplicationStats: appStats, previousPhaseTimes: previousPhaseTime, phaseTimes: phaseTime.Clone(), } diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index f5af7cbc2dde..48780a53203a 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -99,7 +99,7 @@ type Container struct { txnCounts transactionCounts } -var _ sqlstats.Writer = &Container{} +var _ sqlstats.ApplicationStats = &Container{} // New returns a new instance of Container. func New( @@ -131,17 +131,17 @@ func New( return s } -// IterateAggregatedTransactionStats iterates through the stored aggregated -// transaction statistics stored in this Container. +// IterateAggregatedTransactionStats implements sqlstats.ApplicationStats +// interface. func (s *Container) IterateAggregatedTransactionStats( - appName string, visitor sqlstats.AggregatedTransactionVisitor, + _ context.Context, _ *sqlstats.IteratorOptions, visitor sqlstats.AggregatedTransactionVisitor, ) error { var txnStat roachpb.TxnStats s.txnCounts.mu.Lock() txnStat = s.txnCounts.mu.TxnStats s.txnCounts.mu.Unlock() - err := visitor(appName, &txnStat) + err := visitor(s.appName, &txnStat) if err != nil { return fmt.Errorf("sql stats iteration abort: %s", err) } @@ -207,6 +207,37 @@ func (s *Container) TxnStatsIterator(options *sqlstats.IteratorOptions) *TxnStat return NewTxnStatsIterator(s, options) } +// IterateStatementStats implements sqlstats.Provider interface. +func (s *Container) IterateStatementStats( + ctx context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.StatementVisitor, +) error { + iter := s.StmtStatsIterator(options) + + for iter.Next() { + if err := visitor(ctx, iter.Cur()); err != nil { + return err + } + } + + return nil +} + +// IterateTransactionStats implements sqlstats.Provider interface. +func (s *Container) IterateTransactionStats( + ctx context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.TransactionVisitor, +) error { + iter := s.TxnStatsIterator(options) + + for iter.Next() { + stats := iter.Cur() + if err := visitor(ctx, stats); err != nil { + return err + } + } + + return nil +} + // NewTempContainerFromExistingStmtStats creates a new Container by ingesting a slice // of serverpb.StatementsResponse_CollectedStatementStatistics sorted by // Key.KeyData.App field. diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go index c3db4ecbbf8a..6d33eb583e2a 100644 --- a/pkg/sql/sqlstats/ssprovider.go +++ b/pkg/sql/sqlstats/ssprovider.go @@ -55,9 +55,6 @@ type Writer interface { // Reader provides methods to retrieve transaction/statement statistics from // the Storage. type Reader interface { - // GetLastReset returns the last time when the sqlstats is being reset. - GetLastReset() time.Time - // IterateStatementStats iterates through all the collected statement statistics // by using StatementVisitor. Caller can specify iteration behavior, such // as ordering, through IteratorOptions argument. StatementVisitor can return @@ -82,6 +79,13 @@ type Reader interface { GetTransactionStats(appName string, key roachpb.TransactionFingerprintID) (*roachpb.CollectedTransactionStatistics, error) } +// ApplicationStats is an interface to read from or write to the statistics +// belongs to an application. +type ApplicationStats interface { + Reader + Writer +} + // IteratorOptions provides the ability to the caller to change how it iterates // the statements and transactions. // TODO(azhng): introduce StartTime and EndTime field so we can implement @@ -126,9 +130,9 @@ type StatsCollector interface { // was previously tracking before being Reset. PreviousPhaseTimes() *sessionphase.Times - // Reset resets the StatsCollector with a new Writer and a new copy of the - // sessionphase.Times. - Reset(Writer, *sessionphase.Times) + // Reset resets the StatsCollector with a new ApplicationStats and a new copy + // of the sessionphase.Times. + Reset(ApplicationStats, *sessionphase.Times) } // Storage provides clients with interface to perform read and write operations @@ -136,9 +140,12 @@ type StatsCollector interface { type Storage interface { Reader - // GetWriterForApplication returns a Writer instance for the given application - // name. - GetWriterForApplication(appName string) Writer + // GetLastReset returns the last time when the sqlstats is being reset. + GetLastReset() time.Time + + // GetApplicationStats returns an ApplicationStats instance for the given + // application name. + GetApplicationStats(appName string) ApplicationStats // Reset resets all the statistics stored in-memory in the current Storage. Reset(context.Context) error