Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proj optimization prototype #2675

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func resultForMax1RowIter(ctx *sql.Context, schema sql.Schema, iter sql.RowIter,
return nil, err
}

outputRow, err := rowToSQL(ctx, schema, row)
outputRow, err := rowToSQL(ctx, schema, row, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -567,6 +567,15 @@ func (h *Handler) resultForDefaultIter(
wg := sync.WaitGroup{}
wg.Add(2)

var projections []sql.Expression
if trackedIter, ok := iter.(*plan.TrackedRowIter); ok {
if commitNode, ok := trackedIter.Node.(*plan.TransactionCommittingNode); ok {
if proj, ok := commitNode.Child().(*plan.Project); ok {
projections = proj.Projections
}
}
}

// Read rows off the row iterator and send them to the row channel.
var rowChan = make(chan sql.Row, 512)
eg.Go(func() error {
Expand Down Expand Up @@ -639,7 +648,7 @@ func (h *Handler) resultForDefaultIter(
continue
}

outputRow, err := rowToSQL(ctx, schema, row)
outputRow, err := rowToSQL(ctx, schema, row, projections)
if err != nil {
return err
}
Expand Down Expand Up @@ -901,21 +910,34 @@ func updateMaxUsedConnectionsStatusVariable() {
}()
}

func rowToSQL(ctx *sql.Context, s sql.Schema, row sql.Row) ([]sqltypes.Value, error) {
o := make([]sqltypes.Value, len(row))
func rowToSQL(ctx *sql.Context, s sql.Schema, row sql.Row, projections []sql.Expression) ([]sqltypes.Value, error) {
o := make([]sqltypes.Value, max(len(row), len(projections))) // TODO: maybe should be length of schema?
// need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock)
if len(s) == 0 {
return o, nil
}
var err error
for i, v := range row {
if v == nil {
o[i] = sqltypes.NULL
continue
if len(projections) > 0 {
for i, proj := range projections {
field, err := proj.Eval(ctx, row)
if err != nil {
return nil, err
}
o[i], err = s[i].Type.SQL(ctx, nil, field)
if err != nil {
return nil, err
}
}
o[i], err = s[i].Type.SQL(ctx, nil, v)
if err != nil {
return nil, err
} else {
var err error
for i, v := range row {
if v == nil {
o[i] = sqltypes.NULL
continue
}
o[i], err = s[i].Type.SQL(ctx, nil, v)
if err != nil {
return nil, err
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func NewServer(cfg Config, e *sqle.Engine, sb SessionBuilder, listener ServerEve
} else {
tracer = sql.NoopTracer
}

sm := NewSessionManager(sb, tracer, e.Analyzer.Catalog.Database, e.MemoryManager, e.ProcessList, cfg.Address)
e.Analyzer.ServerMode = true
handler := &Handler{
e: e,
sm: sm,
Expand Down Expand Up @@ -98,6 +98,7 @@ func NewServerWithHandler(
}

sm := NewSessionManager(sb, tracer, e.Analyzer.Catalog.Database, e.MemoryManager, e.ProcessList, cfg.Address)
e.Analyzer.ServerMode = true
h := &Handler{
e: e,
sm: sm,
Expand Down
2 changes: 2 additions & 0 deletions sql/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ type Analyzer struct {
// EventScheduler is used to communiate with the event scheduler
// for any EVENT related statements. It can be nil if EventScheduler is not defined.
EventScheduler sql.EventScheduler

ServerMode bool
}

// NewDefault creates a default Analyzer instance with all default Rules and configuration.
Expand Down
1 change: 1 addition & 0 deletions sql/analyzer/rule_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ const (
cacheSubqueryResultsId // cacheSubqueryResults
cacheSubqueryAliasesInJoinsId // cacheSubqueryAliasesInJoins
backtickDefaulColumnValueNamesId // backtickDefaulColumnValueNames
deferProjectionsId // deferProjections
AutocommitId // addAutocommitNode
TrackProcessId // trackProcess
parallelizeId // parallelize
Expand Down
13 changes: 7 additions & 6 deletions sql/analyzer/ruleid_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sql/analyzer/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func init() {
{inlineSubqueryAliasRefsId, inlineSubqueryAliasRefs},
{cacheSubqueryAliasesInJoinsId, cacheSubqueryAliasesInJoins},
{backtickDefaulColumnValueNamesId, backtickDefaultColumnValueNames},
{deferProjectionsId, deferProjections},
{AutocommitId, addAutocommitNode},
{TrackProcessId, trackProcess},
{parallelizeId, parallelize},
Expand Down
21 changes: 21 additions & 0 deletions sql/analyzer/warnings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package analyzer

import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/dolthub/go-mysql-server/sql/plan"
"github.com/dolthub/go-mysql-server/sql/transform"
)
Expand All @@ -40,3 +41,23 @@ func clearWarnings(ctx *sql.Context, a *Analyzer, node sql.Node, scope *plan.Sco
ctx.ClearWarnings()
return node, transform.SameTree, nil
}

// TODO: move this somewhere else
func deferProjections(ctx *sql.Context, a *Analyzer, node sql.Node, scope *plan.Scope, sel RuleSelector, qFlags *sql.QueryFlags) (sql.Node, transform.TreeIdentity, error) {
if !a.ServerMode {
return node, transform.SameTree, nil
}
// Find top-level projection, and mark as deferred
if proj, ok := node.(*plan.Project); ok {
// Default value expressions require a second pass, so punt on deferring for now
for _, expr := range proj.Projections {
switch expr.(type) {
case *expression.Wrapper, *sql.ColumnDefaultValue:
return node, transform.SameTree, nil
}
}
proj.Deferred = true
return proj, transform.NewTree, nil
}
return node, transform.SameTree, nil
}
4 changes: 2 additions & 2 deletions sql/plan/alter_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func NewCheckDefinition(ctx *sql.Context, check *sql.CheckConstraint) (*sql.Chec
}, nil
}

// DropConstraint is a temporary node to handle dropping a named constraint on a table. The type of the constraint is
// DropConstraint is a temporary Node to handle dropping a named constraint on a table. The type of the constraint is
// not known, and is determined during analysis.
type DropConstraint struct {
UnaryNode
Expand Down Expand Up @@ -236,7 +236,7 @@ func (d *DropConstraint) CollationCoercibility(ctx *sql.Context) (collation sql.
return sql.Collation_binary, 7
}

// NewDropConstraint returns a new DropConstraint node
// NewDropConstraint returns a new DropConstraint Node
func NewDropConstraint(table *UnresolvedTable, name string) *DropConstraint {
return &DropConstraint{
UnaryNode: UnaryNode{table},
Expand Down
4 changes: 2 additions & 2 deletions sql/plan/alter_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ sql.Node = (*AlterDefaultDrop)(nil)
var _ sql.SchemaTarget = (*AlterDefaultDrop)(nil)
var _ sql.CollationCoercible = (*AlterDefaultDrop)(nil)

// NewAlterDefaultSet returns a *AlterDefaultSet node.
// NewAlterDefaultSet returns a *AlterDefaultSet Node.
func NewAlterDefaultSet(database sql.Database, table sql.Node, columnName string, defVal *sql.ColumnDefaultValue) *AlterDefaultSet {
return &AlterDefaultSet{
ddlNode: ddlNode{Db: database},
Expand Down Expand Up @@ -162,7 +162,7 @@ func (d *AlterDefaultSet) WithDefault(expr sql.Expression) (sql.Node, error) {
return &nd, nil
}

// NewAlterDefaultDrop returns a *AlterDefaultDrop node.
// NewAlterDefaultDrop returns a *AlterDefaultDrop Node.
func NewAlterDefaultDrop(database sql.Database, table sql.Node, columnName string) *AlterDefaultDrop {
return &AlterDefaultDrop{
ddlNode: ddlNode{Db: database},
Expand Down
2 changes: 1 addition & 1 deletion sql/plan/alter_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type AlterEvent struct {
scheduler sql.EventScheduler
}

// NewAlterEvent returns a *AlterEvent node.
// NewAlterEvent returns a *AlterEvent Node.
func NewAlterEvent(
db sql.Database,
name, definer string,
Expand Down
4 changes: 2 additions & 2 deletions sql/plan/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var _ sql.Node = (*RenameTable)(nil)
var _ sql.Databaser = (*RenameTable)(nil)
var _ sql.CollationCoercible = (*RenameTable)(nil)

// NewRenameTable creates a new RenameTable node
// NewRenameTable creates a new RenameTable Node
func NewRenameTable(db sql.Database, oldNames, newNames []string, alterTbl bool) *RenameTable {
return &RenameTable{
ddlNode: ddlNode{db},
Expand Down Expand Up @@ -322,7 +322,7 @@ func (a AddColumn) WithExpressions(exprs ...sql.Expression) (sql.Node, error) {
return nil, err
}

// *sql.Column is a reference type, make a copy before we modify it so we don't affect the original node
// *sql.Column is a reference type, make a copy before we modify it so we don't affect the original Node
a.column = colSchema[0]
return &a, nil
}
Expand Down
2 changes: 1 addition & 1 deletion sql/plan/begin_end_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type BeginEndBlock struct {
Label string
}

// NewBeginEndBlock creates a new *BeginEndBlock node.
// NewBeginEndBlock creates a new *BeginEndBlock Node.
func NewBeginEndBlock(label string, block *Block) *BeginEndBlock {
return &BeginEndBlock{
Block: block,
Expand Down
10 changes: 5 additions & 5 deletions sql/plan/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type Block struct {
Pref *expression.ProcedureReference
}

// RepresentsBlock is an interface that defines whether a node contains a Block node, or contains multiple child
// statements similar to a block node. As a rule of thumb, if a parent node depends upon a child node, either explicitly
// RepresentsBlock is an interface that defines whether a Node contains a Block Node, or contains multiple child
// statements similar to a block Node. As a rule of thumb, if a parent Node depends upon a child Node, either explicitly
// or implicitly, then it does not represent a Block.
type RepresentsBlock interface {
sql.Node
implementsRepresentsBlock()
}

// RepresentsLabeledBlock is an interface that defines whether a node represents a Block node, while also carrying a
// RepresentsLabeledBlock is an interface that defines whether a Node represents a Block Node, while also carrying a
// label that may be referenced by statements within the block (such as LEAVE, ITERATE, etc.). Some statements that use
// labels only look for labels on statements that loop (such as LOOP and REPEAT), so there's an additional function
// to check whether this also represents a loop.
Expand All @@ -44,7 +44,7 @@ type RepresentsLabeledBlock interface {
RepresentsLoop() bool
}

// RepresentsScope is an interface that defines whether a node represents a new scope. Scopes define boundaries that
// RepresentsScope is an interface that defines whether a Node represents a new scope. Scopes define boundaries that
// are used for variable resolution and control flow modification (via condition handling, etc.).
type RepresentsScope interface {
RepresentsBlock
Expand All @@ -56,7 +56,7 @@ var _ sql.DebugStringer = (*Block)(nil)
var _ sql.CollationCoercible = (*Block)(nil)
var _ RepresentsBlock = (*Block)(nil)

// NewBlock creates a new *Block node.
// NewBlock creates a new *Block Node.
func NewBlock(statements []sql.Node) *Block {
return &Block{statements: statements}
}
Expand Down
8 changes: 4 additions & 4 deletions sql/plan/cached_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var ErrRowIterDisposed = errors.New("attempted to call RowIter() on a disposed N

// NewCachedResults returns a cached results plan Node, which will use a
// RowCache to cache results generated by Child.RowIter() and return those
// results for future calls to RowIter. This node is only safe to use if the
// results for future calls to RowIter. This Node is only safe to use if the
// Child is deterministic and is not dependent on the |row| parameter in the
// call to RowIter.
func NewCachedResults(n sql.Node) *CachedResults {
Expand All @@ -41,7 +41,7 @@ func NewCachedResults(n sql.Node) *CachedResults {
}
}

// CachedResults tees the child node iterator into an in-memory cache
// CachedResults tees the child Node iterator into an in-memory cache
// for faster subsequent retrieval. This is usually combined with a
// HashLookup, whose RowIter defers to a CachedResult child to populate
// rows in memory on a first iteration. The second RowIter moves the
Expand Down Expand Up @@ -135,7 +135,7 @@ func (i *emptyCacheIter) Next(ctx *sql.Context) (sql.Row, error) { return nil, i
func (i *emptyCacheIter) Close(ctx *sql.Context) error { return nil }

// cachedResultsManager manages the saved results collected by CachedResults nodes. It is necessary to do this outside
// of the CachedResult node instances themselves, since executing a query plan can make transient transforms that are
// of the CachedResult Node instances themselves, since executing a query plan can make transient transforms that are
// not persisted back and can cause cache memory leaks.
type cachedResultsManager struct {
// cachedResultsCaches tracks caches used by CachedResults globally so that even if a CachedResult
Expand All @@ -149,7 +149,7 @@ type cachedResultsManager struct {
mutex sync.Mutex

// cachedResultsUniqueIdCounter stores a counter that should only be incremented atomically and is used
// as a unique ID when a new CachedResults object is created for a node.
// as a unique ID when a new CachedResults object is created for a Node.
cachedResultsUniqueIdCounter uint64
}

Expand Down
2 changes: 1 addition & 1 deletion sql/plan/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var _ sql.CollationCoercible = (*Call)(nil)
var _ sql.Expressioner = (*Call)(nil)
var _ Versionable = (*Call)(nil)

// NewCall returns a *Call node.
// NewCall returns a *Call Node.
func NewCall(db sql.Database, name string, params []sql.Expression, asOf sql.Expression, catalog sql.Catalog) *Call {
return &Call{
db: db,
Expand Down
2 changes: 1 addition & 1 deletion sql/plan/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var _ sql.DebugStringer = (*CaseStatement)(nil)
var _ sql.Expressioner = (*CaseStatement)(nil)
var _ sql.CollationCoercible = (*CaseStatement)(nil)

// NewCaseStatement creates a new *NewCaseStatement or *IfElseBlock node.
// NewCaseStatement creates a new *NewCaseStatement or *IfElseBlock Node.
func NewCaseStatement(caseExpr sql.Expression, ifConditionals []*IfConditional, elseStatement sql.Node) sql.Node {
if elseStatement == nil {
elseStatement = ElseCaseError{}
Expand Down
2 changes: 1 addition & 1 deletion sql/plan/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var _ sql.Node = (*Close)(nil)
var _ sql.CollationCoercible = (*Close)(nil)
var _ expression.ProcedureReferencable = (*Close)(nil)

// NewClose returns a new *Close node.
// NewClose returns a new *Close Node.
func NewClose(name string) *Close {
return &Close{
Name: name,
Expand Down
Loading
Loading