Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
116488: backupccl: wrap a few errors r=msbutler a=stevendanna

Errors from the underlying parser calls used by rewrite functions can return confusing errors that make it appear as if the user's backup statement was invalid SQL.

Epic: none

Release note: None

117922: roachtest: remove unnecessary cluster setting change r=msbutler a=stevendanna

As of #117116 this is now the default and the plan is to keep it the default in 24.1.

Epic: none

Release note: None

117974: bulk: add tracing spans to addSStable in SSTBatcher r=msbutler a=stevendanna

Epic: None

Release note: None

117983: roachtest: update tpce tests with new connection options r=herkolategan,srosenberg a=DarrylWong

The tpc-e driver has been updated to accept options for a fixture bucket, port, username and password. This change adds those options accordingly to the tpce tests.

Previously, tpce hardcoded the sql port as 26257. Now that tpce can accept a port option, we can instead discover the port and pass it.

This change also includes a new SQLPort function that returns the sql port of the given nodes, as well as fixes miscellaneous function signatures to correctly say nodes instead of node.

Release note: None
Epic: None
Fixes: #117567

Co-authored-by: Steven Danna <danna@cockroachlabs.com>
Co-authored-by: DarrylWong <darryl@cockroachlabs.com>
  • Loading branch information
3 people committed Jan 26, 2024
5 parents 3cf8f36 + 487496f + af5bff7 + 3c62854 + ea0d314 commit 0488eb6
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 71 deletions.
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -2126,19 +2126,19 @@ func doRestorePlan(
overrideDBName = newDBName
}
if err := rewrite.TableDescs(tables, descriptorRewrites, overrideDBName); err != nil {
return err
return errors.Wrapf(err, "table descriptor rewrite failed")
}
if err := rewrite.DatabaseDescs(databases, descriptorRewrites, map[descpb.ID]struct{}{}); err != nil {
return err
return errors.Wrapf(err, "database descriptor rewrite failed")
}
if err := rewrite.SchemaDescs(schemas, descriptorRewrites); err != nil {
return err
return errors.Wrapf(err, "schema descriptor rewrite failed")
}
if err := rewrite.TypeDescs(types, descriptorRewrites); err != nil {
return err
return errors.Wrapf(err, "type descriptor rewrite failed")
}
if err := rewrite.FunctionDescs(functions, descriptorRewrites, overrideDBName); err != nil {
return err
return errors.Wrapf(err, "function descriptor rewrite failed")
}

encodedTables := make([]*descpb.TableDescriptor, len(tables))
Expand Down
86 changes: 54 additions & 32 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,12 +1421,12 @@ func newHealthStatusResult(node int, status int, body []byte, err error) *Health

