Skip to content

Commit

Permalink
VStream API: allow cells to be specified for picking source tablets t…
Browse files Browse the repository at this point in the history
…o stream from (vitessio#10294)

* Add parameter Cells in vstream api flags, for tablet picker to look for candidates in

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Add e2e test for the new Cells vstream flag

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Self-review

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

* Address review comments

Fix gofmt

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
Signed-off-by: Vilius Okockis <vilius.okockis@vinted.com>
  • Loading branch information
rohit-nayak-ps authored and DeathBorn committed Apr 12, 2024
1 parent 9259da7 commit e649f02
Show file tree
Hide file tree
Showing 16 changed files with 305 additions and 106 deletions.
42 changes: 33 additions & 9 deletions go/test/endtoend/vreplication/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"regexp"
"strconv"
"strings"
"testing"

"vitess.io/vitess/go/vt/log"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -98,24 +101,29 @@ func validateThatQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *c
return newCount == count+1
}

func getQueryCount(url string, query string) int {
var headings, row []string
var rows [][]string
func getHTTPBody(url string) string {
resp, err := http.Get(url)
if err != nil {
fmt.Printf("http Get returns %+v\n", err)
return 0
log.Infof("http Get returns %+v", err)
return ""
}
if resp.StatusCode != 200 {
fmt.Printf("http Get returns status %d\n", resp.StatusCode)
return 0
log.Infof("http Get returns status %d", resp.StatusCode)
return ""
}
respByte, _ := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
body := string(respByte)
return body
}

func getQueryCount(url string, query string) int {
var headings, row []string
var rows [][]string
body := getHTTPBody(url)
doc, err := goquery.NewDocumentFromReader(strings.NewReader(body))
if err != nil {
fmt.Printf("goquery parsing returns %+v\n", err)
log.Infof("goquery parsing returns %+v\n", err)
return 0
}

Expand Down Expand Up @@ -143,7 +151,7 @@ func getQueryCount(url string, query string) int {
})
})
if queryIndex == -1 || countIndex == -1 {
fmt.Println("Queryz response is incorrect")
log.Infof("Queryz response is incorrect")
return 0
}
for _, row := range rows {
Expand Down Expand Up @@ -267,3 +275,19 @@ func printRoutingRules(t *testing.T, vc *VitessCluster, msg string) error {
fmt.Printf("Routing Rules::%s:\n%s\n", msg, output)
return nil
}

func osExec(t *testing.T, command string, args []string) (string, error) {

Check failure on line 279 in go/test/endtoend/vreplication/helper.go

View workflow job for this annotation

GitHub Actions / Lint using golangci-lint

`osExec` is unused (deadcode)

Check failure on line 279 in go/test/endtoend/vreplication/helper.go

View workflow job for this annotation

GitHub Actions / Lint using golangci-lint

`osExec` is unused (deadcode)
cmd := exec.Command(command, args...)
output, err := cmd.CombinedOutput()
return string(output), err
}

func getDebugVar(t *testing.T, port int, varPath []string) (string, error) {
var val []byte
var err error
url := fmt.Sprintf("http://localhost:%d/debug/vars", port)
body := getHTTPBody(url)
val, _, _, err = jsonparser.Get([]byte(body), varPath...)
require.NoError(t, err)
return string(val), nil
}
89 changes: 88 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@ limitations under the License.
package vreplication

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"sync"
"testing"
"time"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -142,7 +149,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) {

func TestMultiCellVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
allCellNames = "zone1,zone2"
allCellNames = strings.Join(cells, ",")

vc = NewVitessCluster(t, "TestMultiCellVreplicationWorkflow", cells, mainClusterConfig)
require.NotNil(t, vc)
Expand All @@ -165,6 +172,86 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) {
verifyClusterHealth(t, vc)
insertInitialData(t)
shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name, true)

// we tag along this test so as not to create the overhead of creating another cluster
testVStreamCellFlag(t)
}

func testVStreamCellFlag(t *testing.T) {
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "product",
Shard: "0",
Gtid: "",
}}}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "product",
Filter: "select * from product",
}},
}
ctx := context.Background()

type vstreamTestCase struct {
cells string
expectError bool
}
nonExistingCell := "zone7"
vstreamTestCases := []vstreamTestCase{
{"zone1,zone2", false},
{nonExistingCell, true},
{"", false},
}

for _, tc := range vstreamTestCases {
t.Run("VStreamCellsFlag/"+tc.cells, func(t *testing.T) {
conn, err := vtgateconn.Dial(ctx, fmt.Sprintf("localhost:%d", vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer conn.Close()

flags := &vtgatepb.VStreamFlags{}
if tc.cells != "" {
flags.Cells = tc.cells
}

ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
reader, err := conn.VStream(ctx2, topodatapb.TabletType_REPLICA, vgtid, filter, flags)
require.NoError(t, err)

rowsReceived := false
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

events, err := reader.Recv()
switch err {
case nil:
if len(events) > 0 {
log.Infof("received %d events", len(events))
rowsReceived = true
}
case io.EOF:
log.Infof("stream ended without data")
default:
log.Infof("%s:: remote error: %v", time.Now(), err)
}
}()
wg.Wait()

if tc.expectError {
require.False(t, rowsReceived)

// if no tablet was found the tablet picker adds a key which includes the cell name to the vtgate TabletPickerNoTabletFoundErrorCount stat
pickerErrorStat, err := getDebugVar(t, vc.ClusterConfig.vtgatePort, []string{"TabletPickerNoTabletFoundErrorCount"})
require.NoError(t, err)
require.Contains(t, pickerErrorStat, nonExistingCell)
} else {
require.True(t, rowsReceived)
}
})
}
}

