diff --git a/go/vt/vttablet/endtoend/main_test.go b/go/vt/vttablet/endtoend/main_test.go index f341d8fba36..8bba8fc6db7 100644 --- a/go/vt/vttablet/endtoend/main_test.go +++ b/go/vt/vttablet/endtoend/main_test.go @@ -250,6 +250,27 @@ var tableACLConfig = `{ "writers": ["dev"], "admins": ["dev"] }, + { + "name": "test_topic", + "table_names_or_prefixes": ["test_topic"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_topic_subscriber_1", + "table_names_or_prefixes": ["vitess_topic_subscriber_1"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, + { + "name": "vitess_topic_subscriber_2", + "table_names_or_prefixes": ["vitess_topic_subscriber_2"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] + }, { "name": "vitess_acl_unmatched", "table_names_or_prefixes": ["vitess_acl_unmatched"], diff --git a/go/vt/vttablet/endtoend/message_test.go b/go/vt/vttablet/endtoend/message_test.go index 79a86227671..6a908c3582d 100644 --- a/go/vt/vttablet/endtoend/message_test.go +++ b/go/vt/vttablet/endtoend/message_test.go @@ -59,20 +59,7 @@ func TestMessage(t *testing.T) { } // Start goroutine to consume message stream. - go func() { - if err := client.MessageStream("vitess_message", func(qr *sqltypes.Result) error { - select { - case <-done: - return io.EOF - default: - } - ch <- qr - return nil - }); err != nil { - t.Error(err) - } - close(ch) - }() + go waitForMessage(t, client, "vitess_message", ch, done) got := <-ch want := &sqltypes.Result{ Fields: []*querypb.Field{{ @@ -248,20 +235,7 @@ func TestThreeColMessage(t *testing.T) { } defer client.Execute("drop table vitess_message3", nil) - go func() { - if err := client.MessageStream("vitess_message3", func(qr *sqltypes.Result) error { - select { - case <-done: - return io.EOF - default: - } - ch <- qr - return nil - }); err != nil { - t.Error(err) - } - close(ch) - }() + go waitForMessage(t, client, "vitess_message3", ch, done) // Verify fields. got := <-ch @@ -358,20 +332,7 @@ func TestMessageAuto(t *testing.T) { defer client.Execute("drop table vitess_message_auto", nil) // Start goroutine to consume message stream. - go func() { - if err := client.MessageStream("vitess_message_auto", func(qr *sqltypes.Result) error { - select { - case <-done: - return io.EOF - default: - } - ch <- qr - return nil - }); err != nil { - t.Error(err) - } - close(ch) - }() + go waitForMessage(t, client, "vitess_message_auto", ch, done) <-ch defer func() { close(done) }() @@ -461,3 +422,280 @@ func TestMessageAuto(t *testing.T) { t.Errorf("message received:\n%v, want\n%v", got, want) } } + +var createMessageTopic1 = `create table vitess_topic_subscriber_1( + time_scheduled bigint, + id bigint, + time_next bigint, + epoch bigint, + time_created bigint, + time_acked bigint, + message varchar(128), + primary key(time_scheduled, id), + unique index id_idx(id), + index next_idx(time_next, epoch)) +comment 'vitess_message,vt_topic=test_topic,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=1'` + +var createMessageTopic2 = `create table vitess_topic_subscriber_2( + time_scheduled bigint, + id bigint, + time_next bigint, + epoch bigint, + time_created bigint, + time_acked bigint, + message varchar(128), + primary key(time_scheduled, id), + unique index id_idx(id), + index next_idx(time_next, epoch)) +comment 'vitess_message,vt_topic=test_topic,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=1'` + +// TestMessageTopic tests for the case where id is an auto-inc column. +func TestMessageTopic(t *testing.T) { + ch1 := make(chan *sqltypes.Result) + ch2 := make(chan *sqltypes.Result) + done := make(chan struct{}) + client := framework.NewClient() + + // + // phase 1 tests inserts into a topic going to two subscribed message tables + // + if _, err := client.Execute(createMessageTopic1, nil); err != nil { + t.Fatal(err) + } + + if _, err := client.Execute(createMessageTopic2, nil); err != nil { + t.Fatal(err) + } + + // Start goroutines to consume message stream. + go waitForMessage(t, client, "vitess_topic_subscriber_1", ch1, done) + <-ch1 + go waitForMessage(t, client, "vitess_topic_subscriber_2", ch2, done) + <-ch2 + defer func() { close(done) }() + + // Create message. + err := client.Begin(false) + if err != nil { + t.Error(err) + return + } + // This insert should cause the engine to make a best-effort guess at generated ids. + // It will expedite the first two rows with null values, and the third row, and will + // give up on the last row, which should eventually be picked up by the poller. + _, err = client.Execute("insert into test_topic(id, message) values(1, 'msg1'), (2, 'msg2'), (3, 'msg3')", nil) + if err != nil { + t.Error(err) + return + } + + err = client.Commit() + if err != nil { + t.Error(err) + return + } + + // Only three messages should be queued on the first subscription table + if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_1.Queued"), 3; got != want { + t.Errorf("Messages/vitess_topic_subscriber_1.Queued: %d, want %d", got, want) + } + + // Only three messages should be queued on the second subscription table + if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_2.Queued"), 3; got != want { + t.Errorf("Messages/vitess_topic_subscriber_2.Queued: %d, want %d", got, want) + } + + wantResults := []*sqltypes.Result{{ + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(1), + sqltypes.NULL, + sqltypes.NewVarChar("msg1"), + }}, + }, { + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(2), + sqltypes.NULL, + sqltypes.NewVarChar("msg2"), + }}, + }, { + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(3), + sqltypes.NULL, + sqltypes.NewVarChar("msg3"), + }}, + }} + + // Consume first three messages + // and ensure they were received promptly. + start := time.Now() + for i := 0; i < 3; i++ { + // make sure the first message table received all three messages + got1 := <-ch1 + got1.Rows[0][1] = sqltypes.NULL + + // Results can come in any order. + found := false + for _, want := range wantResults { + if reflect.DeepEqual(got1, want) { + found = true + } + } + if !found { + t.Errorf("message fetch 1: %v not found in expected list: %v", got1, wantResults) + } + + // make sure the second message table received all three messages + got2 := <-ch2 + got2.Rows[0][1] = sqltypes.NULL + + // Results can come in any order. + found = false + for _, want := range wantResults { + if reflect.DeepEqual(got2, want) { + found = true + } + } + if !found { + t.Errorf("message fetch 2: %v not found in expected list: %v", got2, wantResults) + } + } + if d := time.Since(start); d > 1*time.Second { + t.Errorf("messages were delayed: %v", d) + } + + // ack the first subscriber + _, err = client.MessageAck("vitess_topic_subscriber_1", []string{"1, 2, 3"}) + if err != nil { + t.Error(err) + } + + // ack the second subscriber + _, err = client.MessageAck("vitess_topic_subscriber_2", []string{"1, 2, 3"}) + if err != nil { + t.Error(err) + } + + // + // phase 2 tests deleting one of the subscribers and making sure + // that inserts into a topic go to one subscribed message table + // + + client.Execute("drop table vitess_topic_subscriber_1", nil) + + // Create message. + err = client.Begin(false) + if err != nil { + t.Error(err) + return + } + // This insert should cause the engine to make a best-effort guess at generated ids. + // It will expedite the first two rows with null values, and the third row, and will + // give up on the last row, which should eventually be picked up by the poller. + _, err = client.Execute("insert into test_topic(id, message) values(4, 'msg4'), (5, 'msg5'), (6, 'msg6')", nil) + if err != nil { + t.Error(err) + return + } + + err = client.Commit() + if err != nil { + t.Error(err) + return + } + + // no messages should be queued on the first subscription table + if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_1.Queued"), 3; got != want { + t.Errorf("Messages/vitess_topic_subscriber_1.Queued: %d, want %d", got, want) + } + + // Only three messages should be queued on the second subscription table + if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_2.Queued"), 6; got != want { + t.Errorf("Messages/vitess_topic_subscriber_2.Queued: %d, want %d", got, want) + } + + wantResults = []*sqltypes.Result{{ + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(4), + sqltypes.NULL, + sqltypes.NewVarChar("msg4"), + }}, + }, { + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(5), + sqltypes.NULL, + sqltypes.NewVarChar("msg5"), + }}, + }, { + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(6), + sqltypes.NULL, + sqltypes.NewVarChar("msg6"), + }}, + }} + + // Consume first three messages + // and ensure they were received promptly. + start = time.Now() + for i := 0; i < 3; i++ { + // make sure the second message table received all three messages + got2 := <-ch2 + got2.Rows[0][1] = sqltypes.NULL + + // Results can come in any order. + found := false + for _, want := range wantResults { + if reflect.DeepEqual(got2, want) { + found = true + } + } + if !found { + t.Errorf("message fetch 2: %v not found in expected list: %v", got2, wantResults) + } + } + if d := time.Since(start); d > 1*time.Second { + t.Errorf("messages were delayed: %v", d) + } + + // ack the second subscriber + _, err = client.MessageAck("vitess_topic_subscriber_2", []string{"4, 5, 6"}) + if err != nil { + t.Error(err) + } + + // + // phase 3 tests deleting the last subscriber and making sure + // that inserts into a topic error out with table not found + // + + // remove the second subscriber which should remove the topic + if _, err := client.Execute("drop table vitess_topic_subscriber_2", nil); err != nil { + t.Fatal(err) + } + + // this should fail because the topic doesn't exist. Any other outcome fails the test + _, err = client.Execute("insert into test_topic(id, message) values(4, 'msg4'), (5, 'msg5'), (6, 'msg6')", nil) + switch { + case err == nil: + t.Error("test_topic shouldn't have existed for inserts to succeed") + + case err.Error() == "table test_topic not found in schema (CallerID: dev)": + + default: + t.Error(err) + } +} + +func waitForMessage(t *testing.T, client *framework.QueryClient, tableName string, ch chan *sqltypes.Result, done chan struct{}) { + if err := client.MessageStream(tableName, func(qr *sqltypes.Result) error { + select { + case <-done: + return io.EOF + default: + } + ch <- qr + return nil + }); err != nil { + t.Error(err) + } + close(ch) +} diff --git a/go/vt/vttablet/tabletserver/planbuilder/dml.go b/go/vt/vttablet/tabletserver/planbuilder/dml.go index 5e67140fa72..3b5f35c155c 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/dml.go +++ b/go/vt/vttablet/tabletserver/planbuilder/dml.go @@ -50,6 +50,11 @@ func analyzeUpdate(upd *sqlparser.Update, tables map[string]*schema.Table) (plan } table, tableErr := plan.setTable(tableName, tables) + // Updates aren't supported on topics + if tableErr == nil && table.IsTopic() { + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "updates not allowed on topics") + } + // In passthrough dml mode, allow the operation even if the // table is unknown in the schema. if PassthroughDMLs { @@ -118,6 +123,11 @@ func analyzeDelete(del *sqlparser.Delete, tables map[string]*schema.Table) (plan } table, tableErr := plan.setTable(tableName, tables) + // Deletes aren't supported on topics + if tableErr == nil && table.IsTopic() { + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "deletes not allowed on topics") + } + // In passthrough dml mode, allow the operation even if the // table is unknown in the schema. if PassthroughDMLs { @@ -202,6 +212,11 @@ func analyzeSelect(sel *sqlparser.Select, tables map[string]*schema.Table) (plan return nil, err } + // Selects aren't supported on topics + if table.IsTopic() { + return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "selects not allowed on topics") + } + if sel.Where != nil { comp, ok := sel.Where.Expr.(*sqlparser.ComparisonExpr) if ok && comp.IsImpossible() { @@ -328,6 +343,11 @@ func analyzeInsert(ins *sqlparser.Insert, tables map[string]*schema.Table) (plan // message inserts need to continue being strict, even in passthrough dml mode, // because field defaults are set here + case tableErr == nil && table.IsTopic(): + plan.PlanID = PlanInsertTopic + plan.Reason = ReasonTopic + return plan, nil + case PassthroughDMLs: // In passthrough dml mode, allow the operation even if the // table is unknown in the schema. diff --git a/go/vt/vttablet/tabletserver/planbuilder/plan.go b/go/vt/vttablet/tabletserver/planbuilder/plan.go index 65aa41c71a1..b332c67dcf6 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/plan.go +++ b/go/vt/vttablet/tabletserver/planbuilder/plan.go @@ -69,6 +69,8 @@ const ( PlanInsertSubquery // PlanUpsertPK is for insert ... on duplicate key constructs. PlanUpsertPK + // PlanInsertTopic is for inserting into message topics. + PlanInsertTopic // PlanInsertMessage is for inserting into message tables. PlanInsertMessage // PlanSet is for SET statements. @@ -100,6 +102,7 @@ var planName = [NumPlans]string{ "INSERT_PK", "INSERT_SUBQUERY", "UPSERT_PK", + "INSERT_TOPIC", "INSERT_MESSAGE", "SET", "DDL", @@ -153,6 +156,7 @@ const ( ReasonUpsertMultiRow ReasonReplace ReasonMultiTable + ReasonTopic NumReasons ) @@ -167,6 +171,7 @@ var reasonName = [NumReasons]string{ "UPSERT_MULTI_ROW", "REPLACE", "MULTI_TABLE", + "TOPIC", } // String returns a string representation of a ReasonType. diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt b/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt index b23e26fa529..0090a3ebcdd 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt @@ -958,6 +958,29 @@ options:PassthroughDMLs "PKValues": [[1, 3], [2, 4]] } +# topic insert with time_scheduled specified +"insert into test_topic(time_scheduled, id, message) values(1, 2, 'aa')" +{ + "PlanID": "INSERT_TOPIC", + "Reason": "TOPIC", + "TableName": "test_topic", + "Permissions": [ + { + "TableName": "test_topic", + "Role": 1 + } + ], + "FullQuery": "insert into test_topic(time_scheduled, id, message) values (1, 2, 'aa')" +} + +# topic update +"update test_topic set time_next = 1 where id = 1" +"updates not allowed on topics" + +# topic delete +"delete from test_topic where id = 1" +"deletes not allowed on topics" + # message insert with time_scheduled specified "insert into msg(time_scheduled, id, message) values(1, 2, 'aa')" { diff --git a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json index 1a23e93eea4..6441dd36eea 100644 --- a/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json +++ b/go/vt/vttablet/tabletserver/planbuilder/testdata/schema_test.json @@ -355,6 +355,65 @@ ], "Type": 2 }, + { + "Name": "msg1_with_topic", + "Columns": [ + { + "Name": "time_scheduled" + }, + { + "Name": "id" + }, + { + "Name": "time_next" + }, + { + "Name": "epoch" + }, + { + "Name": "time_created" + }, + { + "Name": "time_acked" + }, + { + "Name": "message" + } + ], + "Indexes": [ + { + "Name": "PRIMARY", + "Unique": true, + "Columns": [ + "time_scheduled", + "id" + ], + "Cardinality": [ + 1 + ], + "DataColumns": [ + ] + } + ], + "PKColumns": [ + 0, + 1 + ], + "Type": 2, + "MessageInfo": { + "Topic": "test_topic" + } + }, + { + "Name": "test_topic", + "TopicInfo": { + "Subscribers": [ + { + "Name": "msg1_with_topic" + } + ] + } + }, { "Name": "dual", "Type": 0 diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 65ad2dc0961..2afa16c8f53 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -180,6 +180,11 @@ func (se *Engine) Open() error { } se.tables = tables se.lastChange = curTime + + // register message topics on the engine if necessary + // must run after se.tables is set + se.registerTopics() + se.ticks.Start(func() { if err := se.Reload(ctx); err != nil { log.Errorf("periodic schema reload failed: %v", err) @@ -257,6 +262,7 @@ func (se *Engine) Reload(ctx context.Context) error { // The following section requires us to hold mu. rec := concurrency.AllErrorRecorder{} curTables := map[string]bool{"dual": true} + var created, altered []string for _, row := range tableData.Rows { tableName := row[0].ToString() curTables[tableName] = true @@ -264,7 +270,13 @@ func (se *Engine) Reload(ctx context.Context) error { // Check if we know about the table or it has been recreated. if _, ok := se.tables[tableName]; !ok || createTime >= se.lastChange { log.Infof("Reloading schema for table: %s", tableName) - rec.RecordError(se.tableWasCreatedOrAltered(ctx, tableName)) + wasCreated, err := se.tableWasCreatedOrAltered(ctx, tableName) + rec.RecordError(err) + if wasCreated { + created = append(created, tableName) + } else { + altered = append(altered, tableName) + } } else { // Only update table_rows, data_length, index_length, max_data_length se.tables[tableName].SetMysqlStats(row[4], row[5], row[6], row[7], row[8]) @@ -278,14 +290,19 @@ func (se *Engine) Reload(ctx context.Context) error { if curTables[tableName] { continue } - delete(se.tables, tableName) - dropped = append(dropped, tableName) - } - // We only need to broadcast dropped tables because - // tableWasCreatedOrAltered will broadcast the other changes. - if len(dropped) > 0 { - se.broadcast(nil, nil, dropped) + + // only keep track of non-topic table drops + if !se.tables[tableName].IsTopic() { + dropped = append(dropped, tableName) + delete(se.tables, tableName) + } } + + // register message topics on the engine if necessary + // must run after se.tables is set + se.registerTopics() + + se.broadcast(created, altered, dropped) return rec.Error() } @@ -306,24 +323,24 @@ func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.DBConn) (int64, // tableWasCreatedOrAltered must be called if a DDL was applied to that table. // the se.mu mutex _must_ be locked before entering this method -func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string) error { +func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string) (bool, error) { if !se.isOpen { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "DDL called on closed schema") + return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "DDL called on closed schema") } conn, err := se.conns.Get(ctx) if err != nil { - return err + return false, err } defer conn.Recycle() tableData, err := conn.Exec(ctx, mysql.BaseShowTablesForTable(tableName), 1, false) if err != nil { tabletenv.InternalErrors.Add("Schema", 1) - return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: information_schema query failed for table %s: %v", tableName, err) + return false, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: information_schema query failed for table %s: %v", tableName, err) } if len(tableData.Rows) != 1 { // This can happen if DDLs race with each other. - return nil + return false, nil } row := tableData.Rows[0] table, err := LoadTable( @@ -334,25 +351,69 @@ func (se *Engine) tableWasCreatedOrAltered(ctx context.Context, tableName string ) if err != nil { tabletenv.InternalErrors.Add("Schema", 1) - return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: failed to load table %s: %v", tableName, err) + return false, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "tableWasCreatedOrAltered: failed to load table %s: %v", tableName, err) } + // table_rows, data_length, index_length, max_data_length table.SetMysqlStats(row[4], row[5], row[6], row[7], row[8]) - var created, altered []string + wasCreated := true if _, ok := se.tables[tableName]; ok { // If the table already exists, we overwrite it with the latest info. // This also means that the query cache needs to be cleared. // Otherwise, the query plans may not be in sync with the schema. log.Infof("Updating table %s", tableName) - altered = append(altered, tableName) - } else { - created = append(created, tableName) + wasCreated = false } se.tables[tableName] = table log.Infof("Initialized table: %s, type: %s", tableName, TypeNames[table.Type]) - se.broadcast(created, altered, nil) - return nil + return wasCreated, nil +} + +// registerTopics optionally connects the vt_topic metadata on a message table +// to a map of topic strings. A table can belong to only one topic. +func (se *Engine) registerTopics() { + // first drop all topics + for tableName, table := range se.tables { + if table.IsTopic() { + delete(se.tables, tableName) + } + } + + // then register all the topics from scratch + for _, table := range se.tables { + se.registerTopic(table) + } +} + +func (se *Engine) registerTopic(ta *Table) { + if ta.MessageInfo == nil || ta.MessageInfo.Topic == "" { + return + } + + topicName := ta.MessageInfo.Topic + topicTable, ok := se.tables[topicName] + if !ok { + // initialize topic table if necessary + topicTable = NewTable(topicName) + topicTable.TopicInfo = &TopicInfo{ + Subscribers: make([]*Table, 0, 1), + } + se.tables[topicName] = topicTable + log.Infof("creating topic table '%s'", topicName) + } else { + // check to see if this table is already registered to the topic + // so we don't double register + for _, t := range topicTable.TopicInfo.Subscribers { + if t.Name == ta.Name { + return + } + } + } + + // append this table to the list of subscribed tables to the topic + log.Infof("subscribing message table '%s' to topic '%s'", ta.Name.String(), topicName) + topicTable.TopicInfo.Subscribers = append(topicTable.TopicInfo.Subscribers, ta) } // RegisterNotifier registers the function for schema change notification. diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 09ab7089379..dcf3c4d8fc8 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -21,7 +21,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "reflect" "strings" "testing" "time" @@ -267,29 +266,13 @@ func TestCreateOrUpdateTable(t *testing.T) { mysql.BaseShowTablesRow(existingTable, false, ""), }, }) - i := 0 - se.RegisterNotifier("test", func(schema map[string]*Table, created, altered, dropped []string) { - switch i { - case 0: - if len(created) != 5 { - t.Errorf("callback 0: %v, want len of 5\n", created) - } - case 1: - want := []string{"test_table_01"} - if !reflect.DeepEqual(altered, want) { - t.Errorf("callback 0: %v, want %v\n", created, want) - } - default: - t.Fatal("unexpected") - } - i++ - }) - defer se.UnregisterNotifier("test") - if err := se.tableWasCreatedOrAltered(context.Background(), "test_table_01"); err != nil { + + wasCreated, err := se.tableWasCreatedOrAltered(context.Background(), existingTable) + if err != nil { t.Fatal(err) } - if i < 2 { - t.Error("Notifier did not get called") + if wasCreated { + t.Error("wanted wasCreated == false") } } diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index 3257af030d0..af57966d6ab 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -157,6 +157,8 @@ func loadMessageInfo(ta *Table, comment string) error { keyvals[kv[0]] = kv[1] } var err error + ta.MessageInfo.Topic = getTopic(keyvals) + if ta.MessageInfo.AckWaitDuration, err = getDuration(keyvals, "vt_ack_wait"); err != nil { return err } @@ -239,3 +241,7 @@ func getNum(in map[string]string, key string) (int, error) { } return v, nil } + +func getTopic(in map[string]string) string { + return in["vt_topic"] +} diff --git a/go/vt/vttablet/tabletserver/schema/load_table_test.go b/go/vt/vttablet/tabletserver/schema/load_table_test.go index e184c417149..7d1a583defb 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table_test.go +++ b/go/vt/vttablet/tabletserver/schema/load_table_test.go @@ -200,6 +200,83 @@ func TestLoadTableMessage(t *testing.T) { } } +func TestLoadTableMessageTopic(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + for query, result := range getMessageTableQueries() { + db.AddQuery(query, result) + } + table, err := newTestLoadTable("USER_TABLE", "vitess_message,vt_topic=test_topic,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30", db) + if err != nil { + t.Fatal(err) + } + want := &Table{ + Name: sqlparser.NewTableIdent("test_table"), + Type: Message, + MessageInfo: &MessageInfo{ + IDPKIndex: 1, + Fields: []*querypb.Field{{ + Name: "id", + Type: sqltypes.Int64, + }, { + Name: "time_scheduled", + Type: sqltypes.Int64, + }, { + Name: "message", + Type: sqltypes.VarBinary, + }}, + AckWaitDuration: 30 * time.Second, + PurgeAfterDuration: 120 * time.Second, + BatchSize: 1, + CacheSize: 10, + PollInterval: 30 * time.Second, + Topic: "test_topic", + }, + } + table.Columns = nil + table.Indexes = nil + table.PKColumns = nil + if !reflect.DeepEqual(table, want) { + t.Errorf("Table:\n%+v, want\n%+v", table, want) + t.Errorf("Table:\n%+v, want\n%+v", table.MessageInfo, want.MessageInfo) + } + + // Missing property + _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_topic=test_topic,vt_ack_wait=30", db) + wanterr := "not specified for message table" + if err == nil || !strings.Contains(err.Error(), wanterr) { + t.Errorf("newTestLoadTable: %v, want %s", err, wanterr) + } + + // id column must be part of primary key. + for query, result := range getMessageTableQueries() { + db.AddQuery(query, result) + } + db.AddQuery( + "show index from test_table", + &sqltypes.Result{ + Fields: mysql.ShowIndexFromTableFields, + RowsAffected: 1, + Rows: [][]sqltypes.Value{ + mysql.ShowIndexFromTableRow("test_table", true, "PRIMARY", 1, "time_scheduled", false), + }, + }) + _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_topic=test_topic,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30", db) + wanterr = "id column is not part of the primary key for message table: test_table" + if err == nil || err.Error() != wanterr { + t.Errorf("newTestLoadTable: %v, want %s", err, wanterr) + } + + for query, result := range getTestLoadTableQueries() { + db.AddQuery(query, result) + } + _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_topic=test_topic,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30", db) + wanterr = "missing from message table: test_table" + if err == nil || !strings.Contains(err.Error(), wanterr) { + t.Errorf("newTestLoadTable: %v, must contain %s", err, wanterr) + } +} + func TestLoadTableWithBitColumn(t *testing.T) { db := fakesqldb.New(t) defer db.Close() diff --git a/go/vt/vttablet/tabletserver/schema/schema.go b/go/vt/vttablet/tabletserver/schema/schema.go index 5e11e33ceab..6748545af4c 100644 --- a/go/vt/vttablet/tabletserver/schema/schema.go +++ b/go/vt/vttablet/tabletserver/schema/schema.go @@ -68,6 +68,9 @@ type Table struct { // MessageInfo contains info for message tables. MessageInfo *MessageInfo + // TopicInfo contains info for message topics. + TopicInfo *TopicInfo + // These vars can be accessed concurrently. TableRows sync2.AtomicInt64 DataLength sync2.AtomicInt64 @@ -87,6 +90,13 @@ type SequenceInfo struct { LastVal int64 } +// TopicInfo contains info specific to message topics. +type TopicInfo struct { + // Subscribers links to all the message tables + // subscribed to this topic + Subscribers []*Table +} + // MessageInfo contains info specific to message tables. type MessageInfo struct { // IDPKIndex is the index of the ID column @@ -99,6 +109,10 @@ type MessageInfo struct { // returned for subscribers. Fields []*querypb.Field + // Optional topic to subscribe to. Any messages + // published to the topic will be added to this table. + Topic string + // AckWaitDuration specifies how long to wait after // the message was first sent. The back-off doubles // every attempt. @@ -201,6 +215,14 @@ func (ta *Table) HasPrimary() bool { return len(ta.Indexes) != 0 && ta.Indexes[0].Name.EqualString("primary") } +// IsTopic returns true if TopicInfo is not nil. +func (ta *Table) IsTopic() bool { + if ta.TopicInfo == nil { + return false + } + return true +} + // UniqueIndexes returns the number of unique indexes on the table func (ta *Table) UniqueIndexes() int { unique := 0 diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 4c62eca4b10..b3d4566d8cb 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1010,30 +1010,58 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq if err != nil { return err } - qre := &QueryExecutor{ - query: query, - marginComments: comments, - bindVars: bindVariables, - transactionID: transactionID, - options: options, - plan: plan, - ctx: ctx, - logStats: logStats, - tsv: tsv, - } - extras := tsv.watcher.ComputeExtras(options) - result, err = qre.Execute() - if err != nil { - return err + if plan.PlanID == planbuilder.PlanInsertTopic { + result, err = tsv.topicExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats) + } else { + result, err = tsv.qreExecute(ctx, query, comments, bindVariables, transactionID, options, plan, logStats) } - result.Extras = extras - result = result.StripMetadata(sqltypes.IncludeFieldsOrDefault(options)) - return nil + + return err }, ) return result, err } +func (tsv *TabletServer) topicExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats) (result *sqltypes.Result, err error) { + for _, subscriber := range plan.Table.TopicInfo.Subscribers { + // replace the topic name with the subscribed message table name + newQuery := strings.Replace(query, plan.Table.Name.String(), subscriber.Name.String(), -1) + var newPlan *TabletPlan + newPlan, err = tsv.qe.GetPlan(ctx, logStats, newQuery, skipQueryPlanCache(options)) + if err != nil { + return nil, err + } + + // because there isn't an option to return multiple results, only the last + // message table result is returned + result, err = tsv.qreExecute(ctx, newQuery, comments, bindVariables, transactionID, options, newPlan, logStats) + } + return result, err +} + +func (tsv *TabletServer) qreExecute(ctx context.Context, query string, comments sqlparser.MarginComments, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, plan *TabletPlan, logStats *tabletenv.LogStats) (result *sqltypes.Result, err error) { + qre := &QueryExecutor{ + query: query, + marginComments: comments, + bindVars: bindVariables, + transactionID: transactionID, + options: options, + plan: plan, + ctx: ctx, + logStats: logStats, + tsv: tsv, + } + extras := tsv.watcher.ComputeExtras(options) + result, err = qre.Execute() + if err != nil { + return nil, err + } + result.Extras = extras + result = result.StripMetadata(sqltypes.IncludeFieldsOrDefault(options)) + + return result, nil +} + // StreamExecute executes the query and streams the result. // The first QueryResult will have Fields set (and Rows nil). // The subsequent QueryResult will have Rows set (and Fields nil).