From 56e5b178e54aa895ea4b4143326b7136713abf5f Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Sat, 20 Jul 2019 13:01:10 -0600 Subject: [PATCH 1/6] endtoend: factor out waitForMessage Signed-off-by: Derek Perkins --- go/vt/vttablet/endtoend/message_test.go | 61 ++++++++----------------- 1 file changed, 19 insertions(+), 42 deletions(-) diff --git a/go/vt/vttablet/endtoend/message_test.go b/go/vt/vttablet/endtoend/message_test.go index 79a86227671..07cb1c275bb 100644 --- a/go/vt/vttablet/endtoend/message_test.go +++ b/go/vt/vttablet/endtoend/message_test.go @@ -59,20 +59,7 @@ func TestMessage(t *testing.T) { } // Start goroutine to consume message stream. - go func() { - if err := client.MessageStream("vitess_message", func(qr *sqltypes.Result) error { - select { - case <-done: - return io.EOF - default: - } - ch <- qr - return nil - }); err != nil { - t.Error(err) - } - close(ch) - }() + go waitForMessage(t, client, "vitess_message", ch, done) got := <-ch want := &sqltypes.Result{ Fields: []*querypb.Field{{ @@ -248,20 +235,7 @@ func TestThreeColMessage(t *testing.T) { } defer client.Execute("drop table vitess_message3", nil) - go func() { - if err := client.MessageStream("vitess_message3", func(qr *sqltypes.Result) error { - select { - case <-done: - return io.EOF - default: - } - ch <- qr - return nil - }); err != nil { - t.Error(err) - } - close(ch) - }() + go waitForMessage(t, client, "vitess_message3", ch, done) // Verify fields. got := <-ch @@ -358,20 +332,7 @@ func TestMessageAuto(t *testing.T) { defer client.Execute("drop table vitess_message_auto", nil) // Start goroutine to consume message stream. - go func() { - if err := client.MessageStream("vitess_message_auto", func(qr *sqltypes.Result) error { - select { - case <-done: - return io.EOF - default: - } - ch <- qr - return nil - }); err != nil { - t.Error(err) - } - close(ch) - }() + go waitForMessage(t, client, "vitess_message_auto", ch, done) <-ch defer func() { close(done) }() @@ -461,3 +422,19 @@ func TestMessageAuto(t *testing.T) { t.Errorf("message received:\n%v, want\n%v", got, want) } } + + +func waitForMessage(t *testing.T, client *framework.QueryClient, tableName string, ch chan *sqltypes.Result, done chan struct{}) { + if err := client.MessageStream(tableName, func(qr *sqltypes.Result) error { + select { + case <-done: + return io.EOF + default: + } + ch <- qr + return nil + }); err != nil { + t.Error(err) + } + close(ch) +} From 181aaf3f071b65f373e9ffb5aa6e0b4ae5d948e6 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Thu, 25 Jul 2019 22:17:47 -0600 Subject: [PATCH 2/6] schema: add getString from comments Signed-off-by: Derek Perkins --- go/vt/vttablet/tabletserver/schema/load_table.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index 3257af030d0..c7dcf16aa19 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -239,3 +239,11 @@ func getNum(in map[string]string, key string) (int, error) { } return v, nil } + +func getString(in map[string]string, key string) (string, error) { + sv := in[key] + if sv == "" { + return "", fmt.Errorf("attribute %s not specified for message table", key) + } + return sv, nil +} From 0bf354546d962b361a6b8da2e8ef88e4ba627fe8 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Thu, 25 Jul 2019 22:19:37 -0600 Subject: [PATCH 3/6] messages: add topic abstraction like Google PubSub Signed-off-by: Derek Perkins --- go/vt/vttablet/endtoend/main_test.go | 21 ++ go/vt/vttablet/endtoend/message_test.go | 261 ++++++++++++++++++ .../vttablet/tabletserver/planbuilder/dml.go | 24 ++ .../vttablet/tabletserver/planbuilder/plan.go | 5 + .../planbuilder/testdata/exec_cases.txt | 45 +++ .../planbuilder/testdata/schema_test.json | 59 ++++ go/vt/vttablet/tabletserver/schema/engine.go | 87 +++++- .../tabletserver/schema/load_table.go | 6 + .../tabletserver/schema/load_table_test.go | 77 ++++++ go/vt/vttablet/tabletserver/schema/schema.go | 22 ++ go/vt/vttablet/tabletserver/tabletserver.go | 70 +++-- 11 files changed, 656 insertions(+), 21 deletions(-) diff --git a/go/vt/vttablet/endtoend/main_test.go b/go/vt/vttablet/endtoend/main_test.go index f341d8fba36..8bba8fc6db7 100644 --- a/go/vt/vttablet/endtoend/main_test.go +++ b/go/vt/vttablet/endtoend/main_test.go @@ -250,6 +250,27 @@ var tableACLConfig = `{ "writers": ["dev"], "admins": ["dev"] }, + { + "name": "test_topic", + "table_names_or_prefixes": ["test_topic"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_topic_subscriber_1", + "table_names_or_prefixes": ["vitess_topic_subscriber_1"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_topic_subscriber_2", + "table_names_or_prefixes": ["vitess_topic_subscriber_2"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, { "name": "vitess_acl_unmatched", "table_names_or_prefixes": ["vitess_acl_unmatched"], diff --git a/go/vt/vttablet/endtoend/message_test.go b/go/vt/vttablet/endtoend/message_test.go index 07cb1c275bb..6a908c3582d 100644 --- a/go/vt/vttablet/endtoend/message_test.go +++ b/go/vt/vttablet/endtoend/message_test.go @@ -423,6 +423,267 @@ func TestMessageAuto(t *testing.T) { } } +var createMessageTopic1 = `create table vitess_topic_subscriber_1( + time_scheduled bigint, + id bigint, + time_next bigint, + epoch bigint, + time_created bigint, + time_acked bigint, + message varchar(128), + primary key(time_scheduled, id), + unique index id_idx(id), + index next_idx(time_next, epoch)) +comment 'vitess_message,vt_topic=test_topic,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=1'` + +var createMessageTopic2 = `create table vitess_topic_subscriber_2( + time_scheduled bigint, + id bigint, + time_next bigint, + epoch bigint, + time_created bigint, + time_acked bigint, + message varchar(128), + primary key(time_scheduled, id), + unique index id_idx(id), + index next_idx(time_next, epoch)) +comment 'vitess_message,vt_topic=test_topic,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=1'` + +// TestMessageTopic tests for the case where id is an auto-inc column. +func TestMessageTopic(t *testing.T) { + ch1 := make(chan *sqltypes.Result) + ch2 := make(chan *sqltypes.Result) + done := make(chan struct{}) + client := framework.NewClient() + + // + // phase 1 tests inserts into a topic going to two subscribed message tables + // + if _, err := client.Execute(createMessageTopic1, nil); err != nil { + t.Fatal(err) + } + + if _, err := client.Execute(createMessageTopic2, nil); err != nil { + t.Fatal(err) + } + + // Start goroutines to consume message stream. + go waitForMessage(t, client, "vitess_topic_subscriber_1", ch1, done) + <-ch1 + go waitForMessage(t, client, "vitess_topic_subscriber_2", ch2, done) + <-ch2 + defer func() { close(done) }() + + // Create message. + err := client.Begin(false) + if err != nil { + t.Error(err) + return + } + // This insert should cause the engine to make a best-effort guess at generated ids. + // It will expedite the first two rows with null values, and the third row, and will + // give up on the last row, which should eventually be picked up by the poller. + _, err = client.Execute("insert into test_topic(id, message) values(1, 'msg1'), (2, 'msg2'), (3, 'msg3')", nil) + if err != nil { + t.Error(err) + return + } + + err = client.Commit() + if err != nil { + t.Error(err) + return + } + + // Only three messages should be queued on the first subscription table + if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_1.Queued"), 3; got != want { + t.Errorf("Messages/vitess_topic_subscriber_1.Queued: %d, want %d", got, want) + } + + // Only three messages should be queued on the second subscription table + if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_2.Queued"), 3; got != want { + t.Errorf("Messages/vitess_topic_subscriber_2.Queued: %d, want %d", got, want) + } + + wantResults := []*sqltypes.Result{{ + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(1), + sqltypes.NULL, + sqltypes.NewVarChar("msg1"), + }}, + }, { + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(2), + sqltypes.NULL, + sqltypes.NewVarChar("msg2"), + }}, + }, { + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(3), + sqltypes.NULL, + sqltypes.NewVarChar("msg3"), + }}, + }} + + // Consume first three messages + // and ensure they were received promptly. + start := time.Now() + for i := 0; i < 3; i++ { + // make sure the first message table received all three messages + got1 := <-ch1 + got1.Rows[0][1] = sqltypes.NULL + + // Results can come in any order. + found := false + for _, want := range wantResults { + if reflect.DeepEqual(got1, want) { + found = true + } + } + if !found { + t.Errorf("message fetch 1: %v not found in expected list: %v", got1, wantResults) + } + + // make sure the second message table received all three messages + got2 := <-ch2 + got2.Rows[0][1] = sqltypes.NULL + + // Results can come in any order. + found = false + for _, want := range wantResults { + if reflect.DeepEqual(got2, want) { + found = true + } + } + if !found { + t.Errorf("message fetch 2: %v not found in expected list: %v", got2, wantResults) + } + } + if d := time.Since(start); d > 1*time.Second { + t.Errorf("messages were delayed: %v", d) + } + + // ack the first subscriber + _, err = client.MessageAck("vitess_topic_subscriber_1", []string{"1, 2, 3"}) + if err != nil { + t.Error(err) + } + + // ack the second subscriber + _, err = client.MessageAck("vitess_topic_subscriber_2", []string{"1, 2, 3"}) + if err != nil { + t.Error(err) + } + + // + // phase 2 tests deleting one of the subscribers and making sure + // that inserts into a topic go to one subscribed message table + // + + client.Execute("drop table vitess_topic_subscriber_1", nil) + + // Create message. + err = client.Begin(false) + if err != nil { + t.Error(err) + return + } + // This insert should cause the engine to make a best-effort guess at generated ids. + // It will expedite the first two rows with null values, and the third row, and will + // give up on the last row, which should eventually be picked up by the poller. + _, err = client.Execute("insert into test_topic(id, message) values(4, 'msg4'), (5, 'msg5'), (6, 'msg6')", nil) + if err != nil { + t.Error(err) + return + } + + err = client.Commit() + if err != nil { + t.Error(err) + return + } + + // no messages should be queued on the first subscription table + if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_1.Queued"), 3; got != want { + t.Errorf("Messages/vitess_topic_subscriber_1.Queued: %d, want %d", got, want) + } + + // Only three messages should be queued on the second subscription table + if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_2.Queued"), 6; got != want { + t.Errorf("Messages/vitess_topic_subscriber_2.Queued: %d, want %d", got, want) + } + + wantResults = []*sqltypes.Result{{ + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(4), + sqltypes.NULL, + sqltypes.NewVarChar("msg4"), + }}, + }, { + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(5), + sqltypes.NULL, + sqltypes.NewVarChar("msg5"), + }}, + }, { + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(6), + sqltypes.NULL, + sqltypes.NewVarChar("msg6"), + }}, + }} + + // Consume first three messages + // and ensure they were received promptly. + start = time.Now() + for i := 0; i < 3; i++ { + // make sure the second message table received all three messages + got2 := <-ch2 + got2.Rows[0][1] = sqltypes.NULL + + // Results can come in any order. + found := false + for _, want := range wantResults { + if reflect.DeepEqual(got2, want) { + found = true + } + } + if !found { + t.Errorf("message fetch 2: %v not found in expected list: %v", got2, wantResults) + } + } + if d := time.Since(start); d > 1*time.Second { + t.Errorf("messages were delayed: %v", d) + } + + // ack the second subscriber + _, err = client.MessageAck("vitess_topic_subscriber_2", []string{"4, 5, 6"}) + if err != nil { + t.Error(err) + } + + // + // phase 3 tests deleting the last subscriber and making sure + // that inserts into a topic error out with table not found + // + + // remove the second subscriber which should remove the topic + if _, err := client.Execute("drop table vitess_topic_subscriber_2", nil); err != nil { + t.Fatal(err) + } + + // this should fail because the topic doesn't exist. Any other outcome fails the test + _, err = client.Execute("insert into test_topic(id, message) values(4, 'msg4'), (5, 'msg5'), (6, 'msg6')", nil) + switch { + case err == nil: + t.Error("test_topic shouldn't have existed for inserts to succeed") + + case err.Error() == "table test_topic not found in schema (CallerID: dev)": + + default: + t.Error(err) + } +} func waitForMessage(t *testing.T, client *framework.QueryClient, tableName string, ch chan *sqltypes.Result, done chan struct{}) { if err := client.MessageStream(tableName, func(qr *sqltypes.Result) error { diff --git a/go/vt/vttablet/tabletserver/planbuilder/dml.go b/go/vt/vttablet/tabletserver/planbuilder/dml.go index 5e67140fa72..bc3ab72f5ba 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/dml.go +++ b/go/vt/vttablet/tabletserver/planbuilder/dml.go @@ -50,6 +50,12 @@ func analyzeUpdate(upd *sqlparser.Update, tables map[string]*schema.Table) (plan } table, tableErr := plan.setTable(tableName, tables) + // Updates aren't supported on topics + if tableErr == nil && table.IsTopic() { + plan.Reason = ReasonTopic + return plan, nil + } + // In passthrough dml mode, allow the operation even if the // table is unknown in the schema. if PassthroughDMLs { @@ -118,6 +124,12 @@ func analyzeDelete(del *sqlparser.Delete, tables map[string]*schema.Table) (plan } table, tableErr := plan.setTable(tableName, tables) + // Deletes aren't supported on topics + if tableErr == nil && table.IsTopic() { + plan.Reason = ReasonTopic + return plan, nil + } + // In passthrough dml mode, allow the operation even if the // table is unknown in the schema. if PassthroughDMLs { @@ -202,6 +214,12 @@ func analyzeSelect(sel *sqlparser.Select, tables map[string]*schema.Table) (plan return nil, err } + // Selects aren't supported on topics + if table.IsTopic() { + plan.Reason = ReasonTopic + return plan, nil + } + if sel.Where != nil { comp, ok := sel.Where.Expr.(*sqlparser.ComparisonExpr) if ok && comp.IsImpossible() { @@ -323,6 +341,12 @@ func analyzeInsert(ins *sqlparser.Insert, tables map[string]*schema.Table) (plan } table, tableErr := plan.setTable(tableName, tables) + if tableErr == nil && table.IsTopic() { + plan.PlanID = PlanInsertTopic + plan.Reason = ReasonTopic + return plan, nil + } + switch { case tableErr == nil && table.Type == schema.Message: // message inserts need to continue being strict, even in passthrough dml mode, diff --git a/go/vt/vttablet/tabletserver/planbuilder/plan.go b/go/vt/vttablet/tabletserver/planbuilder/plan.go index 65aa41c71a1..b332c67dcf6 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/plan.go +++ b/go/vt/vttablet/tabletserver/planbuilder/plan.go @@ -69,6 +69,8 @@ const ( PlanInsertSubquery // PlanUpsertPK is for insert ... on duplicate key constructs. PlanUpsertPK + // PlanInsertTopic is for inserting into message topics. + PlanInsertTopic // PlanInsertMessage is for inserting into message tables. PlanInsertMessage // PlanSet is for SET statements. @@ -100,6 +102,7 @@ var planName = [NumPlans]string{ "INSERT_PK", "INSERT_SUBQUERY", "UPSERT_PK", + "INSERT_TOPIC", "INSERT_MESSAGE", "SET", "DDL", @@ -153,6 +156,7 @@ const ( ReasonUpsertMultiRow ReasonReplace ReasonMultiTable + ReasonTopic NumReasons ) @@ -167,6 +171,7 @@ var reasonName = [NumReasons]string{ "UPSERT_MULTI_ROW", "REPLACE", "MULTI_TABLE", + "TOPIC", } // String returns a string representation of a ReasonType. diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt b/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt index b23e26fa529..5468c2eea0d 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt @@ -958,6 +958,51 @@ options:PassthroughDMLs "PKValues": [[1, 3], [2, 4]] } +# topic insert with time_scheduled specified +"insert into test_topic(time_scheduled, id, message) values(1, 2, 'aa')" +{ + "PlanID": "INSERT_TOPIC", + "Reason": "TOPIC", + "TableName": "test_topic", + "Permissions": [ + { + "TableName": "test_topic", + "Role": 1 + } + ], + "FullQuery": "insert into test_topic(time_scheduled, id, message) values (1, 2, 'aa')" +} + +# topic update +"update test_topic set time_next = 1 where id = 1" +{ + "PlanID": "PASS_DML", + "Reason": "TOPIC", + "TableName": "test_topic", + "Permissions": [ + { + "TableName": "test_topic", + "Role": 1 + } + ], + "FullQuery": "update test_topic set time_next = 1 where id = 1" +} + +# topic delete +"delete from test_topic where id = 1" +{ + "PlanID": "PASS_DML", + "Reason": "TOPIC", + "TableName": "test_topic", + "Permissions": [ + { + "TableName": "test_topic", + "Role": 1 + } + ], + "FullQuery": "delete from test_topic where id = 1" +} + # message insert with time_scheduled specified "insert into msg(time_scheduled, id, message) values(1, 2, 'aa')" { diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json index 1a23e93eea4..6441dd36eea 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json @@ -355,6 +355,65 @@ ], "Type": 2 }, + { + "Name": "msg1_with_topic", + "Columns": [ + { + "Name": "time_scheduled" + }, + { + "Name": "id" + }, + { + "Name": "time_next" + }, + { + "Name": "epoch" + }, + { + "Name": "time_created" + }, + { + "Name": "time_acked" + }, + { + "Name": "message" + } + ], + "Indexes": [ + { + "Name": "PRIMARY", + "Unique": true, + "Columns": [ + "time_scheduled", + "id" + ], + "Cardinality": [ + 1 + ], + "DataColumns": [ + ] + } + ], + "PKColumns": [ + 0, + 1 + ], + "Type": 2, + "MessageInfo": { + "Topic": "test_topic" + } + }, + { + "Name": "test_topic", + "TopicInfo": { + "Subscribers": [ + { + "Name": "msg1_with_topic" + } + ] + } + }, { "Name": "dual", "Type": 0 diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 65ad2dc0961..38553c1eb81 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -180,6 +180,11 @@ func (se *Engine) Open() error { } se.tables = tables se.lastChange = curTime + + // register message topics on the engine if necessary + // must run after se.tables is set + se.registerTopics() + se.ticks.Start(func() { if err := se.Reload(ctx); err != nil { log.Errorf("periodic schema reload failed: %v", err) @@ -274,13 +279,24 @@ func (se *Engine) Reload(ctx context.Context) error { // Handle table drops var dropped []string - for tableName := range se.tables { + for tableName, table := range se.tables { if curTables[tableName] { continue } - delete(se.tables, tableName) - dropped = append(dropped, tableName) + + se.unregisterTopic(table) + + // only keep track of non-topic table drops + if !se.tables[tableName].IsTopic() { + dropped = append(dropped, tableName) + delete(se.tables, tableName) + } } + + // register message topics on the engine if necessary + // must run after se.tables is set + se.registerTopics() + // We only need to broadcast dropped tables because // tableWasCreatedOrAltered will broadcast the other changes. if len(dropped) > 0 { @@ -336,6 +352,7 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string tabletenv.InternalErrors.Add("Schema", 1) return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: failed to load table %s: %v", tableName, err) } + // table_rows, data_length, index_length, max_data_length table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8]) @@ -355,6 +372,70 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string return nil } +// registerTopics optionally connects the vt_topic metadata on a message table +// to a map of topic strings. A table can belong to only one topic. +func (se *Engine) registerTopics() { + for _, table := range se.tables { + se.registerTopic(table) + } +} + +func (se *Engine) registerTopic(ta *Table) { + if ta.MessageInfo == nil || ta.MessageInfo.Topic == "" { + return + } + + topicName := ta.MessageInfo.Topic + topicTable, ok := se.tables[topicName] + if !ok { + // initialize topic table if necessary + topicTable = NewTable(topicName) + topicTable.TopicInfo = &TopicInfo{ + Subscribers: make([]*Table, 0, 1), + } + se.tables[topicName] = topicTable + log.Infof("creating topic table '%s'", topicName) + } else { + // check to see if this table is already registered to the topic + // so we don't double register + for _, t := range topicTable.TopicInfo.Subscribers { + if t.Name == ta.Name { + return + } + } + } + + // append this table to the list of subscribed tables to the topic + log.Infof("subscribing message table '%s' to topic '%s'", ta.Name.String(), topicName) + topicTable.TopicInfo.Subscribers = append(topicTable.TopicInfo.Subscribers, ta) +} + +func (se *Engine) unregisterTopic(ta *Table) { + if ta.MessageInfo == nil || ta.MessageInfo.Topic == "" { + return + } + + topicName := ta.MessageInfo.Topic + topicTable, ok := se.tables[topicName] + if !ok { + panic("topic should have already been created") + } + // remove this table from the topic + for i, t := range topicTable.TopicInfo.Subscribers { + // remove the table from the topic + if t.Name == ta.Name { + log.Infof("unsubscribing message table '%s' from topic '%s'", ta.Name.String(), topicName) + topicTable.TopicInfo.Subscribers = append(topicTable.TopicInfo.Subscribers[:i], topicTable.TopicInfo.Subscribers[i+1:]...) + } + } + + // delete the topic table if there are no more subscribers + if len(topicTable.TopicInfo.Subscribers) == 0 { + log.Infof("deleting topic table '%s'", topicName) + delete(se.tables, topicName) + } +} + // RegisterNotifier registers the function for schema change notification. // It also causes an immediate notification to the caller. The notified // function must not change the map or its contents. The only exception diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index c7dcf16aa19..96741cfc27f 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -157,6 +157,12 @@ func loadMessageInfo(ta *Table, comment string) error { keyvals[kv[0]] = kv[1] } var err error + if ta.MessageInfo.Topic, err = getString(keyvals, "vt_topic"); err != nil { + // the topic is an optional value + if err.Error() != "attribute vt_topic not specified for message table" { + return err + } + } if ta.MessageInfo.AckWaitDuration, err = getDuration(keyvals, "vt_ack_wait"); err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/schema/load_table_test.go b/go/vt/vttablet/tabletserver/schema/load_table_test.go index e184c417149..7d1a583defb 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table_test.go +++ b/go/vt/vttablet/tabletserver/schema/load_table_test.go @@ -200,6 +200,83 @@ func TestLoadTableMessage(t *testing.T) { } } +func TestLoadTableMessageTopic(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + for query, result := range getMessageTableQueries() { + db.AddQuery(query, result) + } + table, err := newTestLoadTable("USER_TABLE", "vitess_message,vt_topic=test_topic,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30", db) + if err != nil { + t.Fatal(err) + } + want := &Table{ + Name: sqlparser.NewTableIdent("test_table"), + Type: Message, + MessageInfo: &MessageInfo{ + IDPKIndex: 1, + Fields: []*querypb.Field{{ + Name: "id", + Type: sqltypes.Int64, + }, { + Name: "time_scheduled", + Type: sqltypes.Int64, + }, { + Name: "message", + Type: sqltypes.VarBinary, + }}, + AckWaitDuration: 30 * time.Second, + PurgeAfterDuration: 120 * time.Second, + BatchSize: 1, + CacheSize: 10, + PollInterval: 30 * time.Second, + Topic: "test_topic", + }, + } + table.Columns = nil + table.Indexes = nil + table.PKColumns = nil + if !reflect.DeepEqual(table, want) { + t.Errorf("Table:\n%+v, want\n%+v", table, want) + t.Errorf("Table:\n%+v, want\n%+v", table.MessageInfo, want.MessageInfo) + } + + // Missing property + _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_topic=test_topic,vt_ack_wait=30", db) + wanterr := "not specified for message table" + if err == nil || !strings.Contains(err.Error(), wanterr) { + t.Errorf("newTestLoadTable: %v, want %s", err, wanterr) + } + + // id column must be part of primary key. + for query, result := range getMessageTableQueries() { + db.AddQuery(query, result) + } + db.AddQuery( + "show index from test_table", + &sqltypes.Result{ + Fields: mysql.ShowIndexFromTableFields, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + mysql.ShowIndexFromTableRow("test_table", true, "PRIMARY", 1, "time_scheduled", false), + }, + }) + _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_topic=test_topic,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30", db) + wanterr = "id column is not part of the primary key for message table: test_table" + if err == nil || err.Error() != wanterr { + t.Errorf("newTestLoadTable: %v, want %s", err, wanterr) + } + + for query, result := range getTestLoadTableQueries() { + db.AddQuery(query, result) + } + _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_topic=test_topic,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30", db) + wanterr = "missing from message table: test_table" + if err == nil || !strings.Contains(err.Error(), wanterr) { + t.Errorf("newTestLoadTable: %v, must contain %s", err, wanterr) + } +} + func TestLoadTableWithBitColumn(t *testing.T) { db := fakesqldb.New(t) defer db.Close() diff --git a/go/vt/vttablet/tabletserver/schema/schema.go b/go/vt/vttablet/tabletserver/schema/schema.go index 5e11e33ceab..6748545af4c 100644 --- a/go/vt/vttablet/tabletserver/schema/schema.go +++ b/go/vt/vttablet/tabletserver/schema/schema.go @@ -68,6 +68,9 @@ type Table struct { // MessageInfo contains info for message tables. MessageInfo *MessageInfo + // TopicInfo contains info for message topics. + TopicInfo *TopicInfo + // These vars can be accessed concurrently. TableRows sync2.AtomicInt64 DataLength sync2.AtomicInt64 @@ -87,6 +90,13 @@ type SequenceInfo struct { LastVal int64 } +// TopicInfo contains info specific to message topics. +type TopicInfo struct { + // Subscribers links to all the message tables + // subscribed to this topic + Subscribers []*Table +} + // MessageInfo contains info specific to message tables. type MessageInfo struct { // IDPKIndex is the index of the ID column @@ -99,6 +109,10 @@ type MessageInfo struct { // returned for subscribers. Fields []*querypb.Field + // Optional topic to subscribe to. Any messages + // published to the topic will be added to this table. + Topic string + // AckWaitDuration specifies how long to wait after // the message was first sent. The back-off doubles // every attempt. @@ -201,6 +215,14 @@ func (ta *Table) HasPrimary() bool { return len(ta.Indexes) != 0 && ta.Indexes[0].Name.EqualString("primary") } +// IsTopic returns true if TopicInfo is not nil. +func (ta *Table) IsTopic() bool { + if ta.TopicInfo == nil { + return false + } + return true +} + // UniqueIndexes returns the number of unique indexes on the table func (ta *Table) UniqueIndexes() int { unique := 0 diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 4c62eca4b10..b428ec7dff9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1010,30 +1010,64 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq if err != nil { return err } - qre := &QueryExecutor{ - query: query, - marginComments: comments, - bindVars: bindVariables, - transactionID: transactionID, - options: options, - plan: plan, - ctx: ctx, - logStats: logStats, - tsv: tsv, - } - extras := tsv.watcher.ComputeExtras(options) - result, err = qre.Execute() - if err != nil { - return err + if plan.Reason == planbuilder.ReasonTopic { + result, err = tsv.topicExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats) + } else { + result, err = tsv.qreExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats) } - result.Extras = extras - result = result.StripMetadata(sqltypes.IncludeFieldsOrDefault(options)) - return nil + + return err }, ) return result, err } +func (tsv *TabletServer) topicExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats) (result *sqltypes.Result, err error) { + if plan.PlanID != planbuilder.PlanInsertTopic { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Only inserts allowed on topics") + } + + // choose InsertMessage as the new PlanID + plan.PlanID = planbuilder.PlanInsertMessage + for _, subscriber := range plan.Table.TopicInfo.Subscribers { + // replace the topic name with the subscribed message table name + newQuery := strings.Replace(query, plan.Table.Name.String(), subscriber.Name.String(), -1) + var newPlan *TabletPlan + newPlan, err = tsv.qe.GetPlan(ctx, logStats, newQuery, skipQueryPlanCache(options)) + if err != nil { + return nil, err + } + + // because there isn't an option to return multiple results, only the last + // message table result is returned + result, err = tsv.qreExecute(ctx, newQuery, comments, bindVariables, transactionID, options, newPlan, logStats) + } + return result, err +} + +func (tsv *TabletServer) qreExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats) (result *sqltypes.Result, err error) { + qre := &QueryExecutor{ + query: query, + marginComments: comments, + bindVars: bindVariables, + transactionID: transactionID, + options: options, + plan: plan, + ctx: ctx, + logStats: logStats, + tsv: tsv, + } + extras := tsv.watcher.ComputeExtras(options) + result, err = qre.Execute() + if err != nil { + return nil, err + } + result.Extras = extras + result = result.StripMetadata(sqltypes.IncludeFieldsOrDefault(options)) + + return result, nil +} + // StreamExecute executes the query and streams the result. // The first QueryResult will have Fields set (and Rows nil). // The subsequent QueryResult will have Rows set (and Fields nil). From f6a162adbcbdb5d30bce27cb35fe7df87cf7f11a Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Sun, 28 Jul 2019 16:42:12 -0700 Subject: [PATCH 4/6] messages: resolve topic suggestions Signed-off-by: Derek Perkins --- .../vttablet/tabletserver/planbuilder/dml.go | 20 ++++++-------- .../planbuilder/testdata/exec_cases.txt | 26 ++----------------- .../tabletserver/schema/load_table.go | 16 +++--------- go/vt/vttablet/tabletserver/tabletserver.go | 8 +----- 4 files changed, 15 insertions(+), 55 deletions(-) diff --git a/go/vt/vttablet/tabletserver/planbuilder/dml.go b/go/vt/vttablet/tabletserver/planbuilder/dml.go index bc3ab72f5ba..3b5f35c155c 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/dml.go +++ b/go/vt/vttablet/tabletserver/planbuilder/dml.go @@ -52,8 +52,7 @@ func analyzeUpdate(upd *sqlparser.Update, tables map[string]*schema.Table) (plan // Updates aren't supported on topics if tableErr == nil && table.IsTopic() { - plan.Reason = ReasonTopic - return plan, nil + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "updates not allowed on topics") } // In passthrough dml mode, allow the operation even if the @@ -126,8 +125,7 @@ func analyzeDelete(del *sqlparser.Delete, tables map[string]*schema.Table) (plan // Deletes aren't supported on topics if tableErr == nil && table.IsTopic() { - plan.Reason = ReasonTopic - return plan, nil + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "deletes not allowed on topics") } // In passthrough dml mode, allow the operation even if the @@ -216,8 +214,7 @@ func analyzeSelect(sel *sqlparser.Select, tables map[string]*schema.Table) (plan // Selects aren't supported on topics if table.IsTopic() { - plan.Reason = ReasonTopic - return plan, nil + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "selects not allowed on topics") } if sel.Where != nil { @@ -341,17 +338,16 @@ func analyzeInsert(ins *sqlparser.Insert, tables map[string]*schema.Table) (plan } table, tableErr := plan.setTable(tableName, tables) - if tableErr == nil && table.IsTopic() { - plan.PlanID = PlanInsertTopic - plan.Reason = ReasonTopic - return plan, nil - } - switch { case tableErr == nil && table.Type == schema.Message: // message inserts need to continue being strict, even in passthrough dml mode, // because field defaults are set here + case tableErr == nil && table.IsTopic(): + plan.PlanID = PlanInsertTopic + plan.Reason = ReasonTopic + return plan, nil + case PassthroughDMLs: // In passthrough dml mode, allow the operation even if the // table is unknown in the schema. diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt b/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt index 5468c2eea0d..0090a3ebcdd 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt @@ -975,33 +975,11 @@ options:PassthroughDMLs # topic update "update test_topic set time_next = 1 where id = 1" -{ - "PlanID": "PASS_DML", - "Reason": "TOPIC", - "TableName": "test_topic", - "Permissions": [ - { - "TableName": "test_topic", - "Role": 1 - } - ], - "FullQuery": "update test_topic set time_next = 1 where id = 1" -} +"updates not allowed on topics" # topic delete "delete from test_topic where id = 1" -{ - "PlanID": "PASS_DML", - "Reason": "TOPIC", - "TableName": "test_topic", - "Permissions": [ - { - "TableName": "test_topic", - "Role": 1 - } - ], - "FullQuery": "delete from test_topic where id = 1" -} +"deletes not allowed on topics" # message insert with time_scheduled specified "insert into msg(time_scheduled, id, message) values(1, 2, 'aa')" diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index 96741cfc27f..af57966d6ab 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -157,12 +157,8 @@ func loadMessageInfo(ta *Table, comment string) error { keyvals[kv[0]] = kv[1] } var err error - if ta.MessageInfo.Topic, err = getString(keyvals, "vt_topic"); err != nil { - // the topic is an optional value - if err.Error() != "attribute vt_topic not specified for message table" { - return err - } - } + ta.MessageInfo.Topic = getTopic(keyvals) + if ta.MessageInfo.AckWaitDuration, err = getDuration(keyvals, "vt_ack_wait"); err != nil { return err } @@ -246,10 +242,6 @@ func getNum(in map[string]string, key string) (int, error) { return v, nil } -func getString(in map[string]string, key string) (string, error) { - sv := in[key] - if sv == "" { - return "", fmt.Errorf("attribute %s not specified for message table", key) - } - return sv, nil +func getTopic(in map[string]string) string { + return in["vt_topic"] } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index b428ec7dff9..b3d4566d8cb 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1010,7 +1010,7 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq if err != nil { return err } - if plan.Reason == planbuilder.ReasonTopic { + if plan.PlanID == planbuilder.PlanInsertTopic { result, err = tsv.topicExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats) } else { result, err = tsv.qreExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats) @@ -1023,12 +1023,6 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq } func (tsv *TabletServer) topicExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats) (result *sqltypes.Result, err error) { - if plan.PlanID != planbuilder.PlanInsertTopic { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Only inserts allowed on topics") - } - - // choose InsertMessage as the new PlanID - plan.PlanID = planbuilder.PlanInsertMessage for _, subscriber := range plan.Table.TopicInfo.Subscribers { // replace the topic name with the subscribed message table name newQuery := strings.Replace(query, plan.Table.Name.String(), subscriber.Name.String(), -1) From c84cab21308f9cb6b8505e23283ff39fe8ad389f Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Wed, 31 Jul 2019 19:46:42 -0700 Subject: [PATCH 5/6] messages: rebuild topics on every table Reload Signed-off-by: Derek Perkins --- go/vt/vttablet/tabletserver/schema/engine.go | 38 +++++--------------- 1 file changed, 9 insertions(+), 29 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 38553c1eb81..dc06d429f9f 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -279,13 +279,11 @@ func (se *Engine) Reload(ctx context.Context) error { // Handle table drops var dropped []string - for tableName, table := range se.tables { + for tableName := range se.tables { if curTables[tableName] { continue } - se.unregisterTopic(table) - // only keep track of non-topic table drops if !se.tables[tableName].IsTopic() { dropped = append(dropped, tableName) @@ -375,6 +373,14 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string // registerTopics optionally connects the vt_topic metadata on a message table // to a map of topic strings. A table can belong to only one topic. func (se *Engine) registerTopics() { + // first drop all topics + for tableName, table := range se.tables { + if table.IsTopic() { + delete(se.tables, tableName) + } + } + + // then register all the topics from scratch for _, table := range se.tables { se.registerTopic(table) } @@ -410,32 +416,6 @@ func (se *Engine) registerTopic(ta *Table) { topicTable.TopicInfo.Subscribers = append(topicTable.TopicInfo.Subscribers, ta) } -func (se *Engine) unregisterTopic(ta *Table) { - if ta.MessageInfo == nil || ta.MessageInfo.Topic == "" { - return - } - - topicName := ta.MessageInfo.Topic - topicTable, ok := se.tables[topicName] - if !ok { - panic("topic should have already been created") - } - // remove this table from the topic - for i, t := range topicTable.TopicInfo.Subscribers { - // remove the table from the topic - if t.Name == ta.Name { - log.Infof("unsubscribing message table '%s' from topic '%s'", ta.Name.String(), topicName) - topicTable.TopicInfo.Subscribers = append(topicTable.TopicInfo.Subscribers[:i], topicTable.TopicInfo.Subscribers[i+1:]...) - } - } - - // delete the topic table if there are no more subscribers - if len(topicTable.TopicInfo.Subscribers) == 0 { - log.Infof("deleting topic table '%s'", topicName) - delete(se.tables, topicName) - } -} - // RegisterNotifier registers the function for schema change notification. // It also causes an immediate notification to the caller. The notified // function must not change the map or its contents. The only exception From 24d3d1be8fec36646868335c0e33bfd39d2c3851 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Wed, 14 Aug 2019 21:47:21 -0600 Subject: [PATCH 6/6] schema: delay broadcast until end of Reload Signed-off-by: Derek Perkins --- go/vt/vttablet/tabletserver/schema/engine.go | 36 +++++++++---------- .../tabletserver/schema/engine_test.go | 27 +++----------- 2 files changed, 23 insertions(+), 40 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index dc06d429f9f..2afa16c8f53 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -262,6 +262,7 @@ func (se *Engine) Reload(ctx context.Context) error { // The following section requires us to hold mu. rec := concurrency.AllErrorRecorder{} curTables := map[string]bool{"dual": true} + var created, altered []string for _, row := range tableData.Rows { tableName := row[0].ToString() curTables[tableName] = true @@ -269,7 +270,13 @@ func (se *Engine) Reload(ctx context.Context) error { // Check if we know about the table or it has been recreated. if _, ok := se.tables[tableName]; !ok || createTime >= se.lastChange { log.Infof("Reloading schema for table: %s", tableName) - rec.RecordError(se.tableWasCreatedOrAltered(ctx, tableName)) + wasCreated, err := se.tableWasCreatedOrAltered(ctx, tableName) + rec.RecordError(err) + if wasCreated { + created = append(created, tableName) + } else { + altered = append(altered, tableName) + } } else { // Only update table_rows, data_length, index_length, max_data_length se.tables[tableName].SetMysqlStats(row[4], row[5], row[6], row[7], row[8]) @@ -295,11 +302,7 @@ func (se *Engine) Reload(ctx context.Context) error { // must run after se.tables is set se.registerTopics() - // We only need to broadcast dropped tables because - // tableWasCreatedOrAltered will broadcast the other changes. - if len(dropped) > 0 { - se.broadcast(nil, nil, dropped) - } + se.broadcast(created, altered, dropped) return rec.Error() } @@ -320,24 +323,24 @@ func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.DBConn) (int64, // tableWasCreatedOrAltered must be called if a DDL was applied to that table. // the se.mu mutex _must_ be locked before entering this method -func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string) error { +func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string) (bool, error) { if !se.isOpen { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "DDL called on closed schema") + return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "DDL called on closed schema") } conn, err := se.conns.Get(ctx) if err != nil { - return err + return false, err } defer conn.Recycle() tableData, err := conn.Exec(ctx, mysql.BaseShowTablesForTable(tableName), 1, false) if err != nil { tabletenv.InternalErrors.Add("Schema", 1) - return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: information_schema query failed for table %s: %v", tableName, err) + return false, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: information_schema query failed for table %s: %v", tableName, err) } if len(tableData.Rows) != 1 { // This can happen if DDLs race with each other. - return nil + return false, nil } row := tableData.Rows[0] table, err := LoadTable( @@ -348,26 +351,23 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string ) if err != nil { tabletenv.InternalErrors.Add("Schema", 1) - return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: failed to load table %s: %v", tableName, err) + return false, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: failed to load table %s: %v", tableName, err) } // table_rows, data_length, index_length, max_data_length table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8]) - var created, altered []string + wasCreated := true if _, ok := se.tables[tableName]; ok { // If the table already exists, we overwrite it with the latest info. // This also means that the query cache needs to be cleared. // Otherwise, the query plans may not be in sync with the schema. log.Infof("Updating table %s", tableName) - altered = append(altered, tableName) - } else { - created = append(created, tableName) + wasCreated = false } se.tables[tableName] = table log.Infof("Initialized table: %s, type: %s", tableName, TypeNames[table.Type]) - se.broadcast(created, altered, nil) - return nil + return wasCreated, nil } // registerTopics optionally connects the vt_topic metadata on a message table diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 09ab7089379..dcf3c4d8fc8 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -21,7 +21,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "reflect" "strings" "testing" "time" @@ -267,29 +266,13 @@ func TestCreateOrUpdateTable(t *testing.T) { mysql.BaseShowTablesRow(existingTable, false, ""), }, }) - i := 0 - se.RegisterNotifier("test", func(schema map[string]*Table, created, altered, dropped []string) { - switch i { - case 0: - if len(created) != 5 { - t.Errorf("callback 0: %v, want len of 5\n", created) - } - case 1: - want := []string{"test_table_01"} - if !reflect.DeepEqual(altered, want) { - t.Errorf("callback 0: %v, want %v\n", created, want) - } - default: - t.Fatal("unexpected") - } - i++ - }) - defer se.UnregisterNotifier("test") - if err := se.tableWasCreatedOrAltered(context.Background(), "test_table_01"); err != nil { + + wasCreated, err := se.tableWasCreatedOrAltered(context.Background(), existingTable) + if err != nil { t.Fatal(err) } - if i < 2 { - t.Error("Notifier did not get called") + if wasCreated { + t.Error("wanted wasCreated == false") } }