Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/rowexec: check MustBeStreaming on instantiated generators #85802

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,6 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {

func TestTenantStreamingDeleteRange(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 85630, "flaky test")
defer log.Scope(t).Close(t)

// TODO(casper): disabled due to error when setting a cluster setting
Expand Down
28 changes: 23 additions & 5 deletions pkg/sql/rowexec/project_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,15 @@ func newProjectSetProcessor(
// MustBeStreaming implements the execinfra.Processor interface.
func (ps *projectSetProcessor) MustBeStreaming() bool {
// If we have a single streaming generator, then the processor is such too.
for _, gen := range ps.gens {
for _, fn := range ps.funcs {
if fn == nil {
continue
}
gen, err := ps.generatorForFuncExpr(fn)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bummer to do this here, because I think that constructing this generator actually does work in some cases.

if err != nil {
continue
}
defer gen.Close(ps.Ctx)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, this Close() might assume that Start() has run. Calling Start() here would be an even bigger bummer.

So, I think this isn't how we want to initialize these.

if eval.IsStreamingValueGenerator(gen) {
return true
}
Expand All @@ -140,6 +148,19 @@ func (ps *projectSetProcessor) Start(ctx context.Context) {
ps.cancelChecker.Reset(ctx)
}

func (ps *projectSetProcessor) generatorForFuncExpr(
fn *tree.FuncExpr,
) (eval.ValueGenerator, error) {
gen, err := eval.GetGenerator(ps.EvalCtx, fn)
if err != nil {
return nil, err
}
if gen == nil {
gen = builtins.EmptyGenerator()
}
return gen, nil
}

// nextInputRow returns the next row or metadata from ps.input. It also
// initializes the value generators for that row.
func (ps *projectSetProcessor) nextInputRow() (
Expand All @@ -161,13 +182,10 @@ func (ps *projectSetProcessor) nextInputRow() (
ps.exprHelpers[i].Row = row

ps.EvalCtx.IVarContainer = ps.exprHelpers[i]
gen, err := eval.GetGenerator(ps.EvalCtx, fn)
gen, err := ps.generatorForFuncExpr(fn)
if err != nil {
return nil, nil, err
}
if gen == nil {
gen = builtins.EmptyGenerator()
}
if err := gen.Start(ps.Ctx, ps.FlowCtx.Txn); err != nil {
return nil, nil, err
}
Expand Down