Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: workload/schemachange: "fix" workload #67597

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b691e7a
sql: wrap error correctly in drop view
ajwerner Jan 22, 2021
6b0c075
workload/schemachange: add a test to run the workload against a testc…
ajwerner Jan 25, 2021
523fca7
workload/schemachange: extend test to have concurrency
ajwerner Mar 10, 2021
6b59554
workload/schemachange: properly detect that self-references are ok
ajwerner Jan 22, 2021
7bdc18e
workload/schemachange: detect when a column is part of an index mutation
ajwerner Jan 22, 2021
74201be
workload/schemachange: detect not null constraints being concurrently…
ajwerner Jan 22, 2021
d31d06f
workload/schemachange: don't expect errors if the sequence already ex…
ajwerner Jan 22, 2021
cd3b150
workload/schemachange: properly detect writes preceding DDLs
ajwerner Jan 22, 2021
58290a7
workload/schemachange: stop swallowing all errors during op generation
ajwerner Jan 22, 2021
0f41850
workload/schemachange: deal with columns not existing in check
ajwerner Jan 25, 2021
e3363a9
workload/schemachange: only check if a table has rows if it exists
ajwerner Jan 27, 2021
50e75ad
workload/schemachange: fix broken query that was not scanning a schem…
ajwerner Jan 27, 2021
d2d4443
workload/schemachange: detect schemas with cross-schema type references
ajwerner Jan 27, 2021
bfa28a7
workload/schemachange: wrap errors for easier debugging
ajwerner Jan 27, 2021
654f1e0
workload/schemachange: add support for multi-column inverted indexes
ajwerner Feb 24, 2021
74a6dd8
workload/schemachange: fix locking bug when an error is encountered
ajwerner Mar 10, 2021
43777c7
workload/schemachange: detect when a constraint is being dropped
ajwerner Mar 1, 2021
fad579e
workload/schemachange: detect virtual computed columns
ajwerner Mar 10, 2021
f2a11d1
workload/schemachange: partial work to support insert row
ajwerner Mar 10, 2021
f299b37
workload/schemachange: disable operations which do not currently work
ajwerner Mar 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ ALL_TESTS = [
"//pkg/workload/cli:cli_test",
"//pkg/workload/faker:faker_test",
"//pkg/workload/movr:movr_test",
"//pkg/workload/schemachange:schemachange_test",
"//pkg/workload/tpcc:tpcc_test",
"//pkg/workload/workloadimpl:workloadimpl_test",
"//pkg/workload/workloadsql:workloadsql_test",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/catalog/lease/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ func (m *Manager) Publish(
updates := func(_ *kv.Txn, descs map[descpb.ID]catalog.MutableDescriptor) error {
desc, ok := descs[id]
if !ok {
return errors.AssertionFailedf("required descriptor with ID %d not provided to update closure", id)
return errors.AssertionFailedf(
"required descriptor with ID %d not provided to update closure", id)
}
return update(desc)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (p *planner) dropViewImpl(
dependencyDesc, err := p.Descriptors().GetMutableTableVersionByID(ctx, depID, p.txn)
if err != nil {
return cascadeDroppedViews,
errors.Errorf("error resolving dependency relation ID %d: %v", depID, err)
errors.Wrapf(err, "error resolving dependency relation ID %d", depID)
}
// The dependency is also being deleted, so we don't have to remove the
// references.
Expand Down
27 changes: 26 additions & 1 deletion pkg/workload/schemachange/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
Expand Down Expand Up @@ -43,3 +43,28 @@ stringer(
src = "operation_generator.go",
typ = "opType",
)

go_test(
name = "schemachange_test",
srcs = [
"main_test.go",
"schema_change_external_test.go",
],
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"//pkg/workload",
"//pkg/workload/histogram",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)
193 changes: 179 additions & 14 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx"
)

Expand Down Expand Up @@ -74,7 +75,7 @@ func tableHasRows(tx *pgx.Tx, tableName *tree.TableName) (bool, error) {

func scanBool(tx *pgx.Tx, query string, args ...interface{}) (b bool, err error) {
err = tx.QueryRow(query, args...).Scan(&b)
return b, err
return b, errors.Wrapf(err, "scanBool: %q %q", query, args)
}

func schemaExists(tx *pgx.Tx, schemaName string) (bool, error) {
Expand All @@ -98,6 +99,8 @@ func tableHasDependencies(tx *pgx.Tx, tableName *tree.TableName) (bool, error) {
ns.oid = c.relnamespace
WHERE c.relname = $1 AND ns.nspname = $2
)
AND fd.descriptor_id != fd.dependedonby_id
AND fd.dependedonby_type != 'sequence'
)
`, tableName.Object(), tableName.Schema())
}
Expand Down Expand Up @@ -133,6 +136,7 @@ func columnIsDependedOn(tx *pgx.Tx, tableName *tree.TableName, columnName string
AS fd
WHERE fd.descriptor_id
= $1::REGCLASS
AND fd.dependedonby_type != 'sequence'
)
UNION (
SELECT unnest(confkey) AS column_id
Expand Down Expand Up @@ -306,7 +310,7 @@ func violatesUniqueConstraintsHelper(
func scanStringArrayRows(tx *pgx.Tx, query string, args ...interface{}) ([][]string, error) {
rows, err := tx.Query(query, args...)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "scanStringArrayRows: %q %q", query, args)
}
defer rows.Close()

Expand All @@ -315,12 +319,12 @@ func scanStringArrayRows(tx *pgx.Tx, query string, args ...interface{}) ([][]str
var columnNames []string
err := rows.Scan(&columnNames)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "scan: %q, args %q, scanArgs %q", query, columnNames, args)
}
results = append(results, columnNames)
}

return results, err
return results, nil
}

func indexExists(tx *pgx.Tx, tableName *tree.TableName, indexName string) (bool, error) {
Expand Down Expand Up @@ -366,7 +370,7 @@ func columnsStoredInPrimaryIdx(

func scanStringArray(tx *pgx.Tx, query string, args ...interface{}) (b []string, err error) {
err = tx.QueryRow(query, args...).Scan(&b)
return b, err
return b, errors.Wrapf(err, "scanStringArray %q %q", query, args)
}

// canApplyUniqueConstraint checks if the rows in a table are unique with respect
Expand Down Expand Up @@ -470,17 +474,36 @@ func constraintIsUnique(
`, tableName.String(), constraintName))
}

func columnIsStoredComputed(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
// Note that we COALESCE because the column may not exist.
return scanBool(tx, `
SELECT COALESCE(
(
SELECT attgenerated
FROM pg_catalog.pg_attribute
WHERE attrelid = $1:::REGCLASS AND attname = $2
)
= 's',
false
);
`, tableName.String(), columnName)
}

func columnIsComputed(tx *pgx.Tx, tableName *tree.TableName, columnName string) (bool, error) {
// Note that we COALESCE because the column may not exist.
return scanBool(tx, `
SELECT (
SELECT is_generated
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
AND column_name = $3
)
= 'YES'
`, tableName.Schema(), tableName.Object(), columnName)
SELECT COALESCE(
(
SELECT attgenerated
FROM pg_catalog.pg_attribute
WHERE attrelid = $1:::REGCLASS AND attname = $2
)
!= '',
false
);
`, tableName.String(), columnName)
}

func constraintExists(tx *pgx.Tx, constraintName string) (bool, error) {
Expand Down Expand Up @@ -605,3 +628,145 @@ func violatesFkConstraintsHelper(
)
`, parentTableSchema, parentTableName, parentColumn, childValue))
}

