Skip to content

Commit

Permalink
sessiondata: extract many fields into a protobuf
Browse files Browse the repository at this point in the history
This commit extract all "simple" fields from `sessiondata.SessionData`
object that need to be serialized and sent to the remote nodes in order
to get the correct execution into a protobuf. All fields in
`SessionData` object have been audited to make sure that all the
necessary ones are serialized. Only one omission was found - we were not
propagating the default int size - and it is now fixed.

Release note: None
  • Loading branch information
yuzefovich committed Oct 15, 2020
1 parent d8a4d5f commit 0e3822e
Show file tree
Hide file tree
Showing 43 changed files with 1,414 additions and 761 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func descForTable(

var testEvalCtx = &tree.EvalContext{
SessionData: &sessiondata.SessionData{
DataConversion: sessiondata.DataConversionConfig{Location: time.UTC},
Location: time.UTC,
},
StmtTimestamp: timeutil.Unix(100000000, 0),
Settings: cluster.MakeTestingClusterSettings(),
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/sql_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1154,7 +1155,7 @@ func formatVal(val driver.Value, showPrintableUnicode bool, showNewLinesAndTabs
// that we can let the user see and control the result using
// `bytea_output`.
return lex.EncodeByteArrayToRawBytes(string(t),
lex.BytesEncodeEscape, false /* skipHexPrefix */)
sessiondatapb.BytesEncodeEscape, false /* skipHexPrefix */)

case time.Time:
// Since we do not know whether the datum is Timestamp or TimestampTZ,
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -147,10 +147,10 @@ func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) {
// isSupported checks whether we have a columnar operator equivalent to a
// processor described by spec. Note that it doesn't perform any other checks
// (like validity of the number of inputs).
func isSupported(mode sessiondata.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
func isSupported(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
core := spec.Core
isFullVectorization := mode == sessiondata.VectorizeOn ||
mode == sessiondata.VectorizeExperimentalAlways
isFullVectorization := mode == sessiondatapb.VectorizeOn ||
mode == sessiondatapb.VectorizeExperimentalAlways

switch {
case core.Noop != nil:
Expand Down Expand Up @@ -429,9 +429,9 @@ func (r opResult) createAndWrapRowSource(
}
if spec.Core.JoinReader == nil {
switch flowCtx.EvalCtx.SessionData.VectorizeMode {
case sessiondata.Vectorize201Auto:
case sessiondatapb.Vectorize201Auto:
return errors.New("rowexec processor wrapping for non-JoinReader core unsupported in vectorize=201auto mode")
case sessiondata.VectorizeExperimentalAlways:
case sessiondatapb.VectorizeExperimentalAlways:
return causeToWrap
}
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/colexec/window_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -48,7 +47,6 @@ func TestWindowFunctions(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
evalCtx.SessionData.VectorizeMode = sessiondata.VectorizeOn
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1034,7 +1034,7 @@ func (s *vectorizedFlowCreator) setupFlow(
if flowCtx.EvalCtx.SessionData.TestingVectorizeInjectPanics {
result.Op = colexec.NewPanicInjector(result.Op)
}
if flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondata.Vectorize201Auto &&
if flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondatapb.Vectorize201Auto &&
!result.IsStreaming {
err = errors.Errorf("non-streaming operator encountered when vectorize=201auto")
return
Expand Down Expand Up @@ -1076,7 +1076,7 @@ func (s *vectorizedFlowCreator) setupFlow(
}
}

if (flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondata.Vectorize201Auto) &&
if (flowCtx.EvalCtx.SessionData.VectorizeMode == sessiondatapb.Vectorize201Auto) &&
pspec.Output[0].Type == execinfrapb.OutputRouterSpec_BY_HASH {
// colexec.HashRouter is not supported when vectorize=auto since it can
// buffer an unlimited number of tuples, even though it falls back to
Expand Down
26 changes: 12 additions & 14 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -462,18 +463,13 @@ type ConnectionHandler struct {
// GetUnqualifiedIntSize implements pgwire.sessionDataProvider and returns
// the type that INT should be parsed as.
func (h ConnectionHandler) GetUnqualifiedIntSize() *types.T {
var size int
var size int32
if h.ex != nil {
// The executor will be nil in certain testing situations where
// no server is actually present.
size = h.ex.sessionData.DefaultIntSize
}
switch size {
case 4, 32:
return types.Int4
default:
return types.Int
}
return parser.NakedIntTypeFromDefaultIntSize(size)
}

// GetParamStatus retrieves the configured value of the session
Expand Down Expand Up @@ -512,7 +508,9 @@ func (s *Server) ServeConn(
// newSessionData a SessionData that can be passed to newConnExecutor.
func (s *Server) newSessionData(args SessionArgs) *sessiondata.SessionData {
sd := &sessiondata.SessionData{
User: args.User,
SessionData: sessiondatapb.SessionData{
User: args.User,
},
RemoteAddr: args.RemoteAddr,
ResultsBufferSize: args.ConnResultsBufferSize,
}
Expand All @@ -537,10 +535,8 @@ func (s *Server) populateMinimalSessionData(sd *sessiondata.SessionData) {
if sd.SequenceState == nil {
sd.SequenceState = sessiondata.NewSequenceState()
}
if sd.DataConversion == (sessiondata.DataConversionConfig{}) {
sd.DataConversion = sessiondata.DataConversionConfig{
Location: time.UTC,
}
if sd.Location == nil {
sd.Location = time.UTC
}
if len(sd.SearchPath.GetPathArray()) == 0 {
sd.SearchPath = sessiondata.DefaultSearchPathForUser(sd.User)
Expand Down Expand Up @@ -1462,7 +1458,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
NeedRowDesc,
pos,
nil, /* formatCodes */
ex.sessionData.DataConversion,
ex.sessionData.DataConversionConfig,
ex.sessionData.Location,
0, /* limit */
"", /* portalName */
ex.implicitTxn(),
Expand Down Expand Up @@ -1533,7 +1530,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
// needed.
DontNeedRowDesc,
pos, portal.OutFormats,
ex.sessionData.DataConversion,
ex.sessionData.DataConversionConfig,
ex.sessionData.Location,
tcmd.Limit,
portalName,
ex.implicitTxn(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (ex *connExecutor) execBind(
"expected %d arguments, got %d", numQArgs, len(bindCmd.Args)))
}

ptCtx := tree.NewParseTimeContext(ex.state.sqlTimestamp.In(ex.sessionData.DataConversion.Location))
ptCtx := tree.NewParseTimeContext(ex.state.sqlTimestamp.In(ex.sessionData.Location))

for i, arg := range bindCmd.Args {
k := tree.PlaceholderIdx(i)
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/ring"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -591,7 +591,8 @@ type ClientComm interface {
descOpt RowDescOpt,
pos CmdPos,
formatCodes []pgwirebase.FormatCode,
conv sessiondata.DataConversionConfig,
conv sessiondatapb.DataConversionConfig,
location *time.Location,
limit int,
portalName string,
implicitTxn bool,
Expand Down
29 changes: 5 additions & 24 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/faketreeeval"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/rowflow"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
Expand Down Expand Up @@ -268,34 +268,15 @@ func (ds *ServerImpl) setupFlow(
return ctx, nil, err
}

var be lex.BytesEncodeFormat
switch req.EvalContext.BytesEncodeFormat {
case execinfrapb.BytesEncodeFormat_HEX:
be = lex.BytesEncodeHex
case execinfrapb.BytesEncodeFormat_ESCAPE:
be = lex.BytesEncodeEscape
case execinfrapb.BytesEncodeFormat_BASE64:
be = lex.BytesEncodeBase64
default:
return nil, nil, errors.AssertionFailedf("unknown byte encode format: %s",
errors.Safe(req.EvalContext.BytesEncodeFormat))
}
sd := &sessiondata.SessionData{
ApplicationName: req.EvalContext.ApplicationName,
Database: req.EvalContext.Database,
User: req.EvalContext.User,
SessionData: req.EvalContext.SessionData,
SearchPath: sessiondata.MakeSearchPath(
req.EvalContext.SearchPath,
).WithTemporarySchemaName(
req.EvalContext.TemporarySchemaName,
).WithUserSchemaName(req.EvalContext.User),
).WithUserSchemaName(req.EvalContext.SessionData.User),
SequenceState: sessiondata.NewSequenceState(),
DataConversion: sessiondata.DataConversionConfig{
Location: location,
BytesEncodeFormat: be,
ExtraFloatDigits: int(req.EvalContext.ExtraFloatDigits),
},
VectorizeMode: sessiondata.VectorizeExecMode(req.EvalContext.Vectorize),
Location: location,
}
ie := &lazyInternalExecutor{
newInternalExecutor: func() sqlutil.InternalExecutor {
Expand Down Expand Up @@ -352,7 +333,7 @@ func (ds *ServerImpl) setupFlow(
// have non-nil localState.EvalContext. We don't want to update EvalContext
// itself when the vectorize mode needs to be changed because we would need
// to restore the original value which can have data races under stress.
isVectorized := sessiondata.VectorizeExecMode(req.EvalContext.Vectorize) != sessiondata.VectorizeOff
isVectorized := req.EvalContext.SessionData.VectorizeMode != sessiondatapb.VectorizeOff
f := newFlow(flowCtx, ds.flowRegistry, syncFlowConsumer, localState.LocalProcs, isVectorized)
opt := flowinfra.FuseNormally
if localState.IsLocal {
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
Expand Down Expand Up @@ -151,13 +151,13 @@ func (dsp *DistSQLPlanner) setupFlows(
resultChan = make(chan runnerResult, len(flows)-1)
}

if evalCtx.SessionData.VectorizeMode != sessiondata.VectorizeOff {
if !vectorizeThresholdMet && (evalCtx.SessionData.VectorizeMode == sessiondata.Vectorize201Auto || evalCtx.SessionData.VectorizeMode == sessiondata.VectorizeOn) {
if evalCtx.SessionData.VectorizeMode != sessiondatapb.VectorizeOff {
if !vectorizeThresholdMet && (evalCtx.SessionData.VectorizeMode == sessiondatapb.Vectorize201Auto || evalCtx.SessionData.VectorizeMode == sessiondatapb.VectorizeOn) {
// Vectorization is not justified for this flow because the expected
// amount of data is too small and the overhead of pre-allocating data
// structures needed for the vectorized engine is expected to dominate
// the execution time.
setupReq.EvalContext.Vectorize = int32(sessiondata.VectorizeOff)
setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff
} else {
// Now we check to see whether or not to even try vectorizing the flow.
// The goal here is to determine up front whether all of the flows can be
Expand All @@ -182,7 +182,7 @@ func (dsp *DistSQLPlanner) setupFlows(
); err != nil {
// Vectorization attempt failed with an error.
returnVectorizationSetupError := false
if evalCtx.SessionData.VectorizeMode == sessiondata.VectorizeExperimentalAlways {
if evalCtx.SessionData.VectorizeMode == sessiondatapb.VectorizeExperimentalAlways {
returnVectorizationSetupError = true
// If running with VectorizeExperimentalAlways, this check makes sure
// that we can still run SET statements (mostly to set vectorize to
Expand All @@ -206,7 +206,7 @@ func (dsp *DistSQLPlanner) setupFlows(
}
// Vectorization is not supported for this flow, so we override the
// setting.
setupReq.EvalContext.Vectorize = int32(sessiondata.VectorizeOff)
setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff
break
}
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand All @@ -60,6 +59,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
Expand Down Expand Up @@ -305,9 +305,9 @@ var VectorizeClusterMode = settings.RegisterEnumSetting(
"default vectorize mode",
"on",
map[int64]string{
int64(sessiondata.VectorizeOff): "off",
int64(sessiondata.Vectorize201Auto): "201auto",
int64(sessiondata.VectorizeOn): "on",
int64(sessiondatapb.VectorizeOff): "off",
int64(sessiondatapb.Vectorize201Auto): "201auto",
int64(sessiondatapb.VectorizeOn): "on",
},
)

Expand Down Expand Up @@ -2054,12 +2054,12 @@ func (m *sessionDataMutator) SetApplicationName(appName string) {
m.paramStatusUpdater.BufferParamStatusUpdate("application_name", appName)
}

func (m *sessionDataMutator) SetBytesEncodeFormat(val lex.BytesEncodeFormat) {
m.data.DataConversion.BytesEncodeFormat = val
func (m *sessionDataMutator) SetBytesEncodeFormat(val sessiondatapb.BytesEncodeFormat) {
m.data.DataConversionConfig.BytesEncodeFormat = val
}

func (m *sessionDataMutator) SetExtraFloatDigits(val int) {
m.data.DataConversion.ExtraFloatDigits = val
func (m *sessionDataMutator) SetExtraFloatDigits(val int32) {
m.data.DataConversionConfig.ExtraFloatDigits = val
}

func (m *sessionDataMutator) SetDatabase(dbName string) {
Expand All @@ -2078,7 +2078,7 @@ func (m *sessionDataMutator) SetTemporarySchemaIDForDatabase(dbID uint32, tempSc
m.data.DatabaseIDToTempSchemaID[dbID] = tempSchemaID
}

func (m *sessionDataMutator) SetDefaultIntSize(size int) {
func (m *sessionDataMutator) SetDefaultIntSize(size int32) {
m.data.DefaultIntSize = size
}

Expand Down Expand Up @@ -2128,7 +2128,7 @@ func (m *sessionDataMutator) SetReorderJoinsLimit(val int) {
m.data.ReorderJoinsLimit = val
}

func (m *sessionDataMutator) SetVectorize(val sessiondata.VectorizeExecMode) {
func (m *sessionDataMutator) SetVectorize(val sessiondatapb.VectorizeExecMode) {
m.data.VectorizeMode = val
}

Expand Down Expand Up @@ -2177,7 +2177,7 @@ func (m *sessionDataMutator) UpdateSearchPath(paths []string) {
}

func (m *sessionDataMutator) SetLocation(loc *time.Location) {
m.data.DataConversion.Location = loc
m.data.Location = loc
m.paramStatusUpdater.BufferParamStatusUpdate("TimeZone", sessionDataTimeZoneFormat(loc))
}

Expand Down
Loading

0 comments on commit 0e3822e

Please sign in to comment.