Skip to content

Commit

Permalink
sql: restructure sqlstats public writer API
Browse files Browse the repository at this point in the history
Previously, SQL Stats's public writer interface is was very limited
in its functionality. This was intentional back in the time where
SQL Stat's writer was injected into the stats collector and directly
wrote statistics into the in-memory store. However, as we move
to support grouping statement statistics within an explicit
transaction, we need to create short-lived emphemeral sqlstats.Writer
to record statement statistics for explicit transactions, then merge
the stored statistics inside the emphemeral sqlstats.Writer into
the main SQLStats. However, this merge cannot be implemented with
the current sqlstats.Writer API, since the API does not allow statistics
to be fetched from the sqlstats.Writer.

This commit restructures the SQL Stats Writer API to introduce
the concept of sqlstats.ApplicationStats, which is backed by the
ssmemstorage.Container struct. Now, sqlstats.Storage interface
would return the new sqlstats.ApplicationStats intead of
sqlstats.Writer to the connExecutor, which can be injected into
the sqlstats.StatsCollector.

This is the initial step to address cockroachdb#59205

Release justification: Low risk, high benefit changes to existing
functionality
Release note: None
  • Loading branch information
Azhng committed Sep 15, 2021
1 parent 30c199a commit c5e64d6
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 82 deletions.
24 changes: 12 additions & 12 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func (s *Server) SetupConn(

ex := s.newConnExecutor(
ctx, sdMutIterator, stmtBuf, clientComm, memMetrics, &s.Metrics,
s.sqlStats.GetWriterForApplication(sd.ApplicationName),
s.sqlStats.GetApplicationStats(sd.ApplicationName),
)
return ConnectionHandler{ex}, nil
}
Expand Down Expand Up @@ -733,7 +733,7 @@ func (s *Server) newConnExecutor(
clientComm ClientComm,
memMetrics MemoryMetrics,
srvMetrics *Metrics,
statsWriter sqlstats.Writer,
applicationStats sqlstats.ApplicationStats,
) *connExecutor {
// Create the various monitors.
// The session monitors are started in activate().
Expand Down Expand Up @@ -811,11 +811,11 @@ func (s *Server) newConnExecutor(
}

ex.applicationName.Store(ex.sessionData().ApplicationName)
ex.statsWriter = statsWriter
ex.statsCollector = sslocal.NewStatsCollector(statsWriter, ex.phaseTimes)
ex.applicationStats = applicationStats
ex.statsCollector = sslocal.NewStatsCollector(applicationStats, ex.phaseTimes)
ex.dataMutatorIterator.onApplicationNameChange = func(newName string) {
ex.applicationName.Store(newName)
ex.statsWriter = ex.server.sqlStats.GetWriterForApplication(newName)
ex.applicationStats = ex.server.sqlStats.GetApplicationStats(newName)
}

ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionInit, timeutil.Now())
Expand Down Expand Up @@ -866,7 +866,7 @@ func (s *Server) newConnExecutorWithTxn(
srvMetrics *Metrics,
txn *kv.Txn,
syntheticDescs []catalog.Descriptor,
statsWriter sqlstats.Writer,
applicationStats sqlstats.ApplicationStats,
) *connExecutor {
ex := s.newConnExecutor(
ctx,
Expand All @@ -875,7 +875,7 @@ func (s *Server) newConnExecutorWithTxn(
clientComm,
memMetrics,
srvMetrics,
statsWriter,
applicationStats,
)
if txn.Type() == kv.LeafTxn {
// If the txn is a leaf txn it is not allowed to perform mutations. For
Expand Down Expand Up @@ -1274,10 +1274,10 @@ type connExecutor struct {
// executor.
dataMutatorIterator *sessionDataMutatorIterator

// statsWriter is a writer interface for recording per-application SQL usage
// statistics. It is maintained to represent statistics for the application
// currently identified by sessiondata.ApplicationName.
statsWriter sqlstats.Writer
// applicationStats records per-application SQL usage statistics. It is
// maintained to represent statistics for the application currently identified
// by sessiondata.ApplicationName.
applicationStats sqlstats.ApplicationStats

// statsCollector is used to collect statistics about SQL statements and
// transactions.
Expand Down Expand Up @@ -2169,7 +2169,7 @@ func (ex *connExecutor) execCopyIn(
// state machine, but the copyMachine manages its own transactions without
// going through the state machine.
ex.state.sqlTimestamp = txnTS
ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes)
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
ex.initPlanner(ctx, p)
ex.resetPlanner(ctx, p, txn, stmtTS)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (ex *connExecutor) execStmt(
// Run observer statements in a separate code path; their execution does not
// depend on the current transaction state.
if _, ok := ast.(tree.ObserverStatement); ok {
ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes)
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
err := ex.runObserverStatement(ctx, ast, res)
// Note that regardless of res.Err(), these observer statements don't
// generate error events; transactions are always allowed to continue.
Expand Down Expand Up @@ -363,7 +363,7 @@ func (ex *connExecutor) execStmtInOpenState(

p := &ex.planner
stmtTS := ex.server.cfg.Clock.PhysicalTime()
ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes)
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)
p.sessionDataMutatorIterator.paramStatusUpdater = res
p.noticeSender = res
Expand Down Expand Up @@ -1425,7 +1425,7 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode(
rwMode = ex.readWriteModeWithSessionDefault(modes.ReadWriteMode)
return rwMode, now, nil, nil
}
ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes)
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
p := &ex.planner

// NB: this use of p.txn is totally bogus. The planner's txn should
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (ex *connExecutor) prepare(

var flags planFlags
prepare := func(ctx context.Context, txn *kv.Txn) (err error) {
ex.statsCollector.Reset(ex.statsWriter, ex.phaseTimes)
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
p := &ex.planner
if origin != PreparedStatementOriginSQL {
// If the PREPARE command was issued as a SQL statement, then we
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (ie *InternalExecutor) initConnEx(
// If this is already an "internal app", don't put more prefix.
appStatsBucketName = sd.ApplicationName
}
statsWriter := ie.s.sqlStats.GetWriterForApplication(appStatsBucketName)
applicationStats := ie.s.sqlStats.GetApplicationStats(appStatsBucketName)

sds := sessiondata.NewStack(sd)
sdMutIterator := ie.s.makeSessionDataMutatorIterator(sds, nil /* sessionDefaults */)
Expand All @@ -186,7 +186,7 @@ func (ie *InternalExecutor) initConnEx(
clientComm,
ie.memMetrics,
&ie.s.InternalMetrics,
statsWriter,
applicationStats,
)
} else {
ex = ie.s.newConnExecutorWithTxn(
Expand All @@ -199,7 +199,7 @@ func (ie *InternalExecutor) initConnEx(
&ie.s.InternalMetrics,
txn,
ie.syntheticDescriptors,
statsWriter,
applicationStats,
)
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "persistedsqlstats",
srcs = [
"appStats.go",
"cluster_settings.go",
"combined_iterator.go",
"compaction_exec.go",
Expand All @@ -16,7 +17,6 @@ go_library(
"scheduled_job_monitor.go",
"stmt_reader.go",
"txn_reader.go",
"writer.go",
],
embed = [":persistedsqlstats_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats",
Expand All @@ -34,7 +34,6 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/execstats",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,62 +14,54 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/ssmemstorage"
"github.com/cockroachdb/errors"
)

// StatsWriter is a sqlstats.Writer that wraps a in-memory node-local stats
// writer. StatsWriter signals the subsystem when it encounters memory pressure
// which will triggers the flush operation.
type StatsWriter struct {
// ApplicationStats is a sqlstats.ApplicationStats that wraps an in-memory
// node-local ApplicationStats. ApplicationStats signals the subsystem when it
// encounters memory pressure which will triggers the flush operation.
type ApplicationStats struct {
// local in-memory storage.
memWriter sqlstats.Writer
sqlstats.ApplicationStats

// Use to signal the stats writer is experiencing memory pressure.
memoryPressureSignal chan struct{}
}

var _ sqlstats.Writer = &StatsWriter{}
var _ sqlstats.ApplicationStats = &ApplicationStats{}

// RecordStatement implements sqlstats.Writer interface.
func (s *StatsWriter) RecordStatement(
// RecordStatement implements sqlstats.ApplicationStats interface.
func (s *ApplicationStats) RecordStatement(
ctx context.Context, key roachpb.StatementStatisticsKey, value sqlstats.RecordedStmtStats,
) (roachpb.StmtFingerprintID, error) {
var fingerprintID roachpb.StmtFingerprintID
err := s.recordStatsOrSendMemoryPressureSignal(func() (err error) {
fingerprintID, err = s.memWriter.RecordStatement(ctx, key, value)
fingerprintID, err = s.ApplicationStats.RecordStatement(ctx, key, value)
return err
})
return fingerprintID, err
}

// RecordStatementExecStats implements sqlstats.Writer interface.
func (s *StatsWriter) RecordStatementExecStats(
key roachpb.StatementStatisticsKey, stats execstats.QueryLevelStats,
) error {
return s.memWriter.RecordStatementExecStats(key, stats)
}

// ShouldSaveLogicalPlanDesc implements sqlstats.Writer interface.
func (s *StatsWriter) ShouldSaveLogicalPlanDesc(
// ShouldSaveLogicalPlanDesc implements sqlstats.ApplicationStats interface.
func (s *ApplicationStats) ShouldSaveLogicalPlanDesc(
fingerprint string, implicitTxn bool, database string,
) bool {
return s.memWriter.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database)
return s.ApplicationStats.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, database)
}

// RecordTransaction implements sqlstats.Writer interface and saves
// RecordTransaction implements sqlstats.ApplicationStats interface and saves
// per-transaction statistics.
func (s *StatsWriter) RecordTransaction(
func (s *ApplicationStats) RecordTransaction(
ctx context.Context, key roachpb.TransactionFingerprintID, value sqlstats.RecordedTxnStats,
) error {
return s.recordStatsOrSendMemoryPressureSignal(func() error {
return s.memWriter.RecordTransaction(ctx, key, value)
return s.ApplicationStats.RecordTransaction(ctx, key, value)
})
}

func (s *StatsWriter) recordStatsOrSendMemoryPressureSignal(fn func() error) error {
func (s *ApplicationStats) recordStatsOrSendMemoryPressureSignal(fn func() error) error {
err := fn()
if errors.Is(err, ssmemstorage.ErrFingerprintLimitReached) || errors.Is(err, ssmemstorage.ErrMemoryPressure) {
select {
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type PersistedSQLStats struct {

cfg *Config

// memoryPressureSignal is used by the persistedsqlstats.StatsWriter to signal
// memoryPressureSignal is used by the persistedsqlstats.ApplicationStats to signal
// memory pressure during stats recording. A signal is emitted through this
// channel either if the fingerprint limit or the memory limit has been
// exceeded.
Expand Down Expand Up @@ -199,11 +199,11 @@ func (s *PersistedSQLStats) jitterInterval(interval time.Duration) time.Duration
return jitteredInterval
}

// GetWriterForApplication implements sqlstats.Provider interface.
func (s *PersistedSQLStats) GetWriterForApplication(appName string) sqlstats.Writer {
writer := s.SQLStats.GetWriterForApplication(appName)
return &StatsWriter{
memWriter: writer,
// GetApplicationStats implements sqlstats.Provider interface.
func (s *PersistedSQLStats) GetApplicationStats(appName string) sqlstats.ApplicationStats {
appStats := s.SQLStats.GetApplicationStats(appName)
return &ApplicationStats{
ApplicationStats: appStats,
memoryPressureSignal: s.memoryPressureSignal,
}
}
8 changes: 4 additions & 4 deletions pkg/sql/sqlstats/sslocal/sslocal_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (s *SQLStats) periodicallyClearSQLStats(
})
}

// GetWriterForApplication implements sqlstats.Provider interface.
func (s *SQLStats) GetWriterForApplication(appName string) sqlstats.Writer {
// GetApplicationStats implements sqlstats.Provider interface.
func (s *SQLStats) GetApplicationStats(appName string) sqlstats.ApplicationStats {
s.mu.Lock()
defer s.mu.Unlock()
if a, ok := s.mu.apps[appName]; ok {
Expand Down Expand Up @@ -173,7 +173,7 @@ func (s *SQLStats) TxnStatsIterator(options *sqlstats.IteratorOptions) *TxnStats

// IterateAggregatedTransactionStats implements sqlstats.Provider interface.
func (s *SQLStats) IterateAggregatedTransactionStats(
_ context.Context,
ctx context.Context,
options *sqlstats.IteratorOptions,
visitor sqlstats.AggregatedTransactionVisitor,
) error {
Expand All @@ -182,7 +182,7 @@ func (s *SQLStats) IterateAggregatedTransactionStats(
for _, appName := range appNames {
statsContainer := s.getStatsForApplication(appName)

err := statsContainer.IterateAggregatedTransactionStats(appName, visitor)
err := statsContainer.IterateAggregatedTransactionStats(ctx, options, visitor)
if err != nil {
return fmt.Errorf("sql stats iteration abort: %s", err)
}
Expand Down
28 changes: 15 additions & 13 deletions pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
)

type statsCollector struct {
sqlstats.Writer
// StatsCollector is used to collect statement and transaction statistics
// from connExecutor.
type StatsCollector struct {
sqlstats.ApplicationStats

// phaseTimes tracks session-level phase times.
phaseTimes *sessionphase.Times
Expand All @@ -26,33 +28,33 @@ type statsCollector struct {
previousPhaseTimes *sessionphase.Times
}

var _ sqlstats.StatsCollector = &statsCollector{}
var _ sqlstats.ApplicationStats = &StatsCollector{}

// NewStatsCollector returns an instance of sqlstats.StatsCollector.
func NewStatsCollector(
writer sqlstats.Writer, phaseTime *sessionphase.Times,
) sqlstats.StatsCollector {
return &statsCollector{
Writer: writer,
phaseTimes: phaseTime.Clone(),
appStats sqlstats.ApplicationStats, phaseTime *sessionphase.Times,
) *StatsCollector {
return &StatsCollector{
ApplicationStats: appStats,
phaseTimes: phaseTime.Clone(),
}
}

// PhaseTimes implements sqlstats.StatsCollector interface.
func (s *statsCollector) PhaseTimes() *sessionphase.Times {
func (s *StatsCollector) PhaseTimes() *sessionphase.Times {
return s.phaseTimes
}

// PreviousPhaseTimes implements sqlstats.StatsCollector interface.
func (s *statsCollector) PreviousPhaseTimes() *sessionphase.Times {
func (s *StatsCollector) PreviousPhaseTimes() *sessionphase.Times {
return s.previousPhaseTimes
}

// Reset implements sqlstats.StatsCollector interface.
func (s *statsCollector) Reset(writer sqlstats.Writer, phaseTime *sessionphase.Times) {
func (s *StatsCollector) Reset(appStats sqlstats.ApplicationStats, phaseTime *sessionphase.Times) {
previousPhaseTime := s.phaseTimes
*s = statsCollector{
Writer: writer,
*s = StatsCollector{
ApplicationStats: appStats,
previousPhaseTimes: previousPhaseTime,
phaseTimes: phaseTime.Clone(),
}
Expand Down
Loading

0 comments on commit c5e64d6

Please sign in to comment.