func columnIsInDroppingIndex(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
return scanBool(tx, `
SELECT EXISTS(
SELECT index_id
FROM (
SELECT DISTINCT index_id
FROM crdb_internal.index_columns
WHERE descriptor_id = $1::REGCLASS AND column_name = $2
) AS indexes
JOIN crdb_internal.schema_changes AS sc ON sc.target_id
= indexes.index_id
AND table_id = $1::REGCLASS
AND type = 'INDEX'
AND direction = 'DROP'
);
`, tableName.String(), columnName)
}

// A pair of CTE definitions that expect the first argument to be a table name.
const descriptorsAndConstraintMutationsCTE = `descriptors AS (
SELECT crdb_internal.pb_to_json(
'cockroach.sql.sqlbase.Descriptor',
descriptor
)->'table' AS d
FROM system.descriptor
WHERE id = $1::REGCLASS
),
constraint_mutations AS (
SELECT mut
FROM (
SELECT json_array_elements(
d->'mutations'
) AS mut
FROM descriptors
)
WHERE (mut->'constraint') IS NOT NULL
)`

func constraintInDroppingState(
tx *pgx.Tx, tableName *tree.TableName, constraintName string,
) (bool, error) {
// TODO(ajwerner): Figure out how to plumb the column name into this query.
return scanBool(tx, `
WITH `+descriptorsAndConstraintMutationsCTE+`
SELECT true
IN (
SELECT (t.f).value @> json_set('{"validity": "Dropping"}', ARRAY['name'], to_json($2:::STRING))
FROM (
SELECT json_each(mut->'constraint') AS f
FROM constraint_mutations
) AS t
);
`, tableName.String(), constraintName)
}

