Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workload/schemachange: "fix" workload #59635

Merged
merged 20 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6600f45
sql: wrap error correctly in drop view
ajwerner Jan 22, 2021
9cd7be7
workload/schemachange: add a test to run the workload against a testc…
ajwerner Jan 25, 2021
ebc40a7
workload/schemachange: extend test to have concurrency
ajwerner Mar 10, 2021
01be19c
workload/schemachange: properly detect that self-references are ok
ajwerner Jan 22, 2021
85668ca
workload/schemachange: detect when a column is part of an index mutation
ajwerner Jan 22, 2021
503059e
workload/schemachange: detect not null constraints being concurrently…
ajwerner Jan 22, 2021
e8547db
workload/schemachange: don't expect errors if the sequence already ex…
ajwerner Jan 22, 2021
6304018
workload/schemachange: properly detect writes preceding DDLs
ajwerner Jan 22, 2021
7d5c67c
workload/schemachange: stop swallowing all errors during op generation
ajwerner Jan 22, 2021
33bd870
workload/schemachange: deal with columns not existing in check
ajwerner Jan 25, 2021
8367eb7
workload/schemachange: only check if a table has rows if it exists
ajwerner Jan 27, 2021
8946d0c
workload/schemachange: fix broken query that was not scanning a schem…
ajwerner Jan 27, 2021
fe16420
workload/schemachange: detect schemas with cross-schema type references
ajwerner Jan 27, 2021
62d64d3
workload/schemachange: wrap errors for easier debugging
ajwerner Jan 27, 2021
a699c4f
workload/schemachange: add support for multi-column inverted indexes
ajwerner Feb 24, 2021
6fe27e4
workload/schemachange: fix locking bug when an error is encountered
ajwerner Mar 10, 2021
2eb6c86
workload/schemachange: detect when a constraint is being dropped
ajwerner Mar 1, 2021
cea972d
workload/schemachange: detect virtual computed columns
ajwerner Mar 10, 2021
661955a
workload/schemachange: partial work to support insert row
ajwerner Mar 10, 2021
4f62dba
workload/schemachange: disable operations which do not currently work
ajwerner Mar 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ ALL_TESTS = [
"//pkg/workload/cli:cli_test",
"//pkg/workload/faker:faker_test",
"//pkg/workload/movr:movr_test",
"//pkg/workload/schemachange:schemachange_test",
"//pkg/workload/tpcc:tpcc_test",
"//pkg/workload/workloadimpl:workloadimpl_test",
"//pkg/workload/workloadsql:workloadsql_test",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/catalog/lease/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ func (m *Manager) Publish(
updates := func(_ *kv.Txn, descs map[descpb.ID]catalog.MutableDescriptor) error {
desc, ok := descs[id]
if !ok {
return errors.AssertionFailedf("required descriptor with ID %d not provided to update closure", id)
return errors.AssertionFailedf(
"required descriptor with ID %d not provided to update closure", id)
}
return update(desc)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (p *planner) dropViewImpl(
dependencyDesc, err := p.Descriptors().GetMutableTableVersionByID(ctx, depID, p.txn)
if err != nil {
return cascadeDroppedViews,
errors.Errorf("error resolving dependency relation ID %d: %v", depID, err)
errors.Wrapf(err, "error resolving dependency relation ID %d", depID)
}
// The dependency is also being deleted, so we don't have to remove the
// references.
Expand Down
27 changes: 26 additions & 1 deletion pkg/workload/schemachange/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
Expand Down Expand Up @@ -43,3 +43,28 @@ stringer(
src = "operation_generator.go",
typ = "opType",
)

go_test(
name = "schemachange_test",
srcs = [
"main_test.go",
"schema_change_external_test.go",
],
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"//pkg/workload",
"//pkg/workload/histogram",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)
193 changes: 179 additions & 14 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx"
)

Expand Down Expand Up @@ -74,7 +75,7 @@ func tableHasRows(tx *pgx.Tx, tableName *tree.TableName) (bool, error) {

func scanBool(tx *pgx.Tx, query string, args ...interface{}) (b bool, err error) {
err = tx.QueryRow(query, args...).Scan(&b)
return b, err
return b, errors.Wrapf(err, "scanBool: %q %q", query, args)
}

func schemaExists(tx *pgx.Tx, schemaName string) (bool, error) {
Expand All @@ -98,6 +99,8 @@ func tableHasDependencies(tx *pgx.Tx, tableName *tree.TableName) (bool, error) {
ns.oid = c.relnamespace
WHERE c.relname = $1 AND ns.nspname = $2
)
AND fd.descriptor_id != fd.dependedonby_id
AND fd.dependedonby_type != 'sequence'
)
`, tableName.Object(), tableName.Schema())
}
Expand Down Expand Up @@ -133,6 +136,7 @@ func columnIsDependedOn(tx *pgx.Tx, tableName *tree.TableName, columnName string
AS fd
WHERE fd.descriptor_id
= $1::REGCLASS
AND fd.dependedonby_type != 'sequence'
)
UNION (
SELECT unnest(confkey) AS column_id
Expand Down Expand Up @@ -306,7 +310,7 @@ func violatesUniqueConstraintsHelper(
func scanStringArrayRows(tx *pgx.Tx, query string, args ...interface{}) ([][]string, error) {
rows, err := tx.Query(query, args...)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "scanStringArrayRows: %q %q", query, args)
}
defer rows.Close()

Expand All @@ -315,12 +319,12 @@ func scanStringArrayRows(tx *pgx.Tx, query string, args ...interface{}) ([][]str
var columnNames []string
err := rows.Scan(&columnNames)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "scan: %q, args %q, scanArgs %q", query, columnNames, args)
}
results = append(results, columnNames)
}

return results, err
return results, nil
}

func indexExists(tx *pgx.Tx, tableName *tree.TableName, indexName string) (bool, error) {
Expand Down Expand Up @@ -366,7 +370,7 @@ func columnsStoredInPrimaryIdx(

func scanStringArray(tx *pgx.Tx, query string, args ...interface{}) (b []string, err error) {
err = tx.QueryRow(query, args...).Scan(&b)
return b, err
return b, errors.Wrapf(err, "scanStringArray %q %q", query, args)
}

// canApplyUniqueConstraint checks if the rows in a table are unique with respect
Expand Down Expand Up @@ -470,17 +474,36 @@ func constraintIsUnique(
`, tableName.String(), constraintName))
}

func columnIsStoredComputed(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
// Note that we COALESCE because the column may not exist.
return scanBool(tx, `
SELECT COALESCE(
(
SELECT attgenerated
FROM pg_catalog.pg_attribute
WHERE attrelid = $1:::REGCLASS AND attname = $2
)
= 's',
false
);
`, tableName.String(), columnName)
}

func columnIsComputed(tx *pgx.Tx, tableName *tree.TableName, columnName string) (bool, error) {
// Note that we COALESCE because the column may not exist.
return scanBool(tx, `
SELECT (
SELECT is_generated
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
AND column_name = $3
)
= 'YES'
`, tableName.Schema(), tableName.Object(), columnName)
SELECT COALESCE(
(
SELECT attgenerated
FROM pg_catalog.pg_attribute
WHERE attrelid = $1:::REGCLASS AND attname = $2
)
!= '',
false
);
`, tableName.String(), columnName)
}

func constraintExists(tx *pgx.Tx, constraintName string) (bool, error) {
Expand Down Expand Up @@ -605,3 +628,145 @@ func violatesFkConstraintsHelper(
)
`, parentTableSchema, parentTableName, parentColumn, childValue))
}

