Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
71685: sql: implement WITH RECURSIVE with UNION r=RaduBerinde a=RaduBerinde

#### sql: implement WITH RECURSIVE with UNION

This change implements the UNION variant of WITH RECURSIVE, where rows
are deduplicated. We achieve this by storing all rows in a
deduplicating container and inserting in that container first,
detecting if the row is a duplicate.

Fixes #46642.

Release note (sql change): The WITH RECURSIVE variant that uses UNION
(as opposed to UNION ALL) is now supported.

#### sql: capitalize rowContainerHelper API

This change capitalizes the API of `rowContainerHelper` and
`rowContainerIterator` to make it more clear what functions are not
internal to the structure.

Release note: None

71774: roachtest: improve `rapid-restart` by using `c.Start` r=erikgrinaker a=cameronnunez

Fixes #70559.

This test manually started CRDB with `c.Run`. This patch uses `c.Start` instead of `c.Run`, avoiding
incorrect test failures from unexpected SSH exit codes resulting from using `c.Run`.

Release note: None

71823: vendor: bump Pebble to 7fec828fc1af r=bananabrick a=bananabrick

7fec828f db,sstable: block property collectors and filters
f2339cc7 compaction: read compaction fixes
e95e7374 pebble: add additional mechanism to schedule read compactions
cdbcfe9f db: Expose logic to calculate the table cache size
0ee4290c db: schedule sstable validation on ingestion
543bc6fa db: use locked suffix for function names; simplify boolean expression
5393ca16 internal/rate: skip two flaky tests on darwin
7cd88488 sstable: print additional fields in verbose block layout
4f6402b1 *: enable staticcheck

Release note:

Co-authored-by: Radu Berinde <radu@cockroachlabs.com>
Co-authored-by: Cameron Nunez <cameron@cockroachlabs.com>
Co-authored-by: Arjun Nair <nair@cockroachlabs.com>
  • Loading branch information
4 people committed Oct 21, 2021
4 parents c95c999 + ac476c3 + 68a5295 + 596f788 commit 7265486
Show file tree
Hide file tree
Showing 25 changed files with 398 additions and 186 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,8 @@ def go_deps():
name = "com_github_cockroachdb_pebble",
build_file_proto_mode = "disable_global",
importpath = "github.com/cockroachdb/pebble",
sum = "h1:d+OlrpxjR43JYIIdHzADu7CbsfBxsufssvpFiRUdAxg=",
version = "v0.0.0-20211011160653-59007c613a66",
sum = "h1:NY+UDVTyU+Y2wKr0ocnBSbXGYTHEGwnQ9ukP+qg7xfY=",
version = "v0.0.0-20211019184201-7fec828fc1af",
)

