Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
59635: workload/schemachange: "fix" workload r=arulajmani a=ajwerner

This is a variety of commits which fixes bugs in the schemachange workload. It also disables a number of features.

Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
Co-authored-by: Andrew Werner <awerner32@gmail.com>
  • Loading branch information
3 people committed Mar 16, 2021
2 parents df8d0c2 + 4f62dba commit db4f939
Show file tree
Hide file tree
Showing 9 changed files with 487 additions and 94 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ ALL_TESTS = [
"//pkg/workload/cli:cli_test",
"//pkg/workload/faker:faker_test",
"//pkg/workload/movr:movr_test",
"//pkg/workload/schemachange:schemachange_test",
"//pkg/workload/tpcc:tpcc_test",
"//pkg/workload/workloadimpl:workloadimpl_test",
"//pkg/workload/workloadsql:workloadsql_test",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/catalog/lease/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ func (m *Manager) Publish(
updates := func(_ *kv.Txn, descs map[descpb.ID]catalog.MutableDescriptor) error {
desc, ok := descs[id]
if !ok {
return errors.AssertionFailedf("required descriptor with ID %d not provided to update closure", id)
return errors.AssertionFailedf(
"required descriptor with ID %d not provided to update closure", id)
}
return update(desc)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (p *planner) dropViewImpl(
dependencyDesc, err := p.Descriptors().GetMutableTableVersionByID(ctx, depID, p.txn)
if err != nil {
return cascadeDroppedViews,
errors.Errorf("error resolving dependency relation ID %d: %v", depID, err)
errors.Wrapf(err, "error resolving dependency relation ID %d", depID)
}
// The dependency is also being deleted, so we don't have to remove the
// references.
Expand Down
27 changes: 26 additions & 1 deletion pkg/workload/schemachange/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
Expand Down Expand Up @@ -43,3 +43,28 @@ stringer(
src = "operation_generator.go",
typ = "opType",
)

go_test(
name = "schemachange_test",
srcs = [
"main_test.go",
"schema_change_external_test.go",
],
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"//pkg/workload",
"//pkg/workload/histogram",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)
193 changes: 179 additions & 14 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx"
)

Expand Down Expand Up @@ -74,7 +75,7 @@ func tableHasRows(tx *pgx.Tx, tableName *tree.TableName) (bool, error) {

func scanBool(tx *pgx.Tx, query string, args ...interface{}) (b bool, err error) {
err = tx.QueryRow(query, args...).Scan(&b)
return b, err
return b, errors.Wrapf(err, "scanBool: %q %q", query, args)
}

func schemaExists(tx *pgx.Tx, schemaName string) (bool, error) {
Expand All @@ -98,6 +99,8 @@ func tableHasDependencies(tx *pgx.Tx, tableName *tree.TableName) (bool, error) {
ns.oid = c.relnamespace
WHERE c.relname = $1 AND ns.nspname = $2
)
AND fd.descriptor_id != fd.dependedonby_id
AND fd.dependedonby_type != 'sequence'
)
`, tableName.Object(), tableName.Schema())
}
Expand Down Expand Up @@ -133,6 +136,7 @@ func columnIsDependedOn(tx *pgx.Tx, tableName *tree.TableName, columnName string
AS fd
WHERE fd.descriptor_id
= $1::REGCLASS
AND fd.dependedonby_type != 'sequence'
)
UNION (
SELECT unnest(confkey) AS column_id
Expand Down Expand Up @@ -306,7 +310,7 @@ func violatesUniqueConstraintsHelper(
func scanStringArrayRows(tx *pgx.Tx, query string, args ...interface{}) ([][]string, error) {
rows, err := tx.Query(query, args...)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "scanStringArrayRows: %q %q", query, args)
}
defer rows.Close()

Expand All @@ -315,12 +319,12 @@ func scanStringArrayRows(tx *pgx.Tx, query string, args ...interface{}) ([][]str
var columnNames []string
err := rows.Scan(&columnNames)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "scan: %q, args %q, scanArgs %q", query, columnNames, args)
}
results = append(results, columnNames)
}

return results, err
return results, nil
}

func indexExists(tx *pgx.Tx, tableName *tree.TableName, indexName string) (bool, error) {
Expand Down Expand Up @@ -366,7 +370,7 @@ func columnsStoredInPrimaryIdx(

func scanStringArray(tx *pgx.Tx, query string, args ...interface{}) (b []string, err error) {
err = tx.QueryRow(query, args...).Scan(&b)
return b, err
return b, errors.Wrapf(err, "scanStringArray %q %q", query, args)
}

// canApplyUniqueConstraint checks if the rows in a table are unique with respect
Expand Down Expand Up @@ -470,17 +474,36 @@ func constraintIsUnique(
`, tableName.String(), constraintName))
}

func columnIsStoredComputed(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
// Note that we COALESCE because the column may not exist.
return scanBool(tx, `
SELECT COALESCE(
(
SELECT attgenerated
FROM pg_catalog.pg_attribute
WHERE attrelid = $1:::REGCLASS AND attname = $2
)
= 's',
false
);
`, tableName.String(), columnName)
}

func columnIsComputed(tx *pgx.Tx, tableName *tree.TableName, columnName string) (bool, error) {
// Note that we COALESCE because the column may not exist.
return scanBool(tx, `
SELECT (
SELECT is_generated
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
AND column_name = $3
)
= 'YES'
`, tableName.Schema(), tableName.Object(), columnName)
SELECT COALESCE(
(
SELECT attgenerated
FROM pg_catalog.pg_attribute
WHERE attrelid = $1:::REGCLASS AND attname = $2
)
!= '',
false
);
`, tableName.String(), columnName)
}

func constraintExists(tx *pgx.Tx, constraintName string) (bool, error) {
Expand Down Expand Up @@ -605,3 +628,145 @@ func violatesFkConstraintsHelper(
)
`, parentTableSchema, parentTableName, parentColumn, childValue))
}

