Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

messages: add support for topics that route inserts to multiple subscriber message tables #5011

Merged
merged 6 commits into from
Aug 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions go/vt/vttablet/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,27 @@ var tableACLConfig = `{
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "test_topic",
"table_names_or_prefixes": ["test_topic"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_topic_subscriber_1",
"table_names_or_prefixes": ["vitess_topic_subscriber_1"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_topic_subscriber_2",
"table_names_or_prefixes": ["vitess_topic_subscriber_2"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
},
{
"name": "vitess_acl_unmatched",
"table_names_or_prefixes": ["vitess_acl_unmatched"],
Expand Down
322 changes: 280 additions & 42 deletions go/vt/vttablet/endtoend/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,7 @@ func TestMessage(t *testing.T) {
}

// Start goroutine to consume message stream.
go func() {
if err := client.MessageStream("vitess_message", func(qr *sqltypes.Result) error {
select {
case <-done:
return io.EOF
default:
}
ch <- qr
return nil
}); err != nil {
t.Error(err)
}
close(ch)
}()
go waitForMessage(t, client, "vitess_message", ch, done)
got := <-ch
want := &sqltypes.Result{
Fields: []*querypb.Field{{
Expand Down Expand Up @@ -248,20 +235,7 @@ func TestThreeColMessage(t *testing.T) {
}
defer client.Execute("drop table vitess_message3", nil)

go func() {
if err := client.MessageStream("vitess_message3", func(qr *sqltypes.Result) error {
select {
case <-done:
return io.EOF
default:
}
ch <- qr
return nil
}); err != nil {
t.Error(err)
}
close(ch)
}()
go waitForMessage(t, client, "vitess_message3", ch, done)

// Verify fields.
got := <-ch
Expand Down Expand Up @@ -358,20 +332,7 @@ func TestMessageAuto(t *testing.T) {
defer client.Execute("drop table vitess_message_auto", nil)

// Start goroutine to consume message stream.
go func() {
if err := client.MessageStream("vitess_message_auto", func(qr *sqltypes.Result) error {
select {
case <-done:
return io.EOF
default:
}
ch <- qr
return nil
}); err != nil {
t.Error(err)
}
close(ch)
}()
go waitForMessage(t, client, "vitess_message_auto", ch, done)
<-ch
defer func() { close(done) }()

Expand Down Expand Up @@ -461,3 +422,280 @@ func TestMessageAuto(t *testing.T) {
t.Errorf("message received:\n%v, want\n%v", got, want)
}
}

var createMessageTopic1 = `create table vitess_topic_subscriber_1(
time_scheduled bigint,
id bigint,
time_next bigint,
epoch bigint,
time_created bigint,
time_acked bigint,
message varchar(128),
primary key(time_scheduled, id),
unique index id_idx(id),
index next_idx(time_next, epoch))
comment 'vitess_message,vt_topic=test_topic,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=1'`

var createMessageTopic2 = `create table vitess_topic_subscriber_2(
time_scheduled bigint,
id bigint,
time_next bigint,
epoch bigint,
time_created bigint,
time_acked bigint,
message varchar(128),
primary key(time_scheduled, id),
unique index id_idx(id),
index next_idx(time_next, epoch))
comment 'vitess_message,vt_topic=test_topic,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=1'`

// TestMessageTopic tests for the case where id is an auto-inc column.
func TestMessageTopic(t *testing.T) {
ch1 := make(chan *sqltypes.Result)
ch2 := make(chan *sqltypes.Result)
done := make(chan struct{})
client := framework.NewClient()

//
// phase 1 tests inserts into a topic going to two subscribed message tables
//
if _, err := client.Execute(createMessageTopic1, nil); err != nil {
t.Fatal(err)
}

if _, err := client.Execute(createMessageTopic2, nil); err != nil {
t.Fatal(err)
}

// Start goroutines to consume message stream.
go waitForMessage(t, client, "vitess_topic_subscriber_1", ch1, done)
<-ch1
go waitForMessage(t, client, "vitess_topic_subscriber_2", ch2, done)
<-ch2
derekperkins marked this conversation as resolved.
Show resolved Hide resolved
defer func() { close(done) }()

// Create message.
err := client.Begin(false)
if err != nil {
t.Error(err)
return
}
// This insert should cause the engine to make a best-effort guess at generated ids.
// It will expedite the first two rows with null values, and the third row, and will
// give up on the last row, which should eventually be picked up by the poller.
_, err = client.Execute("insert into test_topic(id, message) values(1, 'msg1'), (2, 'msg2'), (3, 'msg3')", nil)
if err != nil {
t.Error(err)
return
}

err = client.Commit()
if err != nil {
t.Error(err)
return
}

// Only three messages should be queued on the first subscription table
if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_1.Queued"), 3; got != want {
t.Errorf("Messages/vitess_topic_subscriber_1.Queued: %d, want %d", got, want)
}

// Only three messages should be queued on the second subscription table
if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_2.Queued"), 3; got != want {
t.Errorf("Messages/vitess_topic_subscriber_2.Queued: %d, want %d", got, want)
}

wantResults := []*sqltypes.Result{{
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NULL,
sqltypes.NewVarChar("msg1"),
}},
}, {
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(2),
sqltypes.NULL,
sqltypes.NewVarChar("msg2"),
}},
}, {
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(3),
sqltypes.NULL,
sqltypes.NewVarChar("msg3"),
}},
}}

// Consume first three messages
// and ensure they were received promptly.
start := time.Now()
for i := 0; i < 3; i++ {
// make sure the first message table received all three messages
got1 := <-ch1
got1.Rows[0][1] = sqltypes.NULL

// Results can come in any order.
found := false
for _, want := range wantResults {
if reflect.DeepEqual(got1, want) {
found = true
}
}
if !found {
t.Errorf("message fetch 1: %v not found in expected list: %v", got1, wantResults)
}

// make sure the second message table received all three messages
got2 := <-ch2
got2.Rows[0][1] = sqltypes.NULL

// Results can come in any order.
found = false
for _, want := range wantResults {
if reflect.DeepEqual(got2, want) {
found = true
}
}
if !found {
t.Errorf("message fetch 2: %v not found in expected list: %v", got2, wantResults)
}
}
if d := time.Since(start); d > 1*time.Second {
t.Errorf("messages were delayed: %v", d)
}

// ack the first subscriber
_, err = client.MessageAck("vitess_topic_subscriber_1", []string{"1, 2, 3"})
if err != nil {
t.Error(err)
}

// ack the second subscriber
_, err = client.MessageAck("vitess_topic_subscriber_2", []string{"1, 2, 3"})
if err != nil {
t.Error(err)
}

//
// phase 2 tests deleting one of the subscribers and making sure
// that inserts into a topic go to one subscribed message table
//

client.Execute("drop table vitess_topic_subscriber_1", nil)

// Create message.
err = client.Begin(false)
if err != nil {
t.Error(err)
return
}
// This insert should cause the engine to make a best-effort guess at generated ids.
// It will expedite the first two rows with null values, and the third row, and will
// give up on the last row, which should eventually be picked up by the poller.
_, err = client.Execute("insert into test_topic(id, message) values(4, 'msg4'), (5, 'msg5'), (6, 'msg6')", nil)
if err != nil {
t.Error(err)
return
}

err = client.Commit()
if err != nil {
t.Error(err)
return
}

// no messages should be queued on the first subscription table
if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_1.Queued"), 3; got != want {
t.Errorf("Messages/vitess_topic_subscriber_1.Queued: %d, want %d", got, want)
}

// Only three messages should be queued on the second subscription table
if got, want := framework.FetchInt(framework.DebugVars(), "Messages/vitess_topic_subscriber_2.Queued"), 6; got != want {
t.Errorf("Messages/vitess_topic_subscriber_2.Queued: %d, want %d", got, want)
}

wantResults = []*sqltypes.Result{{
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(4),
sqltypes.NULL,
sqltypes.NewVarChar("msg4"),
}},
}, {
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(5),
sqltypes.NULL,
sqltypes.NewVarChar("msg5"),
}},
}, {
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(6),
sqltypes.NULL,
sqltypes.NewVarChar("msg6"),
}},
}}

// Consume first three messages
// and ensure they were received promptly.
start = time.Now()
for i := 0; i < 3; i++ {
// make sure the second message table received all three messages
got2 := <-ch2
got2.Rows[0][1] = sqltypes.NULL

// Results can come in any order.
found := false
for _, want := range wantResults {
if reflect.DeepEqual(got2, want) {
found = true
}
}
if !found {
t.Errorf("message fetch 2: %v not found in expected list: %v", got2, wantResults)
}
}
if d := time.Since(start); d > 1*time.Second {
t.Errorf("messages were delayed: %v", d)
}

// ack the second subscriber
_, err = client.MessageAck("vitess_topic_subscriber_2", []string{"4, 5, 6"})
if err != nil {
t.Error(err)
}

//
// phase 3 tests deleting the last subscriber and making sure
// that inserts into a topic error out with table not found
//

// remove the second subscriber which should remove the topic
if _, err := client.Execute("drop table vitess_topic_subscriber_2", nil); err != nil {
t.Fatal(err)
}

// this should fail because the topic doesn't exist. Any other outcome fails the test
_, err = client.Execute("insert into test_topic(id, message) values(4, 'msg4'), (5, 'msg5'), (6, 'msg6')", nil)
switch {
case err == nil:
t.Error("test_topic shouldn't have existed for inserts to succeed")

case err.Error() == "table test_topic not found in schema (CallerID: dev)":

default:
t.Error(err)
}
}

func waitForMessage(t *testing.T, client *framework.QueryClient, tableName string, ch chan *sqltypes.Result, done chan struct{}) {
if err := client.MessageStream(tableName, func(qr *sqltypes.Result) error {
select {
case <-done:
return io.EOF
default:
}
ch <- qr
return nil
}); err != nil {
t.Error(err)
}
close(ch)
}
Loading