Skip to content

Commit

Permalink
Merge pull request #5011 from nozzle/message-topics-pseudo-table
Browse files Browse the repository at this point in the history
messages: add support for topics that route inserts to multiple subscriber message tables
  • Loading branch information
sougou authored Aug 17, 2019
2 parents a7983a3 + 24d3d1b commit e34746c
Show file tree
Hide file tree
Showing 12 changed files with 645 additions and 102 deletions.
21 changes: 21 additions & 0 deletions go/vt/vttablet/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,27 @@ var tableACLConfig = `{
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "test_topic",
"table_names_or_prefixes": ["test_topic"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_topic_subscriber_1",
"table_names_or_prefixes": ["vitess_topic_subscriber_1"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_topic_subscriber_2",
"table_names_or_prefixes": ["vitess_topic_subscriber_2"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_acl_unmatched",
"table_names_or_prefixes": ["vitess_acl_unmatched"],
Expand Down
322 changes: 280 additions & 42 deletions go/vt/vttablet/endtoend/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,7 @@ func TestMessage(t *testing.T) {
}

// Start goroutine to consume message stream.
go func() {
if err := client.MessageStream("vitess_message", func(qr *sqltypes.Result) error {
select {
case <-done:
return io.EOF
default:
}
ch <- qr
return nil
}); err != nil {
t.Error(err)
}
close(ch)
}()
go waitForMessage(t, client, "vitess_message", ch, done)
got := <-ch
want := &sqltypes.Result{
Fields: []*querypb.Field{{
Expand Down Expand Up @@ -248,20 +235,7 @@ func TestThreeColMessage(t *testing.T) {
}
defer client.Execute("drop table vitess_message3", nil)

go func() {
if err := client.MessageStream("vitess_message3", func(qr *sqltypes.Result) error {
select {
case <-done:
return io.EOF
default:
}
ch <- qr
return nil
}); err != nil {
t.Error(err)
}
close(ch)
}()
go waitForMessage(t, client, "vitess_message3", ch, done)

// Verify fields.
got := <-ch
Expand Down Expand Up @@ -358,20 +332,7 @@ func TestMessageAuto(t *testing.T) {
defer client.Execute("drop table vitess_message_auto", nil)

// Start goroutine to consume message stream.
go func() {
if err := client.MessageStream("vitess_message_auto", func(qr *sqltypes.Result) error {
select {
case <-done:
return io.EOF
default:
}
ch <- qr
return nil
}); err != nil {
t.Error(err)
}
close(ch)
}()
go waitForMessage(t, client, "vitess_message_auto", ch, done)
<-ch
defer func() { close(done) }()

Expand Down Expand Up @@ -461,3 +422,280 @@ func TestMessageAuto(t *testing.T) {
t.Errorf("message received:\n%v, want\n%v", got, want)
}
}

var createMessageTopic1 = `create table vitess_topic_subscriber_1(
time_scheduled bigint,
id bigint,
time_next bigint,
epoch bigint,
time_created bigint,
time_acked bigint,
message varchar(128),
primary key(time_scheduled, id),
unique index id_idx(id),
index next_idx(time_next, epoch))
comment 'vitess_message,vt_topic=test_topic,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=1'`

var createMessageTopic2 = `create table vitess_topic_subscriber_2(
time_scheduled bigint,
id bigint,
time_next bigint,
epoch bigint,
time_created bigint,
time_acked bigint,
message varchar(128),
primary key(time_scheduled, id),
unique index id_idx(id),
index next_idx(time_next, epoch))
comment 'vitess_message,vt_topic=test_topic,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=1'`

// TestMessageTopic tests for the case where id is an auto-inc column.
func TestMessageTopic(t *testing.T) {
ch1 := make(chan *sqltypes.Result)
ch2 := make(chan *sqltypes.Result)
done := make(chan struct{})
client := framework.NewClient()

//
// phase 1 tests inserts into a topic going to two subscribed message tables
//
if _, err := client.Execute(createMessageTopic1, nil); err != nil {
t.Fatal(err)
}

if _, err := client.Execute(createMessageTopic2, nil); err != nil {
t.Fatal(err)
}

// Start goroutines to consume message stream.
go waitForMessage(t, client, "vitess_topic_subscriber_1", ch1, done)
<-ch1
go waitForMessage(t, client, "vitess_topic_subscriber_2", ch2, done)
<-ch2
defer func() { close(done) }()

// Create message.
err := client.Begin(false)
if err != nil {
t.Error(err)
return
}
// This insert should cause the engine to make a best-effort guess at generated ids.
// It will expedite the first two rows with null values, and the third row, and will
// give up on the last row, which should eventually be picked up by the poller.
_, err = client.Execute("insert into test_topic(id, message) values(1, 'msg1'), (2, 'msg2'), (3, 'msg3')", nil)
if err != nil {
t.Error(err)
return
}

err = client.Commit()
if err != nil {
t.Error(err)
return
}

// Only three messages should be queued on the first subscription table
if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_1.Queued"), 3; got != want {
t.Errorf("Messages/vitess_topic_subscriber_1.Queued: %d, want %d", got, want)
}

// Only three messages should be queued on the second subscription table
if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_2.Queued"), 3; got != want {
t.Errorf("Messages/vitess_topic_subscriber_2.Queued: %d, want %d", got, want)
}

wantResults := []*sqltypes.Result{{
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NULL,
sqltypes.NewVarChar("msg1"),
}},
}, {
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(2),
sqltypes.NULL,
sqltypes.NewVarChar("msg2"),
}},
}, {
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(3),
sqltypes.NULL,
sqltypes.NewVarChar("msg3"),
}},
}}

// Consume first three messages
// and ensure they were received promptly.
start := time.Now()
for i := 0; i < 3; i++ {
// make sure the first message table received all three messages
got1 := <-ch1
got1.Rows[0][1] = sqltypes.NULL

// Results can come in any order.
found := false
for _, want := range wantResults {
if reflect.DeepEqual(got1, want) {
found = true
}
}
if !found {
t.Errorf("message fetch 1: %v not found in expected list: %v", got1, wantResults)
}

// make sure the second message table received all three messages
got2 := <-ch2
got2.Rows[0][1] = sqltypes.NULL

// Results can come in any order.
found = false
for _, want := range wantResults {
if reflect.DeepEqual(got2, want) {
found = true
}
}
if !found {
t.Errorf("message fetch 2: %v not found in expected list: %v", got2, wantResults)
}
}
if d := time.Since(start); d > 1*time.Second {
t.Errorf("messages were delayed: %v", d)
}

// ack the first subscriber
_, err = client.MessageAck("vitess_topic_subscriber_1", []string{"1, 2, 3"})
if err != nil {
t.Error(err)
}

// ack the second subscriber
_, err = client.MessageAck("vitess_topic_subscriber_2", []string{"1, 2, 3"})
if err != nil {
t.Error(err)
}

//
// phase 2 tests deleting one of the subscribers and making sure
// that inserts into a topic go to one subscribed message table
//

client.Execute("drop table vitess_topic_subscriber_1", nil)

// Create message.
err = client.Begin(false)
if err != nil {
t.Error(err)
return
}
// This insert should cause the engine to make a best-effort guess at generated ids.
// It will expedite the first two rows with null values, and the third row, and will
// give up on the last row, which should eventually be picked up by the poller.
_, err = client.Execute("insert into test_topic(id, message) values(4, 'msg4'), (5, 'msg5'), (6, 'msg6')", nil)
if err != nil {
t.Error(err)
return
}

err = client.Commit()
if err != nil {
t.Error(err)
return
}

// no messages should be queued on the first subscription table
if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_1.Queued"), 3; got != want {
t.Errorf("Messages/vitess_topic_subscriber_1.Queued: %d, want %d", got, want)
}

// Only three messages should be queued on the second subscription table
if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_2.Queued"), 6; got != want {
t.Errorf("Messages/vitess_topic_subscriber_2.Queued: %d, want %d", got, want)
}

wantResults = []*sqltypes.Result{{
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(4),
sqltypes.NULL,
sqltypes.NewVarChar("msg4"),
}},
}, {
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(5),
sqltypes.NULL,
sqltypes.NewVarChar("msg5"),
}},
}, {
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(6),
sqltypes.NULL,
sqltypes.NewVarChar("msg6"),
}},
}}

// Consume first three messages
// and ensure they were received promptly.
start = time.Now()
for i := 0; i < 3; i++ {
// make sure the second message table received all three messages
got2 := <-ch2
got2.Rows[0][1] = sqltypes.NULL

// Results can come in any order.
found := false
for _, want := range wantResults {
if reflect.DeepEqual(got2, want) {
found = true
}
}
if !found {
t.Errorf("message fetch 2: %v not found in expected list: %v", got2, wantResults)
}
}
if d := time.Since(start); d > 1*time.Second {
t.Errorf("messages were delayed: %v", d)
}

// ack the second subscriber
_, err = client.MessageAck("vitess_topic_subscriber_2", []string{"4, 5, 6"})
if err != nil {
t.Error(err)
}

//
// phase 3 tests deleting the last subscriber and making sure
// that inserts into a topic error out with table not found
//

// remove the second subscriber which should remove the topic
if _, err := client.Execute("drop table vitess_topic_subscriber_2", nil); err != nil {
t.Fatal(err)
}

// this should fail because the topic doesn't exist. Any other outcome fails the test
_, err = client.Execute("insert into test_topic(id, message) values(4, 'msg4'), (5, 'msg5'), (6, 'msg6')", nil)
switch {
case err == nil:
t.Error("test_topic shouldn't have existed for inserts to succeed")

case err.Error() == "table test_topic not found in schema (CallerID: dev)":

default:
t.Error(err)
}
}

func waitForMessage(t *testing.T, client *framework.QueryClient, tableName string, ch chan *sqltypes.Result, done chan struct{}) {
if err := client.MessageStream(tableName, func(qr *sqltypes.Result) error {
select {
case <-done:
return io.EOF
default:
}
ch <- qr
return nil
}); err != nil {
t.Error(err)
}
close(ch)
}
Loading

0 comments on commit e34746c

Please sign in to comment.