Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTShovel - VReplication support for external databases #5289

Merged
merged 33 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
489eb3c
Adds proto definition to store dml extracted from Query
rafael Oct 3, 2019
aea60ab
Update schema engine to expect mysql.ConnParams
rafael Oct 10, 2019
11301e3
Adds support for file:pos in mysql binlogdump interface
rafael Oct 10, 2019
2f1d3b1
Adds BinlogFilePos as a way to encode mysql replication
rafael Oct 10, 2019
a2b0074
Adds StartBinlogDumpFromFilePosition to slave connection protocol
rafael Oct 10, 2019
4210649
Enables vreplication to run directly from MySQL
rafael Oct 10, 2019
f903605
Merge branch 'upstream-master' into vtshovel-poc
rafael Oct 10, 2019
c3c238b
Adds the core of vtshovel program
rafael Oct 11, 2019
8bf38cc
Simplifies vtshovel logic. It assumes that it runs directly again
rafael Oct 16, 2019
b56bf67
Update approach to not require another binary to run vtshovel
rafael Oct 17, 2019
c7926ef
Fixes some bugs in dbconfigs and vstreamer client after inital testing
rafael Oct 18, 2019
46c0fe4
Adds QPS chart to tablet vreplication section
rafael Oct 21, 2019
e73faef
Adds flag to register metrics.
rafael Oct 21, 2019
cf57589
Addresses comments from review
rafael Oct 29, 2019
90ee27a
WIP: Adds test for vstreamer client
rafael Oct 30, 2019
3fc6b08
Fixes bug in dbconfigs that was causing vstreamer to not work correctly
rafael Oct 30, 2019
b674d5e
Adds tests for vstreamer_client
rafael Nov 6, 2019
9988496
Do not pass source conn params around
rafael Nov 6, 2019
4bd4904
Style improvements
rafael Nov 6, 2019
21eeeb4
Abort on error when executing a DML in statement based replication
rafael Nov 14, 2019
6593e5d
Merge branch 'upstream-master' into vtshovel-poc
rafael Nov 23, 2019
1665701
Fixes per rebase with file:pos feature
rafael Nov 25, 2019
67ac881
Merge branch 'master' of https://github.com/vitessio/vitess into vtsh…
rafael Nov 26, 2019
731f02f
Revert no longer needed changes in vstream
rafael Nov 26, 2019
ec632f2
Adds support to set flavor for specific connections
rafael Nov 27, 2019
d4460ee
Fixes per integration with file:pos rebase
rafael Nov 27, 2019
4f03489
Remove test to make sure this is the last outstanding issue
rafael Nov 27, 2019
b1a8772
Fixes bug in filepos and adds test for statement mode
rafael Nov 28, 2019
69a4dd0
Merge branch 'master' of https://github.com/vitessio/vitess into vtsh…
rafael Nov 28, 2019
59785b6
Fixes per changes upstream
rafael Nov 28, 2019
5fd8925
Adds more tests and fixes govet issues
rafael Dec 1, 2019
559a210
Re-running go-imports and addressing other comments from PR
rafael Dec 4, 2019
dc0f4a3
Merge branch 'master' of https://github.com/vitessio/vitess into vtsh…
rafael Dec 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ func (ev filePosQueryEvent) Query(BinlogFormat) (Query, error) {
}, nil
}

//----------------------------------------------------------------------------
func (ev filePosQueryEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, error) {
return ev, nil, nil
}

var _ BinlogEvent = filePosFakeEvent{}
//----------------------------------------------------------------------------