func columnIsInDroppingIndex(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
return scanBool(tx, `
SELECT EXISTS(
SELECT index_id
FROM (
SELECT DISTINCT index_id
FROM crdb_internal.index_columns
WHERE descriptor_id = $1::REGCLASS AND column_name = $2
) AS indexes
JOIN crdb_internal.schema_changes AS sc ON sc.target_id
= indexes.index_id
AND table_id = $1::REGCLASS
AND type = 'INDEX'
AND direction = 'DROP'
);
`, tableName.String(), columnName)
}

// A pair of CTE definitions that expect the first argument to be a table name.
const descriptorsAndConstraintMutationsCTE = `descriptors AS (
SELECT crdb_internal.pb_to_json(
'cockroach.sql.sqlbase.Descriptor',
descriptor
)->'table' AS d
FROM system.descriptor
WHERE id = $1::REGCLASS
),
constraint_mutations AS (
SELECT mut
FROM (
SELECT json_array_elements(
d->'mutations'
) AS mut
FROM descriptors
)
WHERE (mut->'constraint') IS NOT NULL
)`

func constraintInDroppingState(
tx *pgx.Tx, tableName *tree.TableName, constraintName string,
) (bool, error) {
// TODO(ajwerner): Figure out how to plumb the column name into this query.
return scanBool(tx, `
WITH `+descriptorsAndConstraintMutationsCTE+`
SELECT true
IN (
SELECT (t.f).value @> json_set('{"validity": "Dropping"}', ARRAY['name'], to_json($2:::STRING))
FROM (
SELECT json_each(mut->'constraint') AS f
FROM constraint_mutations
) AS t
);
`, tableName.String(), constraintName)
}

