Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTShovel - VReplication support for external databases #5289

Merged
merged 33 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
489eb3c
Adds proto definition to store dml extracted from Query
rafael Oct 3, 2019
aea60ab
Update schema engine to expect mysql.ConnParams
rafael Oct 10, 2019
11301e3
Adds support for file:pos in mysql binlogdump interface
rafael Oct 10, 2019
2f1d3b1
Adds BinlogFilePos as a way to encode mysql replication
rafael Oct 10, 2019
a2b0074
Adds StartBinlogDumpFromFilePosition to slave connection protocol
rafael Oct 10, 2019
4210649
Enables vreplication to run directly from MySQL
rafael Oct 10, 2019
f903605
Merge branch 'upstream-master' into vtshovel-poc
rafael Oct 10, 2019
c3c238b
Adds the core of vtshovel program
rafael Oct 11, 2019
8bf38cc
Simplifies vtshovel logic. It assumes that it runs directly again
rafael Oct 16, 2019
b56bf67
Update approach to not require another binary to run vtshovel
rafael Oct 17, 2019
c7926ef
Fixes some bugs in dbconfigs and vstreamer client after inital testing
rafael Oct 18, 2019
46c0fe4
Adds QPS chart to tablet vreplication section
rafael Oct 21, 2019
e73faef
Adds flag to register metrics.
rafael Oct 21, 2019
cf57589
Addresses comments from review
rafael Oct 29, 2019
90ee27a
WIP: Adds test for vstreamer client
rafael Oct 30, 2019
3fc6b08
Fixes bug in dbconfigs that was causing vstreamer to not work correctly
rafael Oct 30, 2019
b674d5e
Adds tests for vstreamer_client
rafael Nov 6, 2019
9988496
Do not pass source conn params around
rafael Nov 6, 2019
4bd4904
Style improvements
rafael Nov 6, 2019
21eeeb4
Abort on error when executing a DML in statement based replication
rafael Nov 14, 2019
6593e5d
Merge branch 'upstream-master' into vtshovel-poc
rafael Nov 23, 2019
1665701
Fixes per rebase with file:pos feature
rafael Nov 25, 2019
67ac881
Merge branch 'master' of https://github.com/vitessio/vitess into vtsh…
rafael Nov 26, 2019
731f02f
Revert no longer needed changes in vstream
rafael Nov 26, 2019
ec632f2
Adds support to set flavor for specific connections
rafael Nov 27, 2019
d4460ee
Fixes per integration with file:pos rebase
rafael Nov 27, 2019
4f03489
Remove test to make sure this is the last outstanding issue
rafael Nov 27, 2019
b1a8772
Fixes bug in filepos and adds test for statement mode
rafael Nov 28, 2019
69a4dd0
Merge branch 'master' of https://github.com/vitessio/vitess into vtsh…
rafael Nov 28, 2019
59785b6
Fixes per changes upstream
rafael Nov 28, 2019
5fd8925
Adds more tests and fixes govet issues
rafael Dec 1, 2019
559a210
Re-running go-imports and addressing other comments from PR
rafael Dec 4, 2019
dc0f4a3
Merge branch 'master' of https://github.com/vitessio/vitess into vtsh…
rafael Dec 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func main() {
}

