Skip to content

Commit

Permalink
sql: prepared statements cache parsed statement
Browse files Browse the repository at this point in the history
Previously, prepared statements contained only the text of their query,
causing them to be re-parsed at execution time.

Now, they contain a parsed query.

This improves throughput of kv -read-percent=100 -concurrency=1 by about
15% on a local single-node cluster.
  • Loading branch information
jordanlewis committed May 3, 2017
1 parent 237969e commit 90d7b21
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 38 deletions.
66 changes: 45 additions & 21 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,19 +406,12 @@ func (e *Executor) getDatabaseCache() *databaseCache {
// populate the missing types. The PreparedStatement is returned (or
// nil if there are no results).
func (e *Executor) Prepare(
query string, session *Session, pinfo parser.PlaceholderTypes,
stmts parser.StatementList, session *Session, pinfo parser.PlaceholderTypes,
) (*PreparedStatement, error) {
session.resetForBatch(e)
sessionEventf(session, "preparing: %s", query)

var p parser.Parser
stmts, err := p.Parse(query)
if err != nil {
return nil, err
}
sessionEventf(session, "preparing: %s", stmts)

prepared := &PreparedStatement{
Query: query,
SQLTypes: pinfo,
portalNames: make(map[string]struct{}),
}
Expand All @@ -431,8 +424,8 @@ func (e *Executor) Prepare(
return nil, errWrongNumberOfPreparedStatements(len(stmts))
}
stmt := stmts[0]
prepared.Type = stmt.StatementType()
if err = pinfo.ProcessPlaceholderAnnotations(stmt); err != nil {
prepared.Statement = stmt
if err := pinfo.ProcessPlaceholderAnnotations(stmt); err != nil {
return nil, err
}
protoTS, err := isAsOf(session, stmt, e.cfg.Clock.Now())
Expand Down Expand Up @@ -491,25 +484,46 @@ func (e *Executor) Prepare(
return prepared, nil
}

func logIfPanicking(ctx context.Context, sql string) {
if r := recover(); r != nil {
log.Shout(ctx, log.Severity_ERROR, "a SQL panic has occurred!")
// On a panic, prepend the executed SQL.
panic(log.WrappedPanic{ExtraInfo: sql, Err: r})
}
}

// ExecuteStatements executes the given statement(s) and returns a response.
func (e *Executor) ExecuteStatements(
session *Session, stmts string, pinfo *parser.PlaceholderInfo,
) StatementResults {
session.resetForBatch(e)
session.phaseTimes[sessionStartBatch] = timeutil.Now()

defer func() {
if r := recover(); r != nil {
// On a panic, prepend the executed SQL.
panic(log.WrappedPanic{ExtraInfo: stmts, Err: r})
}
}()
defer logIfPanicking(session.Ctx(), stmts)

// Send the Request for SQL execution and set the application-level error
// for each result in the reply.
return e.execRequest(session, stmts, pinfo, copyMsgNone)
}

// ExecuteStatementsParsed executes the given statement and returns a response.
func (e *Executor) ExecutePreparedStatement(
session *Session, stmt *PreparedStatement, pinfo *parser.PlaceholderInfo,
) StatementResults {
session.resetForBatch(e)
session.phaseTimes[sessionStartBatch] = timeutil.Now()

defer logIfPanicking(session.Ctx(), stmt.Statement.String())

var stmts parser.StatementList
if stmt.Statement != nil {
stmts = parser.StatementList{stmt.Statement}
}
// Send the Request for SQL execution and set the application-level error
// for each result in the reply.
return e.execParsed(session, stmts, pinfo, copyMsgNone)
}

// CopyData adds data to the COPY buffer and executes if there are enough rows.
func (e *Executor) CopyData(session *Session, data string) StatementResults {
return e.execRequest(session, data, nil, copyMsgData)
Expand Down Expand Up @@ -577,9 +591,7 @@ func (e *Executor) waitForConfigUpdate() {
func (e *Executor) execRequest(
session *Session, sql string, pinfo *parser.PlaceholderInfo, copymsg copyMsg,
) StatementResults {
var res StatementResults
var stmts parser.StatementList
var avoidCachedDescriptors bool
var err error
txnState := &session.TxnState

Expand Down Expand Up @@ -608,9 +620,20 @@ func (e *Executor) execRequest(
// Rollback the txn.
txnState.updateStateAndCleanupOnErr(err, e)
}
var res StatementResults
res.ResultList = append(res.ResultList, Result{Err: err})
return res
}
return e.execParsed(session, stmts, pinfo, copymsg)
}

func (e *Executor) execParsed(
session *Session, stmts parser.StatementList, pinfo *parser.PlaceholderInfo, copymsg copyMsg,
) StatementResults {
var res StatementResults
var avoidCachedDescriptors bool
txnState := &session.TxnState

if len(stmts) == 0 {
res.Empty = true
return res
Expand Down Expand Up @@ -643,6 +666,7 @@ func (e *Executor) execRequest(
stmtsToExec = stmtsToExec[:1]
// Check for AS OF SYSTEM TIME. If it is present but not detected here,
// it will raise an error later on.
var err error
protoTS, err = isAsOf(session, stmtsToExec[0], e.cfg.Clock.Now())
if err != nil {
res.ResultList = append(res.ResultList, Result{Err: err})
Expand Down Expand Up @@ -1247,7 +1271,7 @@ func (e *Executor) execStmtInOpenTxn(
for i, t := range s.Types {
typeHints[strconv.Itoa(i+1)] = parser.CastTargetToDatumType(t)
}
_, err := session.PreparedStatements.New(e, name, s.Statement.String(), typeHints)
_, err := session.PreparedStatements.New(e, name, parser.StatementList{s.Statement}, typeHints)
return Result{}, err
case *parser.Execute:
name := s.Name.String()
Expand All @@ -1274,7 +1298,7 @@ func (e *Executor) execStmtInOpenTxn(
}
qArgs[idx] = typedExpr
}
results := e.ExecuteStatements(session, prepared.Query, &parser.PlaceholderInfo{Values: qArgs, Types: prepared.SQLTypes})
results := e.ExecutePreparedStatement(session, prepared, &parser.PlaceholderInfo{Values: qArgs, Types: prepared.SQLTypes})
if results.Empty {
return Result{}, nil
}
Expand Down
34 changes: 27 additions & 7 deletions pkg/sql/pgwire/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func (c *v3Conn) handleParse(buf *readBuffer) error {
sqlTypeHints[strconv.Itoa(i+1)] = v
}
// Create the new PreparedStatement in the connection's Session.
stmt, err := c.session.PreparedStatements.New(c.executor, name, query, sqlTypeHints)
stmt, err := c.session.PreparedStatements.NewFromString(c.executor, name, query, sqlTypeHints)
if err != nil {
return c.sendError(err)
}
Expand Down Expand Up @@ -592,8 +592,8 @@ func (c *v3Conn) handleParse(buf *readBuffer) error {

// canSendNoData returns true if describing a result of the input statement
// type should return NoData.
func canSendNoData(statementType parser.StatementType) bool {
return statementType != parser.Rows
func canSendNoData(stmt parser.Statement) bool {
return stmt == nil || stmt.StatementType() != parser.Rows
}

func (c *v3Conn) handleDescribe(ctx context.Context, buf *readBuffer) error {
Expand Down Expand Up @@ -622,15 +622,15 @@ func (c *v3Conn) handleDescribe(ctx context.Context, buf *readBuffer) error {
return err
}

return c.sendRowDescription(ctx, stmt.Columns, nil, canSendNoData(stmt.Type))
return c.sendRowDescription(ctx, stmt.Columns, nil, canSendNoData(stmt.Statement))
case preparePortal:
portal, ok := c.session.PreparedPortals.Get(name)
if !ok {
return c.sendInternalError(fmt.Sprintf("unknown portal %q", name))
}

portalMeta := portal.ProtocolMeta.(preparedPortalMeta)
return c.sendRowDescription(ctx, portal.Stmt.Columns, portalMeta.outFormats, canSendNoData(portal.Stmt.Type))
return c.sendRowDescription(ctx, portal.Stmt.Columns, portalMeta.outFormats, canSendNoData(portal.Stmt.Statement))
default:
return errors.Errorf("unknown describe type: %s", typ)
}
Expand Down Expand Up @@ -790,7 +790,7 @@ func (c *v3Conn) handleBind(ctx context.Context, buf *readBuffer) error {
}

if log.V(2) {
log.Infof(ctx, "portal: %q for %q, args %q, formats %q", portalName, stmt.Query, qargs, columnFormatCodes)
log.Infof(ctx, "portal: %q for %q, args %q, formats %q", portalName, stmt.Statement, qargs, columnFormatCodes)
}

// Attach pgwire-specific metadata to the PreparedPortal.
Expand Down Expand Up @@ -820,7 +820,21 @@ func (c *v3Conn) handleExecute(buf *readBuffer) error {
Values: portal.Qargs,
}

return c.executeStatements(stmt.Query, &pinfo, portalMeta.outFormats, false, int(limit))
return c.executeStatementParsed(stmt, &pinfo, portalMeta.outFormats, false, int(limit))
}

func (c *v3Conn) executeStatementParsed(
stmt *sql.PreparedStatement,
pinfo *parser.PlaceholderInfo,
formatCodes []formatCode,
sendDescription bool,
limit int,
) error {
tracing.AnnotateTrace()
// Note: sql.Executor gets its Context from c.session.context, which
// has been bound by v3Conn.setupSession().
results := c.executor.ExecutePreparedStatement(c.session, stmt, pinfo)
return c.finishExecute(results, formatCodes, sendDescription, limit)
}

func (c *v3Conn) executeStatements(
Expand All @@ -834,6 +848,12 @@ func (c *v3Conn) executeStatements(
// Note: sql.Executor gets its Context from c.session.context, which
// has been bound by v3Conn.setupSession().
results := c.executor.ExecuteStatements(c.session, stmts, pinfo)
return c.finishExecute(results, formatCodes, sendDescription, limit)
}

func (c *v3Conn) finishExecute(
results sql.StatementResults, formatCodes []formatCode, sendDescription bool, limit int,
) error {
// Delay evaluation of c.session.Ctx().
defer func() { results.Close(c.session.Ctx()) }()

Expand Down
36 changes: 26 additions & 10 deletions pkg/sql/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
// PreparedStatement is a SQL statement that has been parsed and the types
// of arguments and results have been determined.
type PreparedStatement struct {
Query string
Type parser.StatementType
// Statement is the parsed, prepared SQL statement. It may be nil if the
// prepared statement is empty.
Statement parser.Statement
SQLTypes parser.PlaceholderTypes
Columns sqlbase.ResultColumns
portalNames map[string]struct{}
Expand Down Expand Up @@ -65,24 +66,39 @@ func (ps PreparedStatements) Exists(name string) bool {
return ok
}

// NewFromString creates a new PreparedStatement with the provided name and
// corresponding query string, using the given PlaceholderTypes hints to assist
// in inferring placeholder types.
//
// ps.session.Ctx() is used as the logging context for the prepare operation.
func (ps PreparedStatements) NewFromString(
e *Executor, name, query string, placeholderHints parser.PlaceholderTypes,
) (*PreparedStatement, error) {
sessionEventf(ps.session, "parsing: %s", query)

var parser parser.Parser
stmts, err := parser.Parse(query)
if err != nil {
return nil, err
}
return ps.New(e, name, stmts, placeholderHints)
}

// New creates a new PreparedStatement with the provided name and corresponding
// query string, using the given PlaceholderTypes hints to assist in inferring
// placeholder types.
// query statements, using the given PlaceholderTypes hints to assist in
// inferring placeholder types.
//
// ps.session.Ctx() is used as the logging context for the prepare operation.
func (ps PreparedStatements) New(
e *Executor, name, query string, placeholderHints parser.PlaceholderTypes,
e *Executor, name string, stmts parser.StatementList, placeholderHints parser.PlaceholderTypes,
) (*PreparedStatement, error) {
// Prepare the query. This completes the typing of placeholders.
stmt, err := e.Prepare(query, ps.session, placeholderHints)
stmt, err := e.Prepare(stmts, ps.session, placeholderHints)
if err != nil {
return nil, err
}

// For now we are just counting the size of the query string and
// statement name. When we start storing the prepared query plan
// during prepare, this should be tallied up to the monitor as well.
sz := int64(uintptr(len(query)+len(name)) + unsafe.Sizeof(*stmt))
sz := int64(uintptr(len(name)) + unsafe.Sizeof(*stmt))
if err := stmt.memAcc.Wsession(ps.session).OpenAndInit(ps.session.Ctx(), sz); err != nil {
return nil, err
}
Expand Down

0 comments on commit 90d7b21

Please sign in to comment.