func columnNotNullConstraintInMutation(
tx *pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
return scanBool(tx, `
WITH `+descriptorsAndConstraintMutationsCTE+`,
col AS (
SELECT (c->>'id')::INT8 AS id
FROM (
SELECT json_array_elements(d->'columns') AS c
FROM descriptors
)
WHERE c->>'name' = $2
)
SELECT EXISTS(
SELECT *
FROM constraint_mutations
JOIN col ON mut->'constraint'->>'constraintType' = 'NOT_NULL'
AND (mut->'constraint'->>'notNullColumn')::INT8 = id
);
`, tableName.String(), columnName)
}

func schemaContainsTypesWithCrossSchemaReferences(tx *pgx.Tx, schemaName string) (bool, error) {
return scanBool(tx, `
WITH database_id AS (
SELECT id
FROM system.namespace
WHERE "parentID" = 0
AND "parentSchemaID" = 0
AND name = current_database()
),
schema_id AS (
SELECT nsp.id
FROM system.namespace AS nsp
JOIN database_id ON "parentID" = database_id.id
AND "parentSchemaID" = 0
AND name = $1
),
descriptor_ids AS (
SELECT nsp.id
FROM system.namespace AS nsp,
schema_id,
database_id
WHERE nsp."parentID" = database_id.id
AND nsp."parentSchemaID" = schema_id.id
),
descriptors AS (
SELECT crdb_internal.pb_to_json(
'cockroach.sql.sqlbase.Descriptor',
descriptor
) AS descriptor
FROM system.descriptor AS descriptors
JOIN descriptor_ids ON descriptors.id
= descriptor_ids.id
),
types AS (
SELECT descriptor
FROM descriptors
WHERE (descriptor->'type') IS NOT NULL
),
table_references AS (
SELECT json_array_elements(
descriptor->'table'->'dependedOnBy'
) AS ref
FROM descriptors
WHERE (descriptor->'table') IS NOT NULL
),
dependent AS (
SELECT (ref->>'id')::INT8 AS id FROM table_references
),
referenced_descriptors AS (
SELECT json_array_elements_text(
descriptor->'type'->'referencingDescriptorIds'
)::INT8 AS id
FROM types
)
SELECT EXISTS(
SELECT *
FROM system.namespace
WHERE id IN (SELECT id FROM referenced_descriptors)
AND "parentSchemaID" NOT IN (SELECT id FROM schema_id)
AND id NOT IN (SELECT id FROM dependent)
);`, schemaName)
}
34 changes: 34 additions & 0 deletions pkg/workload/schemachange/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package schemachange_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)

os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
Loading