// vtgate configuration and init
resilientServer := srvtopo.NewResilientServer(ts, "ResilientSrvTopoServer")
resilientServer := srvtopo.NewResilientServer(ts, "ResilientSrvTopoServer", true)
healthCheck := discovery.NewHealthCheck(1*time.Millisecond /*retryDelay*/, 1*time.Hour /*healthCheckTimeout*/)
tabletTypesToWait := []topodatapb.TabletType{
topodatapb.TabletType_MASTER,
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func main() {
ts := topo.Open()
defer ts.Close()

resilientServer = srvtopo.NewResilientServer(ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(ts, "ResilientSrvTopoServer", true)

healthCheck = discovery.NewHealthCheck(*healthCheckRetryDelay, *healthCheckTimeout)
healthCheck.RegisterStats()
Expand Down
10 changes: 10 additions & 0 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type flavor interface {
// stopSlave returns the command to stop the slave.
stopSlaveCommand() string

// sendBinlogFileDumpCommand sends the packet required to start streaming from file:post
rafael marked this conversation as resolved.
Show resolved Hide resolved
sendBinlogFileDumpCommand(c *Conn, slaveID uint32, binlogFilename string, pos uint32) error

// sendBinlogDumpCommand sends the packet required to start
// dumping binlogs from the specified location.
sendBinlogDumpCommand(c *Conn, slaveID uint32, startPos Position) error
Expand Down Expand Up @@ -163,6 +166,13 @@ func (c *Conn) StopSlaveCommand() string {
return c.flavor.stopSlaveCommand()
}

// SendBinlogFileDumpCommand sends the flavor-specific version of
// the COM_BINLOG_DUMP command to start dumping raw binlog
// events over a slave connection, starting at a given file position.
func (c *Conn) SendBinlogFileDumpCommand(slaveID uint32, binlogFilename string, pos uint32) error {
return c.flavor.sendBinlogFileDumpCommand(c, slaveID, binlogFilename, pos)
}

// SendBinlogDumpCommand sends the flavor-specific version of
// the COM_BINLOG_DUMP command to start dumping raw binlog
// events over a slave connection, starting at a given GTID.
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (mariadbFlavor) stopSlaveCommand() string {
return "STOP SLAVE"
}

// sendBinlogFileDumpCommand is part of the Flavor interface.
func (mariadbFlavor) sendBinlogFileDumpCommand(c *Conn, slaveID uint32, binlogFilename string, pos uint32) error {
panic("filename binglog not supported for mariadb")
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (mariadbFlavor) sendBinlogDumpCommand(c *Conn, slaveID uint32, startPos Position) error {
// Tell the server that we understand GTIDs by setting our slave
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (mysqlFlavor) stopSlaveCommand() string {
return "STOP SLAVE"
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (mysqlFlavor) sendBinlogFileDumpCommand(c *Conn, slaveID uint32, binlogFilename string, pos uint32) error {
return c.WriteComBinlogDump(slaveID, binlogFilename, pos, 0)
}

// sendBinlogDumpCommand is part of the Flavor interface.
func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, slaveID uint32, startPos Position) error {
gtidSet, ok := startPos.GTIDSet.(Mysql56GTIDSet)
Expand Down
29 changes: 29 additions & 0 deletions go/mysql/replication_position.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mysql
import (
"encoding/json"
"fmt"
"strconv"
"strings"

"vitess.io/vitess/go/vt/proto/vtrpc"
Expand All @@ -35,6 +36,12 @@ const (
MaximumPositionSize = 64000
)

// BinlogFilePos used to encode filename:pos.
type BinlogFilePos struct {
Name string
Pos uint32
}

// Position represents the information necessary to describe which
// transactions a server has seen, so that it can request a replication stream
// from a new master that picks up where it left off.
Expand Down Expand Up @@ -120,6 +127,28 @@ func EncodePosition(rp Position) string {
return fmt.Sprintf("%s/%s", rp.GTIDSet.Flavor(), rp.GTIDSet.String())
}

// ParseFilePosition converts a string in the format file:pos
rafael marked this conversation as resolved.
Show resolved Hide resolved
// to BinlogFilePos
func ParseFilePosition(s string) (rp BinlogFilePos, err error) {
if s == "" {
return rp, vterrors.Errorf(vtrpc.Code_INTERNAL, "parse error: unknown file:pos format %#v", s)
}

parts := strings.SplitN(s, ":", 2)
if len(parts) != 2 {
return rp, vterrors.Errorf(vtrpc.Code_INTERNAL, "parse error: unknown file:pos format %#v", s)
}

pos, err := strconv.Atoi(parts[1])
if err != nil {
return rp, vterrors.Errorf(vtrpc.Code_INTERNAL, "parse error: pos is not a valid int %#v", s)
}

rp.Name = parts[0]
rp.Pos = uint32(pos)
return rp, nil
}

// DecodePosition converts a string in the format returned by
// EncodePosition back into a Position value with the
// correct underlying flavor.
Expand Down
42 changes: 42 additions & 0 deletions go/mysql/replication_position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,48 @@ func TestPositionAppendToZero(t *testing.T) {
}
}

func TestParseFilePositionInvalidInput(t *testing.T) {
input := "filenameinvalidpos"
rp, err := ParseFilePosition(input)
if err == nil {
t.Errorf("ParseFilePosition(%#v) expected error, got : %#v", input, rp)
}

want := `parse error: unknown file:pos format`
got, ok := err.(error)
if !ok || !strings.HasPrefix(got.Error(), want) {
t.Errorf("wrong error, got %#v, want %#v", got, want)
}
}

func TestParseFilePositionInvalidPos(t *testing.T) {
input := "filename:invalidpos"
rp, err := ParseFilePosition(input)
if err == nil {
t.Errorf("ParseFilePosition(%#v) expected error, got : %#v", input, rp)
}

want := `parse error: pos is not a valid`
got, ok := err.(error)
if !ok || !strings.HasPrefix(got.Error(), want) {
t.Errorf("wrong error, got %#v, want %#v", got, want)
}
}

func TestParseFilePosition(t *testing.T) {
input := "filename:2343"
want := BinlogFilePos{Name: "filename", Pos: 2343}
got, err := ParseFilePosition(input)
if err != nil {
t.Errorf("ParseFilePosition(%#v) unexpected error: %#v", input, err)
}

if got.Name != want.Name || got.Pos != want.Pos {
t.Errorf("ParseFilePosition(%#v) = %#v, want %#v", input, got, want)
}

}

func TestMustParsePosition(t *testing.T) {
flavor := "fake flavor"
gtidSetParsers[flavor] = func(s string) (GTIDSet, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/stats/prometheusbackend/prometheusbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (be PromBackend) publishPrometheusMetric(name string, v expvar.Var) {
newMultiTimingsCollector(st, be.buildPromName(name))
case *stats.Histogram:
newHistogramCollector(st, be.buildPromName(name))
case *stats.String, stats.StringFunc, stats.StringMapFunc, *stats.Rates:
case *stats.String, stats.StringFunc, stats.StringMapFunc, *stats.Rates, *stats.RatesFunc:
// Silently ignore these types since they don't make sense to
// export to Prometheus' data model.
default:
Expand Down
29 changes: 29 additions & 0 deletions go/stats/rates.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,32 @@ func (rt *Rates) String() string {
}
return string(data)
}

type RatesFunc struct {
F func() map[string][]float64
help string
}

func NewRateFunc(name string, help string, f func() map[string][]float64) *RatesFunc {
c := &RatesFunc{
F: f,
help: help,
}

if name != "" {
publish(name, c)
}
return c
}

func (rf *RatesFunc) Help() string {
return rf.help
}

func (rf *RatesFunc) String() string {
data, err := json.Marshal(rf.F())
if err != nil {
data, _ = json.Marshal(err.Error())
}
return string(data)
}
19 changes: 12 additions & 7 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (bps *Stats) MessageHistory() []string {
func NewStats() *Stats {
bps := &Stats{}
bps.Timings = stats.NewTimings("", "", "")
bps.Rates = stats.NewRates("", bps.Timings, 15, 60e9)
bps.Rates = stats.NewRates("", bps.Timings, 15*60/5, 5*time.Second)
bps.History = history.New(3)
bps.SecondsBehindMaster.Set(math.MaxInt64)
return bps
Expand Down Expand Up @@ -202,7 +202,10 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
log.Error(err)
return err
}
blp.position = settings.StartPos

if !settings.GtidStartPos.IsZero() {
blp.position = settings.GtidStartPos
}
blp.stopPosition = settings.StopPos
t, err := throttler.NewThrottler(
fmt.Sprintf("BinlogPlayer/%d", blp.uid),
Expand Down Expand Up @@ -517,11 +520,12 @@ func SetVReplicationState(dbClient DBClient, uid uint32, state, message string)

// VRSettings contains the settings of a vreplication table.
type VRSettings struct {
StartPos mysql.Position
StartPos string
StopPos mysql.Position
MaxTPS int64
MaxReplicationLag int64
State string
GtidStartPos mysql.Position
}

// ReadVRSettings retrieves the throttler settings for
Expand All @@ -546,17 +550,18 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse max_replication_lag column: %v", err)
}
startPos, err := mysql.DecodePosition(vrRow[0].ToString())
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse pos column: %v", err)
}
startPos := vrRow[0].ToString()
// TODO: This will be removed when we start using filename:pos flavor and everythign will by a proper enconded mysql.Position
gtidStartPos, _ := mysql.DecodePosition(startPos)

stopPos, err := mysql.DecodePosition(vrRow[1].ToString())
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse stop_pos column: %v", err)
}

return VRSettings{
StartPos: startPos,
GtidStartPos: gtidStartPos,
StopPos: stopPos,
MaxTPS: maxTPS,
MaxReplicationLag: maxReplicationLag,
Expand Down
20 changes: 20 additions & 0 deletions go/vt/binlog/slave_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,26 @@ func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, star
return sc.streamEvents(ctx), nil
}

// StartBinlogDumpFromFilePosition requests a replication binlog dump from
// the master mysqld at the given binlog filename:pos and then sends binlog
// events to the provided channel.
// The stream will continue in the background, waiting for new events if
// necessary, until the connection is closed, either by the master or
// by canceling the context.
//
// Note the context is valid and used until eventChan is closed.
func (sc *SlaveConnection) StartBinlogDumpFromFilePosition(ctx context.Context, binlogFilename string, pos uint32) (<-chan mysql.BinlogEvent, error) {
ctx, sc.cancel = context.WithCancel(ctx)

log.Infof("sending binlog file dump command: binlogfilename=%v, pos=%v, slaveID=%v", binlogFilename, pos, sc.slaveID)
if err := sc.SendBinlogFileDumpCommand(sc.slaveID, binlogFilename, pos); err != nil {
log.Errorf("couldn't send binlog dump command: %v", err)
return nil, err
}

return sc.streamEvents(ctx), nil
}

// streamEvents returns a channel on which events are streamed.
func (sc *SlaveConnection) streamEvents(ctx context.Context) chan mysql.BinlogEvent {
// FIXME(alainjobart) I think we can use a buffered channel for better performance.
Expand Down
36 changes: 27 additions & 9 deletions go/vt/dbconfigs/dbconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ const (
// AllPrivs user should have more privileges than App (should include possibility to do
// schema changes and write to internal Vitess tables), but it shouldn't have SUPER
// privilege like Dba has.
AllPrivs = "allprivs"
Dba = "dba"
Filtered = "filtered"
Repl = "repl"
AllPrivs = "allprivs"
Dba = "dba"
Filtered = "filtered"
Repl = "repl"
ExternalRepl = "erepl"
rafael marked this conversation as resolved.
Show resolved Hide resolved
)

// All can be used to register all flags: RegisterFlags(All...)
var All = []string{App, AppDebug, AllPrivs, Dba, Filtered, Repl}
var All = []string{App, AppDebug, AllPrivs, Dba, Filtered, Repl, ExternalRepl}

// RegisterFlags registers the flags for the given DBConfigFlag.
// For instance, vttablet will register client, dba and repl.
Expand Down Expand Up @@ -157,16 +158,28 @@ func (dbcfgs *DBConfigs) DbaWithDB() *mysql.ConnParams {
return dbcfgs.makeParams(Dba, true)
}

// FilteredWithDB returns connection parameters for appdebug with dbname set.
// FilteredWithDB returns connection parameters for filtered with dbname set.
func (dbcfgs *DBConfigs) FilteredWithDB() *mysql.ConnParams {
return dbcfgs.makeParams(Filtered, true)
}

// Repl returns connection parameters for appdebug with no dbname set.
// Repl returns connection parameters for repl with no dbname set.
func (dbcfgs *DBConfigs) Repl() *mysql.ConnParams {
return dbcfgs.makeParams(Repl, false)
}

// ExternalRepl returns connection parameters for repl with no dbname set.
func (dbcfgs *DBConfigs) ExternalRepl() *mysql.ConnParams {
return dbcfgs.makeParams(ExternalRepl, false)
}

// ExternalReplWithDB returns connection parameters for repl with dbname set.
func (dbcfgs *DBConfigs) ExternalReplWithDB() *mysql.ConnParams {
params := dbcfgs.makeParams(ExternalRepl, false)
params.DbName = params.DeprecatedDBName
return params
}

// AppWithDB returns connection parameters for app with dbname set.
func (dbcfgs *DBConfigs) makeParams(userKey string, withDB bool) *mysql.ConnParams {
orig := dbcfgs.userConfigs[userKey]
Expand Down Expand Up @@ -237,8 +250,13 @@ func HasConnectionParams() bool {
// is used to initialize the per-user conn params.
func Init(defaultSocketFile string) (*DBConfigs, error) {
// The new base configs, if set, supersede legacy settings.
for _, uc := range dbConfigs.userConfigs {
if HasConnectionParams() {
for user, uc := range dbConfigs.userConfigs {
// TODO @rafael: For ExternalRepl we need to respect the provided host / port
// At the moment this is an snowflake user connection type that it used by
// vreplication to connect to external mysql hosts that are not part of a vitess
// cluster. In the future we need to refactor all dbconfig to support custom users
// in a more flexible way.
if HasConnectionParams() && user != ExternalRepl {
uc.param.Host = baseConfig.Host
uc.param.Port = baseConfig.Port
uc.param.UnixSocket = baseConfig.UnixSocket
Expand Down
Loading