func TestCellAliasVreplicationWorkflow(t *testing.T) {
Expand Down
27 changes: 18 additions & 9 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type TabletRecorder interface {
type keyspaceShardTabletType string
type tabletAliasString string

//HealthCheck declares what the TabletGateway needs from the HealthCheck
// HealthCheck declares what the TabletGateway needs from the HealthCheck
type HealthCheck interface {
// CacheStatus returns a displayable version of the health check cache.
CacheStatus() TabletsCacheStatusList
Expand Down Expand Up @@ -245,18 +245,27 @@ type HealthCheckImpl struct {
// NewHealthCheck creates a new HealthCheck object.
// Parameters:
// retryDelay.
// The duration to wait before retrying to connect (e.g. after a failed connection
// attempt).
//
// The duration to wait before retrying to connect (e.g. after a failed connection
// attempt).
//
// healthCheckTimeout.
// The duration for which we consider a health check response to be 'fresh'. If we don't get
// a health check response from a tablet for more than this duration, we consider the tablet
// not healthy.
//
// The duration for which we consider a health check response to be 'fresh'. If we don't get
// a health check response from a tablet for more than this duration, we consider the tablet
// not healthy.
//
// topoServer.
// The topology server that this healthcheck object can use to retrieve cell or tablet information
//
// The topology server that this healthcheck object can use to retrieve cell or tablet information
//
// localCell.
// The localCell for this healthcheck
//
// The localCell for this healthcheck
//
// callback.
// A function to call when there is a master change. Used to notify vtgate's buffer to stop buffering.
//
// A function to call when there is a master change. Used to notify vtgate's buffer to stop buffering.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

Expand Down
14 changes: 7 additions & 7 deletions go/vt/discovery/legacy_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,13 @@ func NewLegacyDefaultHealthCheck() LegacyHealthCheck {

// NewLegacyHealthCheck creates a new LegacyHealthCheck object.
// Parameters:
// retryDelay.
// The duration to wait before retrying to connect (e.g. after a failed connection
// attempt).
// healthCheckTimeout.
// The duration for which we consider a health check response to be 'fresh'. If we don't get
// a health check response from a tablet for more than this duration, we consider the tablet
// not healthy.
// - retryDelay
// The duration to wait before retrying to connect (e.g. after a failed connection
// attempt).
// - healthCheckTimeout
// The duration for which we consider a health check response to be 'fresh'. If we don't get
// a health check response from a tablet for more than this duration, we consider the tablet
// not healthy.
func NewLegacyHealthCheck(retryDelay, healthCheckTimeout time.Duration) LegacyHealthCheck {
hc := &LegacyHealthCheckImpl{
addrToHealth: make(map[string]*legacyTabletHealth),
Expand Down
8 changes: 4 additions & 4 deletions go/vt/discovery/legacy_replicationlag.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ func LegacyIsReplicationLagVeryHigh(tabletStats *LegacyTabletStats) bool {
// lags of (30m, 35m, 40m, 45m) return all.
//
// One thing to know about this code: vttablet also has a couple flags that impact the logic here:
// * unhealthy_threshold: if replication lag is higher than this, a tablet will be reported as unhealthy.
// The default for this is 2h, same as the discovery_high_replication_lag_minimum_serving here.
// * degraded_threshold: this is only used by vttablet for display. It should match
// discovery_low_replication_lag here, so the vttablet status display matches what vtgate will do of it.
// - unhealthy_threshold: if replication lag is higher than this, a tablet will be reported as unhealthy.
// The default for this is 2h, same as the discovery_high_replication_lag_minimum_serving here.
// - degraded_threshold: this is only used by vttablet for display. It should match
// discovery_low_replication_lag here, so the vttablet status display matches what vtgate will do of it.
func FilterLegacyStatsByReplicationLag(tabletStatsList []*LegacyTabletStats) []*LegacyTabletStats {
if !*legacyReplicationLagAlgorithm {
return filterLegacyStatsByLag(tabletStatsList)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/discovery/replicationlag.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ func IsReplicationLagVeryHigh(tabletHealth *TabletHealth) bool {
// lags of (30m, 35m, 40m, 45m) return all.
//
// One thing to know about this code: vttablet also has a couple flags that impact the logic here:
// * unhealthy_threshold: if replication lag is higher than this, a tablet will be reported as unhealthy.
// The default for this is 2h, same as the discovery_high_replication_lag_minimum_serving here.
// * degraded_threshold: this is only used by vttablet for display. It should match
// discovery_low_replication_lag here, so the vttablet status display matches what vtgate will do of it.
// - unhealthy_threshold: if replication lag is higher than this, a tablet will be reported as unhealthy.
// The default for this is 2h, same as the discovery_high_replication_lag_minimum_serving here.
// - degraded_threshold: this is only used by vttablet for display. It should match
// discovery_low_replication_lag here, so the vttablet status display matches what vtgate will do of it.
func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHealth {
if !*legacyReplicationLagAlgorithm {
return filterStatsByLag(tabletHealthList)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
// if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue
if err == nil {
actualCells = append(actualCells, alias.Cells...)
} else {
log.Infof("Unable to resolve cell %s, ignoring", cell)
}
} else {
// valid cell, add it to our list
Expand Down
Loading

0 comments on commit e649f02

Please sign in to comment.