func columnIsInDroppingIndex(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
return scanBool(tx, `
SELECT EXISTS(
SELECT index_id
FROM (
SELECT DISTINCT index_id
FROM crdb_internal.index_columns
WHERE descriptor_id = $1::REGCLASS AND column_name = $2
) AS indexes
JOIN crdb_internal.schema_changes AS sc ON sc.target_id
= indexes.index_id
AND table_id = $1::REGCLASS
AND type = 'INDEX'
AND direction = 'DROP'
);
`, tableName.String(), columnName)
}

// A pair of CTE definitions that expect the first argument to be a table name.
const descriptorsAndConstraintMutationsCTE = `descriptors AS (
SELECT crdb_internal.pb_to_json(
'cockroach.sql.sqlbase.Descriptor',
descriptor
)->'table' AS d
FROM system.descriptor
WHERE id = $1::REGCLASS
),
constraint_mutations AS (
SELECT mut
FROM (
SELECT json_array_elements(
d->'mutations'
) AS mut
FROM descriptors
)
WHERE (mut->'constraint') IS NOT NULL
)`

func constraintInDroppingState(
tx *pgx.Tx, tableName *tree.TableName, constraintName string,
) (bool, error) {
// TODO(ajwerner): Figure out how to plumb the column name into this query.
return scanBool(tx, `
WITH `+descriptorsAndConstraintMutationsCTE+`
SELECT true
IN (
SELECT (t.f).value @> json_set('{"validity": "Dropping"}', ARRAY['name'], to_json($2:::STRING))
FROM (
SELECT json_each(mut->'constraint') AS f
FROM constraint_mutations
) AS t
);
`, tableName.String(), constraintName)
}

func columnNotNullConstraintInMutation(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
return scanBool(tx, `
WITH `+descriptorsAndConstraintMutationsCTE+`,
col AS (
SELECT (c->>'id')::INT8 AS id
FROM (
SELECT json_array_elements(d->'columns') AS c
FROM descriptors
)
WHERE c->>'name' = $2
)
SELECT EXISTS(
SELECT *
FROM constraint_mutations
JOIN col ON mut->'constraint'->>'constraintType' = 'NOT_NULL'
AND (mut->'constraint'->>'notNullColumn')::INT8 = id
);
`, tableName.String(), columnName)
}

func schemaContainsTypesWithCrossSchemaReferences(tx *pgx.Tx, schemaName string) (bool, error) {
return scanBool(tx, `
WITH database_id AS (
SELECT id
FROM system.namespace
WHERE "parentID" = 0
AND "parentSchemaID" = 0
AND name = current_database()
),
schema_id AS (
SELECT nsp.id
FROM system.namespace AS nsp
JOIN database_id ON "parentID" = database_id.id
AND "parentSchemaID" = 0
AND name = $1
),
descriptor_ids AS (
SELECT nsp.id
FROM system.namespace AS nsp,
schema_id,
database_id
WHERE nsp."parentID" = database_id.id
AND nsp."parentSchemaID" = schema_id.id
),
descriptors AS (
SELECT crdb_internal.pb_to_json(
'cockroach.sql.sqlbase.Descriptor',
descriptor
) AS descriptor
FROM system.descriptor AS descriptors
JOIN descriptor_ids ON descriptors.id
= descriptor_ids.id
),
types AS (
SELECT descriptor
FROM descriptors
WHERE (descriptor->'type') IS NOT NULL
),
table_references AS (
SELECT json_array_elements(
descriptor->'table'->'dependedOnBy'
) AS ref
FROM descriptors
WHERE (descriptor->'table') IS NOT NULL
),
dependent AS (
SELECT (ref->>'id')::INT8 AS id FROM table_references
),
referenced_descriptors AS (
SELECT json_array_elements_text(
descriptor->'type'->'referencingDescriptorIds'
)::INT8 AS id
FROM types
)
SELECT EXISTS(
SELECT *
FROM system.namespace
WHERE id IN (SELECT id FROM referenced_descriptors)
AND "parentSchemaID" NOT IN (SELECT id FROM schema_id)
AND id NOT IN (SELECT id FROM dependent)
);`, schemaName)
}
34 changes: 34 additions & 0 deletions pkg/workload/schemachange/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package schemachange_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)

os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
Loading

0 comments on commit db4f939

Please sign in to comment.