Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect and signal schema changes on vttablets #8005

Merged
merged 15 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions go/mysql/endtoend/schema_change_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endtoend

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
)

var ctx = context.Background()

const (
createDb = `create database if not exists _vt`
createUserTable = `create table vttest.product (id bigint(20) primary key, name char(10) CHARACTER SET utf8 COLLATE utf8_unicode_ci, created bigint(20))`
dropTestTable = `drop table if exists product`
)

func TestChangeSchemaIsNoticed(t *testing.T) {
conn, err := mysql.Connect(ctx, &connParams)
require.NoError(t, err)
defer conn.Close()

_, err = conn.ExecuteFetch(createDb, 1000, true)
require.NoError(t, err)
_, err = conn.ExecuteFetch(mysql.CreateSchemaCopyTable, 1000, true)
require.NoError(t, err)

tests := []struct {
name string
changeQ string
}{{
name: "add column",
changeQ: "alter table vttest.product add column phone VARCHAR(15)",
}, {
name: "rename column",
changeQ: "alter table vttest.product change name firstname char(10)",
}, {
name: "change column type",
changeQ: "alter table vttest.product change name name char(100)",
}, {
name: "remove column",
changeQ: "alter table vttest.product drop column name",
}, {
name: "remove last column",
changeQ: "alter table vttest.product drop column created",
}, {
name: "remove table",
changeQ: "drop table product",
}, {
name: "create table",
changeQ: `create table vttest.new_table (id bigint(20) primary key)`,
}, {
name: "change character set",
changeQ: "alter table vttest.product change name name char(10) CHARACTER SET utf8mb4",
}, {
name: "change collation",
changeQ: "alter table vttest.product change name name char(10) COLLATE utf8_unicode_520_ci",
}, {
name: "drop PK",
changeQ: "alter table vttest.product drop primary key",
}, {
name: "change PK",
changeQ: "alter table vttest.product drop primary key, add primary key (name)",
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// reset schemacopy
_, err := conn.ExecuteFetch(mysql.ClearSchemaCopy, 1000, true)
require.NoError(t, err)
_, err = conn.ExecuteFetch(dropTestTable, 1000, true)
require.NoError(t, err)
_, err = conn.ExecuteFetch(createUserTable, 1000, true)
require.NoError(t, err)
rs, err := conn.ExecuteFetch(mysql.InsertIntoSchemaCopy, 1000, true)
require.NoError(t, err)
require.NotZero(t, rs.RowsAffected)

// make sure no changes are detected
rs, err = conn.ExecuteFetch(mysql.DetectSchemaChange, 1000, true)
require.NoError(t, err)
require.Empty(t, rs.Rows)

// make the schema change
_, err = conn.ExecuteFetch(test.changeQ, 1000, true)
require.NoError(t, err)

// make sure the change is detected
rs, err = conn.ExecuteFetch(mysql.DetectSchemaChange, 1000, true)
require.NoError(t, err)
require.NotEmpty(t, rs.Rows)
})

}

}
66 changes: 66 additions & 0 deletions go/mysql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,74 @@ const (
BaseShowPrimary = "SELECT table_name, column_name FROM information_schema.key_column_usage WHERE table_schema=database() AND constraint_name='PRIMARY' ORDER BY table_name, ordinal_position"
// ShowRowsRead is the query used to find the number of rows read.
ShowRowsRead = "show status like 'Innodb_rows_read'"

// CreateVTDatabase creates the _vt database
CreateVTDatabase = `CREATE DATABASE IF NOT EXISTS _vt`

// CreateSchemaCopyTable query creates schemacopy table in _vt schema.
CreateSchemaCopyTable = `
CREATE TABLE if not exists _vt.schemacopy (
table_schema varchar(64) NOT NULL,
table_name varchar(64) NOT NULL,
column_name varchar(64) NOT NULL,
ordinal_position bigint(21) unsigned NOT NULL,
character_set_name varchar(32) DEFAULT NULL,
collation_name varchar(32) DEFAULT NULL,
column_type longtext NOT NULL,
column_key varchar(3) NOT NULL,
PRIMARY KEY (table_schema, table_name, ordinal_position))`

detectNewColumns = `
select ISC.table_name
from information_schema.columns as ISC
left join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
where ISC.table_schema = database() AND c.table_schema is null`
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved

detectChangeColumns = `
select ISC.table_name
from information_schema.columns as ISC
join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
where ISC.table_schema = database()
AND (not(c.column_name <=> ISC.column_name)
OR not(ISC.character_set_name <=> c.character_set_name)
OR not(ISC.collation_name <=> c.collation_name)
OR not(ISC.column_type <=> c.column_type)
OR not(ISC.column_key <=> c.column_key))`
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved

detectRemoveColumns = `
select c.table_name
from information_schema.columns as ISC
right join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
where c.table_schema = database() AND ISC.table_schema is null`

// DetectSchemaChange query detects if there is any schema change from previous copy.
DetectSchemaChange = detectChangeColumns + " UNION " + detectNewColumns + " UNION " + detectRemoveColumns

// ClearSchemaCopy query clears the schemacopy table.
ClearSchemaCopy = `delete from _vt.schemacopy`

// InsertIntoSchemaCopy query copies over the schema information from information_schema.columns table.
InsertIntoSchemaCopy = `insert _vt.schemacopy
select table_schema, table_name, column_name, ordinal_position, character_set_name, collation_name, column_type, column_key
from information_schema.columns
where table_schema = database()`
)

// VTDatabaseInit contains all the schema creation queries needed to
var VTDatabaseInit = []string{
CreateVTDatabase,
CreateSchemaCopyTable,
}

// BaseShowTablesFields contains the fields returned by a BaseShowTables or a BaseShowTablesForTable command.
// They are validated by the
// testBaseShowTables test.
Expand Down
Loading