Skip to content

Commit

Permalink
refactor resultForDefaultIter (#2674)
Browse files Browse the repository at this point in the history
  • Loading branch information
jycor authored Sep 27, 2024
1 parent 77ed13c commit 786fe98
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
34 changes: 16 additions & 18 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,19 +552,23 @@ func (h *Handler) resultForDefaultIter(

eg, ctx := ctx.NewErrgroup()

var rowChan chan sql.Row

rowChan = make(chan sql.Row, 512)

pan2err := func() {
if recoveredPanic := recover(); recoveredPanic != nil {
returnErr = fmt.Errorf("handler caught panic: %v", recoveredPanic)
}
}

pollCtx, cancelF := ctx.NewSubContext()
eg.Go(func() error {
defer pan2err()
return h.pollForClosedConnection(pollCtx, c)
})

wg := sync.WaitGroup{}
wg.Add(2)

// Read rows off the row iterator and send them to the row channel.
var rowChan = make(chan sql.Row, 512)
eg.Go(func() error {
defer pan2err()
defer wg.Done()
Expand All @@ -590,12 +594,6 @@ func (h *Handler) resultForDefaultIter(
}
})

pollCtx, cancelF := ctx.NewSubContext()
eg.Go(func() error {
defer pan2err()
return h.pollForClosedConnection(pollCtx, c)
})

// Default waitTime is one minute if there is no timeout configured, in which case
// it will loop to iterate again unless the socket died by the OS timeout or other problems.
// If there is a timeout, it will be enforced to ensure that Vitess has a chance to
Expand All @@ -607,7 +605,7 @@ func (h *Handler) resultForDefaultIter(
timer := time.NewTimer(waitTime)
defer timer.Stop()

// reads rows from the channel, converts them to wire format,
// Reads rows from the channel, converts them to wire format,
// and calls |callback| to give them to vitess.
eg.Go(func() error {
defer pan2err()
Expand Down Expand Up @@ -679,7 +677,6 @@ func (h *Handler) resultForDefaultIter(
}
returnErr = err
}

return
}

Expand Down Expand Up @@ -906,18 +903,19 @@ func updateMaxUsedConnectionsStatusVariable() {

func rowToSQL(ctx *sql.Context, s sql.Schema, row sql.Row) ([]sqltypes.Value, error) {
o := make([]sqltypes.Value, len(row))
// need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock)
if len(s) == 0 {
return o, nil
}
var err error
for i, v := range row {
if v == nil {
o[i] = sqltypes.NULL
continue
}
// need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock)
if len(s) > 0 {
o[i], err = s[i].Type.SQL(ctx, nil, v)
if err != nil {
return nil, err
}
o[i], err = s[i].Type.SQL(ctx, nil, v)
if err != nil {
return nil, err
}
}

Expand Down
13 changes: 6 additions & 7 deletions sql/rowexec/rel_iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,24 @@ func ProjectRow(
projections []sql.Expression,
row sql.Row,
) (sql.Row, error) {
var secondPass []int
var fields sql.Row
var fields = make(sql.Row, len(projections))
var secondPass = make([]int, 0, len(projections))
for i, expr := range projections {
// Default values that are expressions may reference other fields, thus they must evaluate after all other exprs.
// Also default expressions may not refer to other columns that come after them if they also have a default expr.
// This ensures that all columns referenced by expressions will have already been evaluated.
// Since literals do not reference other columns, they're evaluated on the first pass.
defaultVal, isDefaultVal := defaultValFromProjectExpr(expr)
if isDefaultVal && !defaultVal.IsLiteral() {
fields = append(fields, nil)
secondPass = append(secondPass, i)
continue
}
f, fErr := expr.Eval(ctx, row)
field, fErr := expr.Eval(ctx, row)
if fErr != nil {
return nil, fErr
}
f = normalizeNegativeZeros(f)
fields = append(fields, f)
field = normalizeNegativeZeros(field)
fields[i] = field
}
for _, index := range secondPass {
field, err := projections[index].Eval(ctx, fields)
Expand All @@ -176,7 +175,7 @@ func ProjectRow(
field = normalizeNegativeZeros(field)
fields[index] = field
}
return sql.NewRow(fields...), nil
return fields, nil
}

func defaultValFromProjectExpr(e sql.Expression) (*sql.ColumnDefaultValue, bool) {
Expand Down

0 comments on commit 786fe98

Please sign in to comment.