// HealthStatus returns the result of the /health?ready=1 endpoint for each node.
func (c *clusterImpl) HealthStatus(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]*HealthStatusResult, error) {
if len(node) < 1 {
if len(nodes) < 1 {
return nil, nil // unit tests
}
adminAddrs, err := c.ExternalAdminUIAddr(ctx, l, node)
adminAddrs, err := c.ExternalAdminUIAddr(ctx, l, nodes)
if err != nil {
return nil, errors.WithDetail(err, "Unable to get admin UI address(es)")
}
Expand Down Expand Up @@ -2466,20 +2466,20 @@ func (c *clusterImpl) loggerForCmd(
return l, logFile, nil
}

// pgURLErr returns the Postgres endpoint for the specified node. It accepts a
// pgURLErr returns the Postgres endpoint for the specified nodes. It accepts a
// flag specifying whether the URL should include the node's internal or
// external IP address. In general, inter-cluster communication and should use
// internal IPs and communication from a test driver to nodes in a cluster
// should use external IPs.
func (c *clusterImpl) pgURLErr(
ctx context.Context,
l *logger.Logger,
node option.NodeListOption,
nodes option.NodeListOption,
external bool,
tenant string,
sqlInstance int,
) ([]string, error) {
urls, err := roachprod.PgURL(ctx, l, c.MakeNodes(node), c.localCertsDir, roachprod.PGURLOptions{
urls, err := roachprod.PgURL(ctx, l, c.MakeNodes(nodes), c.localCertsDir, roachprod.PGURLOptions{
External: external,
Secure: c.localCertsDir != "",
VirtualClusterName: tenant,
Expand All @@ -2496,19 +2496,27 @@ func (c *clusterImpl) pgURLErr(

// InternalPGUrl returns the internal Postgres endpoint for the specified nodes.
func (c *clusterImpl) InternalPGUrl(
ctx context.Context, l *logger.Logger, node option.NodeListOption, tenant string, sqlInstance int,
ctx context.Context,
l *logger.Logger,
nodes option.NodeListOption,
tenant string,
sqlInstance int,
) ([]string, error) {
return c.pgURLErr(ctx, l, node, false, tenant, sqlInstance)
return c.pgURLErr(ctx, l, nodes, false, tenant, sqlInstance)
}

// Silence unused warning.
var _ = (&clusterImpl{}).InternalPGUrl

// ExternalPGUrl returns the external Postgres endpoint for the specified nodes.
func (c *clusterImpl) ExternalPGUrl(
ctx context.Context, l *logger.Logger, node option.NodeListOption, tenant string, sqlInstance int,
ctx context.Context,
l *logger.Logger,
nodes option.NodeListOption,
tenant string,
sqlInstance int,
) ([]string, error) {
return c.pgURLErr(ctx, l, node, true, tenant, sqlInstance)
return c.pgURLErr(ctx, l, nodes, true, tenant, sqlInstance)
}

func addrToAdminUIAddr(addr string) (string, error) {
Expand Down Expand Up @@ -2552,32 +2560,46 @@ func addrToHostPort(addr string) (string, int, error) {
}

// InternalAdminUIAddr returns the internal Admin UI address in the form host:port
// for the specified node.
// for the specified nodes.
func (c *clusterImpl) InternalAdminUIAddr(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]string, error) {
return c.adminUIAddr(ctx, l, node, false)
return c.adminUIAddr(ctx, l, nodes, false)
}

// ExternalAdminUIAddr returns the external Admin UI address in the form host:port
// for the specified node.
// for the specified nodes.
func (c *clusterImpl) ExternalAdminUIAddr(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]string, error) {
return c.adminUIAddr(ctx, l, node, true)
return c.adminUIAddr(ctx, l, nodes, true)
}

func (c *clusterImpl) SQLPorts(
ctx context.Context,
l *logger.Logger,
nodes option.NodeListOption,
tenant string,
sqlInstance int,
) ([]int, error) {
return roachprod.SQLPorts(ctx, l, c.MakeNodes(nodes), c.IsSecure(), tenant, sqlInstance)
}

func (c *clusterImpl) AdminUIPorts(
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
ctx context.Context,
l *logger.Logger,
nodes option.NodeListOption,
tenant string,
sqlInstance int,
) ([]int, error) {
return roachprod.AdminPorts(ctx, l, c.MakeNodes(nodes), c.IsSecure())
return roachprod.AdminPorts(ctx, l, c.MakeNodes(nodes), c.IsSecure(), tenant, sqlInstance)
}

func (c *clusterImpl) adminUIAddr(
ctx context.Context, l *logger.Logger, node option.NodeListOption, external bool,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption, external bool,
) ([]string, error) {
var addrs []string
adminURLs, err := roachprod.AdminURL(ctx, l, c.MakeNodes(node), "", 0, "",
adminURLs, err := roachprod.AdminURL(ctx, l, c.MakeNodes(nodes), "", 0, "",
external, false, false)
if err != nil {
return nil, err
Expand All @@ -2598,32 +2620,32 @@ func (c *clusterImpl) adminUIAddr(

// InternalIP returns the internal IP addresses for the specified nodes.
func (c *clusterImpl) InternalIP(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]string, error) {
return roachprod.IP(l, c.MakeNodes(node), false)
return roachprod.IP(l, c.MakeNodes(nodes), false)
}

// InternalAddr returns the internal address in the form host:port for the
// specified nodes.
func (c *clusterImpl) InternalAddr(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]string, error) {
return c.addr(ctx, l, node, false)
return c.addr(ctx, l, nodes, false)
}

// ExternalAddr returns the external address in the form host:port for the
// specified node.
// specified nodes.
func (c *clusterImpl) ExternalAddr(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]string, error) {
return c.addr(ctx, l, node, true)
return c.addr(ctx, l, nodes, true)
}

func (c *clusterImpl) addr(
ctx context.Context, l *logger.Logger, node option.NodeListOption, external bool,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption, external bool,
) ([]string, error) {
var addrs []string
urls, err := c.pgURLErr(ctx, l, node, external, "" /* tenant */, 0 /* sqlInstance */)
urls, err := c.pgURLErr(ctx, l, nodes, external, "" /* tenant */, 0 /* sqlInstance */)
if err != nil {
return nil, err
}
Expand All @@ -2637,12 +2659,12 @@ func (c *clusterImpl) addr(
return addrs, nil
}

// ExternalIP returns the external IP addresses for the specified node.
// ExternalIP returns the external IP addresses for the specified nodes.
func (c *clusterImpl) ExternalIP(
ctx context.Context, l *logger.Logger, node option.NodeListOption,
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]string, error) {
var ips []string
addrs, err := c.ExternalAddr(ctx, l, node)
addrs, err := c.ExternalAddr(ctx, l, nodes)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/cluster/cluster_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Cluster interface {
InternalIP(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
ExternalAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
ExternalIP(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
SQLPorts(ctx context.Context, l *logger.Logger, node option.NodeListOption, tenant string, sqlInstance int) ([]int, error)

// SQL connection strings.

Expand All @@ -90,7 +91,7 @@ type Cluster interface {

InternalAdminUIAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
ExternalAdminUIAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
AdminUIPorts(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]int, error)
AdminUIPorts(ctx context.Context, l *logger.Logger, node option.NodeListOption, tenant string, sqlInstance int) ([]int, error)

// Running commands on nodes.

Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ SELECT count(replicas)
t.L().Printf("killing all nodes\n")
c.Stop(ctx, t.L(), option.DefaultStopOpts())

adminPorts, err := c.AdminUIPorts(ctx, t.L(), c.Node(1))
adminPorts, err := c.AdminUIPorts(ctx, t.L(), c.Node(1), "" /* tenant */, 0 /* sqlInstance */)
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/cmd/roachtest/tests/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ func registerOnlineRestore(r registry.Registry) {
if _, err := db.Exec("SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget='1h'"); err != nil {
return err
}
if _, err := db.Exec("SET CLUSTER SETTING kv.snapshot_receiver.excise.enabled=true"); err != nil {
return err
}
// TODO(dt): AC appears periodically reduce the workload to 0 QPS
// during the download phase (sudden jumps from 0 to 2k qps to 0).
// Disable for now until we figure out how to smooth this out.
Expand Down
16 changes: 10 additions & 6 deletions pkg/cmd/roachtest/tests/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,10 @@ func (tpce tpceRestore) init(
) {
spec := tpce.getSpec(ctx, t, c, sp)
spec.init(ctx, t, c, tpceCmdOptions{
customers: tpce.customers,
racks: sp.nodes})
customers: tpce.customers,
racks: sp.nodes,
connectionOpts: defaultTPCEConnectionOpts(),
})
}

func (tpce tpceRestore) run(
Expand All @@ -759,10 +761,12 @@ func (tpce tpceRestore) run(
spec := tpce.getSpec(ctx, t, c, sp)
_, err := spec.run(ctx, t, c, tpceCmdOptions{
// Set the duration to be a week to ensure the workload never exits early.
duration: time.Hour * 7 * 24,
customers: tpce.customers,
racks: sp.nodes,
threads: sp.cpus * sp.nodes})
duration: time.Hour * 7 * 24,
customers: tpce.customers,
racks: sp.nodes,
threads: sp.cpus * sp.nodes,
connectionOpts: defaultTPCEConnectionOpts(),
})
return err
}

Expand Down
50 changes: 41 additions & 9 deletions pkg/cmd/roachtest/tests/tpce.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type tpceSpec struct {
loadNode int
roachNodes option.NodeListOption
roachNodeIPFlags []string
portFlag string
}

type tpceCmdOptions struct {
Expand All @@ -42,6 +43,27 @@ type tpceCmdOptions struct {
duration time.Duration
threads int
skipCleanup bool
connectionOpts tpceConnectionOpts
}

type tpceConnectionOpts struct {
fixtureBucket string
user string
password string
}

const (
defaultFixtureBucket = "gs://cockroach-fixtures-us-east1/tpce-csv"
defaultUser = "root"
defaultPassword = ""
)

func defaultTPCEConnectionOpts() tpceConnectionOpts {
return tpceConnectionOpts{
fixtureBucket: defaultFixtureBucket,
user: defaultUser,
password: defaultPassword,
}
}

func (to tpceCmdOptions) AddCommandOptions(cmd *roachtestutil.Command) {
Expand All @@ -51,6 +73,9 @@ func (to tpceCmdOptions) AddCommandOptions(cmd *roachtestutil.Command) {
cmd.MaybeFlag(to.duration != 0, "duration", to.duration)
cmd.MaybeFlag(to.threads != 0, "threads", to.threads)
cmd.MaybeFlag(to.skipCleanup, "skip-cleanup", "")
cmd.MaybeFlag(to.connectionOpts.fixtureBucket != "", "bucket", to.connectionOpts.fixtureBucket)
cmd.MaybeFlag(to.connectionOpts.user != "", "pg-user", to.connectionOpts.user)
cmd.MaybeFlag(to.connectionOpts.password != "", "pg-password", to.connectionOpts.password)
}

func initTPCESpec(
Expand All @@ -72,10 +97,16 @@ func initTPCESpec(
for i, ip := range roachNodeIPs {
roachNodeIPFlags[i] = fmt.Sprintf("--hosts=%s", ip)
}
ports, err := c.SQLPorts(ctx, l, roachNodes, "" /* tenant */, 0 /* sqlInstance */)
if err != nil {
return nil, err
}
port := fmt.Sprintf("--pg-port=%d", ports[0])
return &tpceSpec{
loadNode: loadNode,
roachNodes: roachNodes,
roachNodeIPFlags: roachNodeIPFlags,
portFlag: port,
}, nil
}

Expand All @@ -89,14 +120,14 @@ func (ts *tpceSpec) newCmd(o tpceCmdOptions) *roachtestutil.Command {
// import of the data and schema creation.
func (ts *tpceSpec) init(ctx context.Context, t test.Test, c cluster.Cluster, o tpceCmdOptions) {
cmd := ts.newCmd(o).Option("init")
c.Run(ctx, option.WithNodes(c.Node(ts.loadNode)), fmt.Sprintf("%s %s", cmd, ts.roachNodeIPFlags[0]))
c.Run(ctx, option.WithNodes(c.Node(ts.loadNode)), fmt.Sprintf("%s %s %s", cmd, ts.portFlag, ts.roachNodeIPFlags[0]))
}

// run runs the tpce workload on cluster that has been initialized with the tpce schema.
func (ts *tpceSpec) run(
ctx context.Context, t test.Test, c cluster.Cluster, o tpceCmdOptions,
) (install.RunResultDetails, error) {
cmd := fmt.Sprintf("%s %s", ts.newCmd(o), strings.Join(ts.roachNodeIPFlags, " "))
cmd := fmt.Sprintf("%s %s %s", ts.newCmd(o), ts.portFlag, strings.Join(ts.roachNodeIPFlags, " "))
return c.RunWithDetailsSingleNode(ctx, t.L(), option.WithNodes(c.Node(ts.loadNode)), cmd)
}

Expand Down Expand Up @@ -147,7 +178,6 @@ func runTPCE(ctx context.Context, t test.Test, c cluster.Cluster, opts tpceOptio
t.Status("installing cockroach")
startOpts := option.DefaultStartOpts()
startOpts.RoachprodOpts.StoreCount = opts.ssds
roachtestutil.SetDefaultSQLPort(c, &startOpts.RoachprodOpts)
settings := install.MakeClusterSettings(install.NumRacksOption(racks))
c.Start(ctx, t.L(), startOpts, settings, crdbNodes)
}
Expand Down Expand Up @@ -186,8 +216,9 @@ func runTPCE(ctx context.Context, t test.Test, c cluster.Cluster, opts tpceOptio
}
t.Status(fmt.Sprintf("initializing %d tpc-e customers%s", opts.customers, estimatedSetupTimeStr))
tpceSpec.init(ctx, t, c, tpceCmdOptions{
customers: opts.customers,
racks: racks,
customers: opts.customers,
racks: racks,
connectionOpts: defaultTPCEConnectionOpts(),
})
return nil
})
Expand All @@ -207,10 +238,11 @@ func runTPCE(ctx context.Context, t test.Test, c cluster.Cluster, opts tpceOptio
workloadDuration = 2 * time.Hour
}
runOptions := tpceCmdOptions{
customers: opts.customers,
racks: racks,
duration: workloadDuration,
threads: opts.nodes * opts.cpus,
customers: opts.customers,
racks: racks,
duration: workloadDuration,
threads: opts.nodes * opts.cpus,
connectionOpts: defaultTPCEConnectionOpts(),
}
if opts.activeCustomers != 0 {
runOptions.activeCustomers = opts.activeCustomers
Expand Down
Loading

0 comments on commit 0488eb6

Please sign in to comment.