From 1cdffd8b1945d4d89c22bcceed3b2e5916343d9f Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Sun, 23 Apr 2017 02:17:23 -0400 Subject: [PATCH] sql: prepared statements cache parsed statement Previously, prepared statements contained only the text of their query, causing them to be re-parsed at execution time. Now, they contain a parsed query. This improves throughput of kv -read-percent=100 -concurrency=1 by about 15% on a local single-node cluster. --- pkg/sql/executor.go | 56 ++++++++++++++++++++++++++++++++------------ pkg/sql/pgwire/v3.go | 34 +++++++++++++++++++++------ pkg/sql/prepare.go | 36 ++++++++++++++++++++-------- 3 files changed, 94 insertions(+), 32 deletions(-) diff --git a/pkg/sql/executor.go b/pkg/sql/executor.go index 8d43e0774c6d..8630931e0049 100644 --- a/pkg/sql/executor.go +++ b/pkg/sql/executor.go @@ -406,19 +406,12 @@ func (e *Executor) getDatabaseCache() *databaseCache { // populate the missing types. The PreparedStatement is returned (or // nil if there are no results). func (e *Executor) Prepare( - query string, session *Session, pinfo parser.PlaceholderTypes, + stmts parser.StatementList, session *Session, pinfo parser.PlaceholderTypes, ) (*PreparedStatement, error) { session.resetForBatch(e) - sessionEventf(session, "preparing: %s", query) - - var p parser.Parser - stmts, err := p.Parse(query) - if err != nil { - return nil, err - } + sessionEventf(session, "preparing: %s", stmts) prepared := &PreparedStatement{ - Query: query, SQLTypes: pinfo, portalNames: make(map[string]struct{}), } @@ -431,8 +424,8 @@ func (e *Executor) Prepare( return nil, errWrongNumberOfPreparedStatements(len(stmts)) } stmt := stmts[0] - prepared.Type = stmt.StatementType() - if err = pinfo.ProcessPlaceholderAnnotations(stmt); err != nil { + prepared.Statement = stmt + if err := pinfo.ProcessPlaceholderAnnotations(stmt); err != nil { return nil, err } protoTS, err := isAsOf(session, stmt, e.cfg.Clock.Now()) @@ -510,6 +503,29 @@ func (e *Executor) ExecuteStatements( return e.execRequest(session, stmts, pinfo, copyMsgNone) } +// ExecuteStatementsParsed executes the given statement and returns a response. +func (e *Executor) ExecuteStatementParsed( + session *Session, stmt *PreparedStatement, pinfo *parser.PlaceholderInfo, +) StatementResults { + session.resetForBatch(e) + session.phaseTimes[sessionStartBatch] = timeutil.Now() + + defer func() { + if r := recover(); r != nil { + // On a panic, prepend the executed SQL. + panic(fmt.Errorf("%s: %s", stmt.Statement.String(), r)) + } + }() + + var stmts parser.StatementList + if stmt.Statement != nil { + stmts = parser.StatementList{stmt.Statement} + } + // Send the Request for SQL execution and set the application-level error + // for each result in the reply. + return e.execParsed(session, stmts, pinfo, copyMsgNone) +} + // CopyData adds data to the COPY buffer and executes if there are enough rows. func (e *Executor) CopyData(session *Session, data string) StatementResults { return e.execRequest(session, data, nil, copyMsgData) @@ -577,9 +593,7 @@ func (e *Executor) waitForConfigUpdate() { func (e *Executor) execRequest( session *Session, sql string, pinfo *parser.PlaceholderInfo, copymsg copyMsg, ) StatementResults { - var res StatementResults var stmts parser.StatementList - var avoidCachedDescriptors bool var err error txnState := &session.TxnState @@ -608,9 +622,20 @@ func (e *Executor) execRequest( // Rollback the txn. txnState.updateStateAndCleanupOnErr(err, e) } + var res StatementResults res.ResultList = append(res.ResultList, Result{Err: err}) return res } + return e.execParsed(session, stmts, pinfo, copymsg) +} + +func (e *Executor) execParsed( + session *Session, stmts parser.StatementList, pinfo *parser.PlaceholderInfo, copymsg copyMsg, +) StatementResults { + var res StatementResults + var avoidCachedDescriptors bool + txnState := &session.TxnState + if len(stmts) == 0 { res.Empty = true return res @@ -643,6 +668,7 @@ func (e *Executor) execRequest( stmtsToExec = stmtsToExec[:1] // Check for AS OF SYSTEM TIME. If it is present but not detected here, // it will raise an error later on. + var err error protoTS, err = isAsOf(session, stmtsToExec[0], e.cfg.Clock.Now()) if err != nil { res.ResultList = append(res.ResultList, Result{Err: err}) @@ -1247,7 +1273,7 @@ func (e *Executor) execStmtInOpenTxn( for i, t := range s.Types { typeHints[strconv.Itoa(i+1)] = parser.CastTargetToDatumType(t) } - _, err := session.PreparedStatements.New(e, name, s.Statement.String(), typeHints) + _, err := session.PreparedStatements.New(e, name, parser.StatementList{s.Statement}, typeHints) return Result{}, err case *parser.Execute: name := s.Name.String() @@ -1274,7 +1300,7 @@ func (e *Executor) execStmtInOpenTxn( } qArgs[idx] = typedExpr } - results := e.ExecuteStatements(session, prepared.Query, &parser.PlaceholderInfo{Values: qArgs, Types: prepared.SQLTypes}) + results := e.ExecuteStatementParsed(session, prepared, &parser.PlaceholderInfo{Values: qArgs, Types: prepared.SQLTypes}) if results.Empty { return Result{}, nil } diff --git a/pkg/sql/pgwire/v3.go b/pkg/sql/pgwire/v3.go index b2f7e3880ec7..d9c4e82eda02 100644 --- a/pkg/sql/pgwire/v3.go +++ b/pkg/sql/pgwire/v3.go @@ -548,7 +548,7 @@ func (c *v3Conn) handleParse(buf *readBuffer) error { sqlTypeHints[strconv.Itoa(i+1)] = v } // Create the new PreparedStatement in the connection's Session. - stmt, err := c.session.PreparedStatements.New(c.executor, name, query, sqlTypeHints) + stmt, err := c.session.PreparedStatements.NewFromString(c.executor, name, query, sqlTypeHints) if err != nil { return c.sendError(err) } @@ -592,8 +592,8 @@ func (c *v3Conn) handleParse(buf *readBuffer) error { // canSendNoData returns true if describing a result of the input statement // type should return NoData. -func canSendNoData(statementType parser.StatementType) bool { - return statementType != parser.Rows +func canSendNoData(stmt parser.Statement) bool { + return stmt == nil || stmt.StatementType() != parser.Rows } func (c *v3Conn) handleDescribe(ctx context.Context, buf *readBuffer) error { @@ -622,7 +622,7 @@ func (c *v3Conn) handleDescribe(ctx context.Context, buf *readBuffer) error { return err } - return c.sendRowDescription(ctx, stmt.Columns, nil, canSendNoData(stmt.Type)) + return c.sendRowDescription(ctx, stmt.Columns, nil, canSendNoData(stmt.Statement)) case preparePortal: portal, ok := c.session.PreparedPortals.Get(name) if !ok { @@ -630,7 +630,7 @@ func (c *v3Conn) handleDescribe(ctx context.Context, buf *readBuffer) error { } portalMeta := portal.ProtocolMeta.(preparedPortalMeta) - return c.sendRowDescription(ctx, portal.Stmt.Columns, portalMeta.outFormats, canSendNoData(portal.Stmt.Type)) + return c.sendRowDescription(ctx, portal.Stmt.Columns, portalMeta.outFormats, canSendNoData(portal.Stmt.Statement)) default: return errors.Errorf("unknown describe type: %s", typ) } @@ -790,7 +790,7 @@ func (c *v3Conn) handleBind(ctx context.Context, buf *readBuffer) error { } if log.V(2) { - log.Infof(ctx, "portal: %q for %q, args %q, formats %q", portalName, stmt.Query, qargs, columnFormatCodes) + log.Infof(ctx, "portal: %q for %q, args %q, formats %q", portalName, stmt.Statement, qargs, columnFormatCodes) } // Attach pgwire-specific metadata to the PreparedPortal. @@ -820,7 +820,21 @@ func (c *v3Conn) handleExecute(buf *readBuffer) error { Values: portal.Qargs, } - return c.executeStatements(stmt.Query, &pinfo, portalMeta.outFormats, false, int(limit)) + return c.executeStatementParsed(stmt, &pinfo, portalMeta.outFormats, false, int(limit)) +} + +func (c *v3Conn) executeStatementParsed( + stmt *sql.PreparedStatement, + pinfo *parser.PlaceholderInfo, + formatCodes []formatCode, + sendDescription bool, + limit int, +) error { + tracing.AnnotateTrace() + // Note: sql.Executor gets its Context from c.session.context, which + // has been bound by v3Conn.setupSession(). + results := c.executor.ExecuteStatementParsed(c.session, stmt, pinfo) + return c.finishExecute(results, formatCodes, sendDescription, limit) } func (c *v3Conn) executeStatements( @@ -834,6 +848,12 @@ func (c *v3Conn) executeStatements( // Note: sql.Executor gets its Context from c.session.context, which // has been bound by v3Conn.setupSession(). results := c.executor.ExecuteStatements(c.session, stmts, pinfo) + return c.finishExecute(results, formatCodes, sendDescription, limit) +} + +func (c *v3Conn) finishExecute( + results sql.StatementResults, formatCodes []formatCode, sendDescription bool, limit int, +) error { // Delay evaluation of c.session.Ctx(). defer func() { results.Close(c.session.Ctx()) }() diff --git a/pkg/sql/prepare.go b/pkg/sql/prepare.go index 111feff4630d..ce5a14120cb2 100644 --- a/pkg/sql/prepare.go +++ b/pkg/sql/prepare.go @@ -28,8 +28,9 @@ import ( // PreparedStatement is a SQL statement that has been parsed and the types // of arguments and results have been determined. type PreparedStatement struct { - Query string - Type parser.StatementType + // Statement is the parsed, prepared SQL statement. It may be nil if the + // prepared statement is empty. + Statement parser.Statement SQLTypes parser.PlaceholderTypes Columns sqlbase.ResultColumns portalNames map[string]struct{} @@ -65,24 +66,39 @@ func (ps PreparedStatements) Exists(name string) bool { return ok } +// NewFromString creates a new PreparedStatement with the provided name and +// corresponding query string, using the given PlaceholderTypes hints to assist +// in inferring placeholder types. +// +// ps.session.Ctx() is used as the logging context for the prepare operation. +func (ps PreparedStatements) NewFromString( + e *Executor, name, query string, placeholderHints parser.PlaceholderTypes, +) (*PreparedStatement, error) { + sessionEventf(ps.session, "parsing: %s", query) + + var parser parser.Parser + stmts, err := parser.Parse(query) + if err != nil { + return nil, err + } + return ps.New(e, name, stmts, placeholderHints) +} + // New creates a new PreparedStatement with the provided name and corresponding -// query string, using the given PlaceholderTypes hints to assist in inferring -// placeholder types. +// query statements, using the given PlaceholderTypes hints to assist in +// inferring placeholder types. // // ps.session.Ctx() is used as the logging context for the prepare operation. func (ps PreparedStatements) New( - e *Executor, name, query string, placeholderHints parser.PlaceholderTypes, + e *Executor, name string, stmts parser.StatementList, placeholderHints parser.PlaceholderTypes, ) (*PreparedStatement, error) { // Prepare the query. This completes the typing of placeholders. - stmt, err := e.Prepare(query, ps.session, placeholderHints) + stmt, err := e.Prepare(stmts, ps.session, placeholderHints) if err != nil { return nil, err } - // For now we are just counting the size of the query string and - // statement name. When we start storing the prepared query plan - // during prepare, this should be tallied up to the monitor as well. - sz := int64(uintptr(len(query)+len(name)) + unsafe.Sizeof(*stmt)) + sz := int64(uintptr(len(name)) + unsafe.Sizeof(*stmt)) if err := stmt.memAcc.Wsession(ps.session).OpenAndInit(ps.session.Ctx(), sz); err != nil { return nil, err }