go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55
github.com/cockroachdb/gostdlib v1.13.0
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f
github.com/cockroachdb/pebble v0.0.0-20211011160653-59007c613a66
github.com/cockroachdb/pebble v0.0.0-20211019184201-7fec828fc1af
github.com/cockroachdb/redact v1.1.3
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ github.com/cockroachdb/gostdlib v1.13.0 h1:TzSEPYgkKDNei3gbLc0rrHu4iHyBp7/+NxPOF
github.com/cockroachdb/gostdlib v1.13.0/go.mod h1:eXX95p9QDrYwJfJ6AgeN9QnRa/lqqid9LAzWz/l5OgA=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f h1:o/kfcElHqOiXqcou5a3rIlMc7oJbMQkeLk0VQJ7zgqY=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/cockroachdb/pebble v0.0.0-20211011160653-59007c613a66 h1:d+OlrpxjR43JYIIdHzADu7CbsfBxsufssvpFiRUdAxg=
github.com/cockroachdb/pebble v0.0.0-20211011160653-59007c613a66/go.mod h1:buxOO9GBtOcq1DiXDpIPYrmxY020K2A8lOrwno5FetU=
github.com/cockroachdb/pebble v0.0.0-20211019184201-7fec828fc1af h1:NY+UDVTyU+Y2wKr0ocnBSbXGYTHEGwnQ9ukP+qg7xfY=
github.com/cockroachdb/pebble v0.0.0-20211019184201-7fec828fc1af/go.mod h1:buxOO9GBtOcq1DiXDpIPYrmxY020K2A8lOrwno5FetU=
github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/redact v1.1.0/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ=
Expand Down
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ go_library(
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/search",
"//pkg/util/sysutil",
"//pkg/util/timeutil",
"//pkg/util/version",
"//pkg/workload/histogram",
Expand Down
55 changes: 12 additions & 43 deletions pkg/cmd/roachtest/tests/rapid_restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,19 @@ import (
"context"
"math/rand"
"net/http"
"os/exec"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

func runRapidRestart(ctx context.Context, t test.Test, c cluster.Cluster) {
// Use a single-node cluster which speeds the stop/start cycle.
nodes := c.Node(1)
c.Put(ctx, t.Cockroach(), "./cockroach", nodes)
node := c.Node(1)
c.Put(ctx, t.Cockroach(), "./cockroach", node)

// In a loop, bootstrap a new single-node cluster and immediately kill
// it. This is more effective at finding problems than restarting an existing
Expand All @@ -41,21 +38,16 @@ func runRapidRestart(ctx context.Context, t test.Test, c cluster.Cluster) {
return timeutil.Now().After(deadline)
}
for j := 1; !done(); j++ {
c.Wipe(ctx, nodes)
c.Wipe(ctx, node)

// The first 2 iterations we start the cockroach node and kill it right
// away. The 3rd iteration we let cockroach run so that we can check after
// the loop that everything is ok.
for i := 0; i < 3; i++ {
exitCh := make(chan error, 1)
go func() {
err := c.RunE(ctx, nodes,
`mkdir -p {log-dir} && ./cockroach start-single-node --insecure --store={store-dir} `+
`--log-dir={log-dir} --cache=10% --max-sql-memory=10% `+
`--listen-addr=:{pgport:1} --http-addr=$[{pgport:1}+1] `+
`> {log-dir}/cockroach.stdout 2> {log-dir}/cockroach.stderr`)
exitCh <- err
}()
if err := c.StartE(ctx, node, option.StartArgs("--skip-init")); err != nil {
t.Fatalf("error during start: %v", err)
}

if i == 2 {
break
}
Expand All @@ -65,31 +57,8 @@ func runRapidRestart(ctx context.Context, t test.Test, c cluster.Cluster) {

sig := [2]string{"2", "9"}[rand.Intn(2)]

var err error
for err == nil {
c.Stop(ctx, nodes, option.StopArgs("--sig="+sig))
select {
case <-ctx.Done():
return
case err = <-exitCh:
case <-time.After(10 * time.Second):
// We likely ended up killing before the process spawned.
// Loop around.
t.L().Printf("no exit status yet, killing again")
}
}
if exitErr := (*exec.ExitError)(nil); errors.As(err, &exitErr) {
switch status := sysutil.ExitStatus(exitErr); status {
case -1:
// Received SIGINT before setting up our own signal handlers or
// SIGKILL.
case 20:
// Exit code from a SIGINT received by our signal handlers.
default:
t.Fatalf("unexpected exit status %d", status)
}
} else {
t.Fatalf("unexpected exit err: %v", err)
if err := c.StopE(ctx, node, option.StopArgs("--sig="+sig)); err != nil {
t.Fatalf("error during stop: %v", err)
}
}

Expand All @@ -100,7 +69,7 @@ func runRapidRestart(ctx context.Context, t test.Test, c cluster.Cluster) {
// Verify the cluster is ok by torturing the prometheus endpoint until it
// returns success. A side-effect is to prevent regression of #19559.
for !done() {
adminUIAddrs, err := c.ExternalAdminUIAddr(ctx, nodes)
adminUIAddrs, err := c.ExternalAdminUIAddr(ctx, node)
if err != nil {
t.Fatal(err)
}
Expand All @@ -122,6 +91,6 @@ func runRapidRestart(ctx context.Context, t test.Test, c cluster.Cluster) {
// Clean up for the test harness. Usually we want to leave nodes running so
// that consistency checks can be run, but in this case there's not much
// there in the first place anyway.
c.Stop(ctx, nodes)
c.Wipe(ctx, nodes)
c.Stop(ctx, node)
c.Wipe(ctx, node)
}
30 changes: 14 additions & 16 deletions pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (a *applyJoinNode) startExec(params runParams) error {
}
}
a.run.out = make(tree.Datums, len(a.columns))
a.run.rightRows.init(a.rightTypes, params.extendedEvalCtx, "apply-join" /* opName */)
a.run.rightRows.Init(a.rightTypes, params.extendedEvalCtx, "apply-join" /* opName */)
return nil
}

Expand All @@ -123,7 +123,7 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) {
for {
// Note that if a.rightTypes has zero length, non-nil rrow is
// returned the correct number of times.
rrow, err := a.run.rightRowsIterator.next()
rrow, err := a.run.rightRowsIterator.Next()
if err != nil {
return false, err
}
Expand Down Expand Up @@ -229,10 +229,10 @@ func (a *applyJoinNode) Next(params runParams) (bool, error) {
// clearRightRows clears rightRows and resets rightRowsIterator. This function
// must be called before reusing rightRows and rightRowIterator.
func (a *applyJoinNode) clearRightRows(params runParams) error {
if err := a.run.rightRows.clear(params.ctx); err != nil {
if err := a.run.rightRows.Clear(params.ctx); err != nil {
return err
}
a.run.rightRowsIterator.close()
a.run.rightRowsIterator.Close()
a.run.rightRowsIterator = nil
return nil
}
Expand All @@ -243,21 +243,19 @@ func (a *applyJoinNode) clearRightRows(params runParams) error {
// wrong during execution of the right hand side of the join, and that we should
// completely give up on the outer join.
func (a *applyJoinNode) runRightSidePlan(params runParams, plan *planComponents) error {
if err := runPlanInsidePlan(params, plan, &a.run.rightRows); err != nil {
rowResultWriter := NewRowResultWriter(&a.run.rightRows)
if err := runPlanInsidePlan(params, plan, rowResultWriter); err != nil {
return err
}
a.run.rightRowsIterator = newRowContainerIterator(params.ctx, a.run.rightRows, a.rightTypes)
return nil
}

// runPlanInsidePlan is used to run a plan and gather the results in a row
// container, as part of the execution of an "outer" plan.
func runPlanInsidePlan(
params runParams, plan *planComponents, rowContainer *rowContainerHelper,
) error {
rowResultWriter := NewRowResultWriter(rowContainer)
// runPlanInsidePlan is used to run a plan and gather the results in the
// resultWriter, as part of the execution of an "outer" plan.
func runPlanInsidePlan(params runParams, plan *planComponents, resultWriter rowResultWriter) error {
recv := MakeDistSQLReceiver(
params.ctx, rowResultWriter, tree.Rows,
params.ctx, resultWriter, tree.Rows,
params.ExecCfg().RangeDescriptorCache,
params.p.Txn(),
params.ExecCfg().Clock,
Expand Down Expand Up @@ -298,7 +296,7 @@ func runPlanInsidePlan(
recv,
&subqueryResultMemAcc,
) {
return rowResultWriter.Err()
return resultWriter.Err()
}
}

Expand All @@ -318,7 +316,7 @@ func runPlanInsidePlan(
params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.PlanAndRun(
params.ctx, evalCtx, planCtx, params.p.Txn(), plan.main, recv,
)()
return rowResultWriter.Err()
return resultWriter.Err()
}

func (a *applyJoinNode) Values() tree.Datums {
Expand All @@ -327,9 +325,9 @@ func (a *applyJoinNode) Values() tree.Datums {

func (a *applyJoinNode) Close(ctx context.Context) {
a.input.plan.Close(ctx)
a.run.rightRows.close(ctx)
a.run.rightRows.Close(ctx)
if a.run.rightRowsIterator != nil {
a.run.rightRowsIterator.close()
a.run.rightRowsIterator.Close()
a.run.rightRowsIterator = nil
}
}
10 changes: 5 additions & 5 deletions pkg/sql/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type bufferNode struct {

func (n *bufferNode) startExec(params runParams) error {
n.typs = planTypes(n.plan)
n.rows.init(n.typs, params.extendedEvalCtx, n.label)
n.rows.Init(n.typs, params.extendedEvalCtx, n.label)
return nil
}

Expand All @@ -50,7 +50,7 @@ func (n *bufferNode) Next(params runParams) (bool, error) {
return false, nil
}
n.currentRow = n.plan.Values()
if err = n.rows.addRow(params.ctx, n.currentRow); err != nil {
if err = n.rows.AddRow(params.ctx, n.currentRow); err != nil {
return false, err
}
return true, nil
Expand All @@ -62,7 +62,7 @@ func (n *bufferNode) Values() tree.Datums {

func (n *bufferNode) Close(ctx context.Context) {
n.plan.Close(ctx)
n.rows.close(ctx)
n.rows.Close(ctx)
}

// scanBufferNode behaves like an iterator into the bufferNode it is
Expand All @@ -85,7 +85,7 @@ func (n *scanBufferNode) startExec(params runParams) error {

func (n *scanBufferNode) Next(runParams) (bool, error) {
var err error
n.currentRow, err = n.iterator.next()
n.currentRow, err = n.iterator.Next()
if n.currentRow == nil || err != nil {
return false, err
}
Expand All @@ -98,7 +98,7 @@ func (n *scanBufferNode) Values() tree.Datums {

func (n *scanBufferNode) Close(context.Context) {
if n.iterator != nil {
n.iterator.close()
n.iterator.Close()
n.iterator = nil
}
}
Loading

0 comments on commit 7265486

Please sign in to comment.