Skip to content

Commit

Permalink
Merge pull request #9392 from planetscale/rn-vstream-from
Browse files Browse the repository at this point in the history
Initial commit to add the 'vstream * from' functionality back to vtgate
  • Loading branch information
rohit-nayak-ps authored Dec 30, 2021
2 parents 634f57c + 9d23927 commit 794a49b
Show file tree
Hide file tree
Showing 13 changed files with 7,366 additions and 6,985 deletions.
68 changes: 68 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vreplication

import (
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -26,6 +27,10 @@ import (
"testing"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
querypb "vitess.io/vitess/go/vt/proto/query"

"vitess.io/vitess/go/vt/log"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -162,6 +167,8 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) {
shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name)
}

// TestCellAliasVreplicationWorkflow tests replication from a cell with an alias to test the tablet picker's alias functionality
// We also reuse the setup of this test to validate that the "vstream * from" vtgate query functionality is functional
func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
mainClusterConfig.vreplicationCompressGTID = true
Expand Down Expand Up @@ -193,9 +200,70 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
defer vtgateConn.Close()
verifyClusterHealth(t, vc)
insertInitialData(t)
t.Run("VStreamFrom", func(t *testing.T) {
testVStreamFrom(t, "product", 2)
})
shardCustomer(t, true, []*Cell{cell1, cell2}, "alias")
}

// testVStreamFrom confirms that the "vstream * from" endpoint is serving data
func testVStreamFrom(t *testing.T, table string, expectedRowCount int) {
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Port: vtgate.MySQLServerPort,
}
ch := make(chan bool, 1)
go func() {
streamConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer streamConn.Close()
_, err = streamConn.ExecuteFetch("set workload='olap'", 1000, false)
require.NoError(t, err)

query := fmt.Sprintf("vstream * from %s", table)
err = streamConn.ExecuteStreamFetch(query)
require.NoError(t, err)

wantFields := []*querypb.Field{{
Name: "op",
Type: sqltypes.VarChar,
}, {
Name: "pid",
Type: sqltypes.Int32,
}, {
Name: "description",
Type: sqltypes.VarBinary,
}}
gotFields, err := streamConn.Fields()
require.NoError(t, err)
for i, field := range gotFields {
gotFields[i] = &querypb.Field{
Name: field.Name,
Type: field.Type,
}
}
utils.MustMatch(t, wantFields, gotFields)

gotRows, err := streamConn.FetchNext(nil)
require.NoError(t, err)
log.Infof("QR1:%v\n", gotRows)

gotRows, err = streamConn.FetchNext(nil)
require.NoError(t, err)
log.Infof("QR2:%+v\n", gotRows)

ch <- true
}()

select {
case <-ch:
return
case <-time.After(5 * time.Second):
t.Fatal("nothing streamed within timeout")
}
}

func insertInitialData(t *testing.T) {
t.Run("insertInitialData", func(t *testing.T) {
log.Infof("Inserting initial data")
Expand Down
1 change: 1 addition & 0 deletions go/vt/sqlparser/keywords.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ var keywords = []keyword{
{"vitess_shards", VITESS_SHARDS},
{"vitess_tablets", VITESS_TABLETS},
{"vschema", VSCHEMA},
{"vstream", VSTREAM},
{"warnings", WARNINGS},
{"when", WHEN},
{"where", WHERE},
Expand Down
2 changes: 2 additions & 0 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,8 @@ var (
input: "stream * from t",
}, {
input: "stream /* comment */ * from t",
}, {
input: "vstream * from t",
}, {
input: "begin",
}, {
Expand Down
Loading

0 comments on commit 794a49b

Please sign in to comment.