// filePosFakeEvent is the base class for fake events.
type filePosFakeEvent struct {
Expand Down Expand Up @@ -207,10 +209,6 @@ func (ev filePosFakeEvent) Rows(BinlogFormat, *TableMap) (Rows, error) {
return Rows{}, nil
}

func (ev filePosFakeEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, error) {
return ev, nil, nil
}

func (ev filePosFakeEvent) IsPseudo() bool {
return false
}
Expand Down Expand Up @@ -239,6 +237,10 @@ func (ev filePosGTIDEvent) IsGTID() bool {
return true
}

func (ev filePosGTIDEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, error) {
return ev, nil, nil
}

func (ev filePosGTIDEvent) GTID(BinlogFormat) (GTID, bool, error) {
return ev.gtid, false, nil
}
2 changes: 1 addition & 1 deletion go/stats/prometheusbackend/prometheusbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (be PromBackend) publishPrometheusMetric(name string, v expvar.Var) {
newMultiTimingsCollector(st, be.buildPromName(name))
case *stats.Histogram:
newHistogramCollector(st, be.buildPromName(name))
case *stats.String, stats.StringFunc, stats.StringMapFunc, *stats.Rates:
case *stats.String, stats.StringFunc, stats.StringMapFunc, *stats.Rates, *stats.RatesFunc:
// Silently ignore these types since they don't make sense to
// export to Prometheus' data model.
default:
Expand Down
29 changes: 29 additions & 0 deletions go/stats/rates.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,32 @@ func (rt *Rates) String() string {
}
return string(data)
}

type RatesFunc struct {
F func() map[string][]float64
help string
}

func NewRateFunc(name string, help string, f func() map[string][]float64) *RatesFunc {
c := &RatesFunc{
F: f,
help: help,
}

if name != "" {
publish(name, c)
}
return c
}

func (rf *RatesFunc) Help() string {
return rf.help
}

func (rf *RatesFunc) String() string {
data, err := json.Marshal(rf.F())
if err != nil {
data, _ = json.Marshal(err.Error())
}
return string(data)
}
3 changes: 2 additions & 1 deletion go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (bps *Stats) MessageHistory() []string {
func NewStats() *Stats {
bps := &Stats{}
bps.Timings = stats.NewTimings("", "", "")
bps.Rates = stats.NewRates("", bps.Timings, 15, 60e9)
bps.Rates = stats.NewRates("", bps.Timings, 15*60/5, 5*time.Second)
bps.History = history.New(3)
bps.SecondsBehindMaster.Set(math.MaxInt64)
return bps
Expand Down Expand Up @@ -202,6 +202,7 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
log.Error(err)
return err
}

blp.position = settings.StartPos
blp.stopPosition = settings.StopPos
t, err := throttler.NewThrottler(
Expand Down
59 changes: 43 additions & 16 deletions go/vt/dbconfigs/dbconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ const (
// AllPrivs user should have more privileges than App (should include possibility to do
// schema changes and write to internal Vitess tables), but it shouldn't have SUPER
// privilege like Dba has.
AllPrivs = "allprivs"
Dba = "dba"
Filtered = "filtered"
Repl = "repl"
AllPrivs = "allprivs"
Dba = "dba"
Filtered = "filtered"
Repl = "repl"
ExternalRepl = "erepl"
rafael marked this conversation as resolved.
Show resolved Hide resolved
)

// All can be used to register all flags: RegisterFlags(All...)
var All = []string{App, AppDebug, AllPrivs, Dba, Filtered, Repl}
var All = []string{App, AppDebug, AllPrivs, Dba, Filtered, Repl, ExternalRepl}

// RegisterFlags registers the flags for the given DBConfigFlag.
// For instance, vttablet will register client, dba and repl.
Expand Down Expand Up @@ -128,6 +129,7 @@ func registerPerUserFlags(dbc *userConfig, userKey string) {
flag.StringVar(&dbc.param.SslCert, "db-config-"+userKey+"-ssl-cert", "", "deprecated: use db_ssl_cert")
flag.StringVar(&dbc.param.SslKey, "db-config-"+userKey+"-ssl-key", "", "deprecated: use db_ssl_key")
flag.StringVar(&dbc.param.ServerName, "db-config-"+userKey+"-server_name", "", "deprecated: use db_server_name")
flag.StringVar(&dbc.param.Flavor, "db-config-"+userKey+"-flavor", "", "deprecated: use db_flavor")

flag.StringVar(&dbc.param.DeprecatedDBName, "db-config-"+userKey+"-dbname", "", "deprecated: dbname does not need to be explicitly configured")

Expand Down Expand Up @@ -158,16 +160,33 @@ func (dbcfgs *DBConfigs) DbaWithDB() *mysql.ConnParams {
return dbcfgs.makeParams(Dba, true)
}

// FilteredWithDB returns connection parameters for appdebug with dbname set.
// FilteredWithDB returns connection parameters for filtered with dbname set.
func (dbcfgs *DBConfigs) FilteredWithDB() *mysql.ConnParams {
return dbcfgs.makeParams(Filtered, true)
}

// Repl returns connection parameters for appdebug with no dbname set.
// Repl returns connection parameters for repl with no dbname set.
func (dbcfgs *DBConfigs) Repl() *mysql.ConnParams {
return dbcfgs.makeParams(Repl, false)
}

// ExternalRepl returns connection parameters for repl with no dbname set.
func (dbcfgs *DBConfigs) ExternalRepl() *mysql.ConnParams {
return dbcfgs.makeParams(ExternalRepl, true)
}

// ExternalReplWithDB returns connection parameters for repl with dbname set.
func (dbcfgs *DBConfigs) ExternalReplWithDB() *mysql.ConnParams {
params := dbcfgs.makeParams(ExternalRepl, true)
// TODO @rafael: This is a hack to allows to configure external databases by providing
// db-config-erepl-dbname.
if params.DeprecatedDBName != "" {
params.DbName = params.DeprecatedDBName
return params
}
return params
}

// AppWithDB returns connection parameters for app with dbname set.
func (dbcfgs *DBConfigs) makeParams(userKey string, withDB bool) *mysql.ConnParams {
orig := dbcfgs.userConfigs[userKey]
Expand Down Expand Up @@ -238,8 +257,13 @@ func HasConnectionParams() bool {
// is used to initialize the per-user conn params.
func Init(defaultSocketFile string) (*DBConfigs, error) {
// The new base configs, if set, supersede legacy settings.
for _, uc := range dbConfigs.userConfigs {
if HasConnectionParams() {
for user, uc := range dbConfigs.userConfigs {
// TODO @rafael: For ExternalRepl we need to respect the provided host / port
// At the moment this is an snowflake user connection type that it used by
// vreplication to connect to external mysql hosts that are not part of a vitess
// cluster. In the future we need to refactor all dbconfig to support custom users
// in a more flexible way.
if HasConnectionParams() && user != ExternalRepl {
uc.param.Host = baseConfig.Host
uc.param.Port = baseConfig.Port
uc.param.UnixSocket = baseConfig.UnixSocket
Expand All @@ -253,7 +277,9 @@ func Init(defaultSocketFile string) (*DBConfigs, error) {
if baseConfig.Flags != 0 {
uc.param.Flags = baseConfig.Flags
}
uc.param.Flavor = baseConfig.Flavor
if user != ExternalRepl {
uc.param.Flavor = baseConfig.Flavor
}
if uc.useSSL {
uc.param.SslCa = baseConfig.SslCa
uc.param.SslCaPath = baseConfig.SslCaPath
Expand Down Expand Up @@ -282,12 +308,13 @@ func Init(defaultSocketFile string) (*DBConfigs, error) {
func NewTestDBConfigs(genParams, appDebugParams mysql.ConnParams, dbName string) *DBConfigs {
dbcfgs := &DBConfigs{
userConfigs: map[string]*userConfig{
App: {param: genParams},
AppDebug: {param: appDebugParams},
AllPrivs: {param: genParams},
Dba: {param: genParams},
Filtered: {param: genParams},
Repl: {param: genParams},
App: {param: genParams},
AppDebug: {param: appDebugParams},
AllPrivs: {param: genParams},
Dba: {param: genParams},
Filtered: {param: genParams},
Repl: {param: genParams},
ExternalRepl: {param: genParams},
},
}
dbcfgs.DBName.Set(dbName)
Expand Down
Loading