func columnNotNullConstraintInMutation(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
return scanBool(tx, `
WITH `+descriptorsAndConstraintMutationsCTE+`,
col AS (
SELECT (c->>'id')::INT8 AS id
FROM (
SELECT json_array_elements(d->'columns') AS c
FROM descriptors
)
WHERE c->>'name' = $2
)
SELECT EXISTS(
SELECT *
FROM constraint_mutations
JOIN col ON mut->'constraint'->>'constraintType' = 'NOT_NULL'
AND (mut->'constraint'->>'notNullColumn')::INT8 = id
);
`, tableName.String(), columnName)
}

func schemaContainsTypesWithCrossSchemaReferences(tx *pgx.Tx, schemaName string) (bool, error) {
return scanBool(tx, `
WITH database_id AS (
SELECT id
FROM system.namespace
WHERE "parentID" = 0
AND "parentSchemaID" = 0
AND name = current_database()
),
schema_id AS (
SELECT nsp.id
FROM system.namespace AS nsp
JOIN database_id ON "parentID" = database_id.id
AND "parentSchemaID" = 0
AND name = $1
),
descriptor_ids AS (
SELECT nsp.id
FROM system.namespace AS nsp,
schema_id,
database_id
WHERE nsp."parentID" = database_id.id
AND nsp."parentSchemaID" = schema_id.id
),
descriptors AS (
SELECT crdb_internal.pb_to_json(
'cockroach.sql.sqlbase.Descriptor',
descriptor
) AS descriptor
FROM system.descriptor AS descriptors
JOIN descriptor_ids ON descriptors.id
= descriptor_ids.id
),
types AS (
SELECT descriptor
FROM descriptors
WHERE (descriptor->'type') IS NOT NULL
),
table_references AS (
SELECT json_array_elements(
descriptor->'table'->'dependedOnBy'
) AS ref
FROM descriptors
WHERE (descriptor->'table') IS NOT NULL
),
dependent AS (
SELECT (ref->>'id')::INT8 AS id FROM table_references
),
referenced_descriptors AS (
SELECT json_array_elements_text(
descriptor->'type'->'referencingDescriptorIds'
)::INT8 AS id
FROM types
)
SELECT EXISTS(
SELECT *
FROM system.namespace
WHERE id IN (SELECT id FROM referenced_descriptors)
AND "parentSchemaID" NOT IN (SELECT id FROM schema_id)
AND id NOT IN (SELECT id FROM dependent)
);`, schemaName)
}
34 changes: 34 additions & 0 deletions pkg/workload/schemachange/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package schemachange_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)

os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
Loading