Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tabletmanager revamp: refactor initialization #6311

Merged
merged 32 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4eabc0c
tm revamp: remove deprecated code
sougou Jun 2, 2020
8339adf
tm revamp: simplify topo update of mysql port
sougou Jun 2, 2020
ff3145b
tm revamp: fix e2e tabletmanager test
sougou Jun 6, 2020
7d8d899
tm revamp: deprecate demoteMasterType
sougou Jun 6, 2020
0a33587
tm revamp: fix reparent test
sougou Jun 7, 2020
65084c3
tm revamp: move mysql check inside Start
sougou Jun 7, 2020
3411d62
tm revamp: break InitTablet into smaller parts
sougou Jun 8, 2020
52633d9
tm revamp: checkMysql function
sougou Jun 9, 2020
56e4a05
tm revamp: fix race conditions
sougou Jun 9, 2020
ada175d
tm revamp: updateStream initialization refactored
sougou Jun 10, 2020
a347ebd
tm revamp: break up Start into smaller parts
sougou Jun 10, 2020
6608b8c
tm revamp: refreshState does not reload tablet
sougou Jun 10, 2020
553d2d6
tm revamp: pre-populate shardInfo and srvKeyspace
sougou Jun 11, 2020
68627af
tm revamp: fix tests after rebase
sougou Jun 12, 2020
105f995
tm revamp: remove agent._masterTermStartTime
sougou Jun 12, 2020
aa7484a
tm revamp: ActionAgent -> TabletManager
sougou Jun 13, 2020
3331e9f
tm revamp: Agent -> TM
sougou Jun 13, 2020
be8cc28
tm revamp: agent -> tm
sougou Jun 13, 2020
ac077d9
tm revamp: rename 'New' functions
sougou Jun 13, 2020
2d3641a
tm revamp: make some members private
sougou Jun 13, 2020
31cb5f8
tm revamp: groundwork for different initialization
sougou Jun 14, 2020
9ed5de7
tm revamp: use new initialization method
sougou Jun 14, 2020
127e9f7
tm revamp: make VREngine and UpdateStream optional
sougou Jun 14, 2020
dbc25f0
tm revamp: unify codepaths through tm.Start
sougou Jun 14, 2020
99f385f
tm revamp: simplify some tm init tests
sougou Jun 14, 2020
fc52afe
tm revamp: TestStartBuildTabletFromInput
sougou Jun 14, 2020
f5f5e17
tm revamp: fix vtcombo healthcheck
sougou Jun 14, 2020
9254e78
tm revamp: TestStartCreateKeyspaceShard
sougou Jun 14, 2020
4442cd9
tm revamp: TestCheckMastership
sougou Jun 14, 2020
cab2687
tm recamp: test for multi-shard srvKeyspace
sougou Jun 14, 2020
7e8d2d4
tm revamp: TestStartCheckMysql
sougou Jun 15, 2020
245d708
tm revamp: address review comments
sougou Jun 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/vtbackup/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func startReplication(ctx context.Context, mysqld mysqlctl.MysqlDaemon, topoServ
}

// Stop slave (in case we're restarting), set master, and start slave.
if err := mysqld.SetMaster(ctx, topoproto.MysqlHostname(ti.Tablet), int(topoproto.MysqlPort(ti.Tablet)), true /* slaveStopBefore */, true /* slaveStartAfter */); err != nil {
if err := mysqld.SetMaster(ctx, ti.Tablet.MysqlHostname, int(ti.Tablet.MysqlPort), true /* slaveStopBefore */, true /* slaveStartAfter */); err != nil {
deepthi marked this conversation as resolved.
Show resolved Hide resolved
return vterrors.Wrap(err, "MysqlDaemon.SetMaster failed")
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions go/cmd/vttablet/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import (
"vitess.io/vitess/go/vt/servenv"
)

// This file registers a /healthz URL that reports the health of the agent.
// This file registers a /healthz URL that reports the health of the tm.

var okMessage = []byte("ok\n")

func init() {
servenv.OnRun(func() {
http.Handle("/healthz", http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
if _, err := agent.Healthy(); err != nil {
http.Error(rw, fmt.Sprintf("500 internal server error: agent not healthy: %v", err), http.StatusInternalServerError)
if _, err := tm.Healthy(); err != nil {
http.Error(rw, fmt.Sprintf("500 internal server error: tablet manager not healthy: %v", err), http.StatusInternalServerError)
return
}

Expand Down
14 changes: 5 additions & 9 deletions go/cmd/vttablet/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ var (
{{if .DisallowQueryService}}
Query Service disabled: {{.DisallowQueryService}}<br>
{{end}}
{{if .DisableUpdateStream}}
Update Stream disabled<br>
{{end}}
</td>
<td width="25%" border="">
<a href="/schemaz">Schema</a></br>
Expand Down Expand Up @@ -150,19 +147,18 @@ func healthHTMLName() template.HTML {
func addStatusParts(qsc tabletserver.Controller) {
servenv.AddStatusPart("Tablet", tabletTemplate, func() interface{} {
return map[string]interface{}{
"Tablet": topo.NewTabletInfo(agent.Tablet(), nil),
"BlacklistedTables": agent.BlacklistedTables(),
"DisallowQueryService": agent.DisallowQueryService(),
"DisableUpdateStream": !agent.EnableUpdateStream(),
"Tablet": topo.NewTabletInfo(tm.Tablet(), nil),
"BlacklistedTables": tm.BlacklistedTables(),
"DisallowQueryService": tm.DisallowQueryService(),
}
})
servenv.AddStatusFuncs(template.FuncMap{
"github_com_vitessio_vitess_health_html_name": healthHTMLName,
})
servenv.AddStatusPart("Health", healthTemplate, func() interface{} {
latest, _ := agent.History.Latest().(*tabletmanager.HealthRecord)
latest, _ := tm.History.Latest().(*tabletmanager.HealthRecord)
return &healthStatus{
Records: agent.History.Records(),
Records: tm.History.Records(),
Config: tabletmanager.ConfigHTML(),
current: latest,
}
Expand Down
111 changes: 64 additions & 47 deletions go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import (
"io/ioutil"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/binlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/health"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/tableacl"
"vitess.io/vitess/go/vt/tableacl/simpleacl"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/yaml2"
Expand All @@ -43,7 +47,7 @@ var (
tabletPath = flag.String("tablet-path", "", "tablet alias")
tabletConfig = flag.String("tablet_config", "", "YAML file config for tablet")

agent *tabletmanager.ActionAgent
tm *tabletmanager.TabletManager
)

func init() {
Expand All @@ -55,7 +59,61 @@ func main() {
mysqlctl.RegisterFlags()

servenv.ParseFlags("vttablet")
servenv.Init()

if *tabletPath == "" {
log.Exit("-tablet-path required")
}
tabletAlias, err := topoproto.ParseTabletAlias(*tabletPath)
if err != nil {
log.Exitf("failed to parse -tablet-path: %v", err)
}

// config and mycnf intializations are intertwined.
config, mycnf := initConfig(tabletAlias)

ts := topo.Open()
qsc := createTabletServer(config, ts, tabletAlias)

mysqld := mysqlctl.NewMysqld(config.DB)
servenv.OnClose(mysqld.Close)

// Initialize and start tm.
gRPCPort := int32(0)
if servenv.GRPCPort != nil {
gRPCPort = int32(*servenv.GRPCPort)
}
tablet, err := tabletmanager.BuildTabletFromInput(tabletAlias, int32(*servenv.Port), gRPCPort)
if err != nil {
log.Exitf("failed to parse -tablet-path: %v", err)
}
tm = &tabletmanager.TabletManager{
BatchCtx: context.Background(),
TopoServer: ts,
Cnf: mycnf,
MysqlDaemon: mysqld,
DBConfigs: config.DB.Clone(),
QueryServiceControl: qsc,
deepthi marked this conversation as resolved.
Show resolved Hide resolved
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine()),
VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld),
HealthReporter: health.DefaultAggregator,
}
if err := tm.Start(tablet); err != nil {
log.Exitf("failed to parse -tablet-path: %v", err)
}
servenv.OnClose(func() {
// Close the tm so that our topo entry gets pruned properly and any
// background goroutines that use the topo connection are stopped.
tm.Close()

// tm uses ts. So, it should be closed after tm.
ts.Close()
})

servenv.RunDefault()
}

func initConfig(tabletAlias *topodatapb.TabletAlias) (*tabletenv.TabletConfig, *mysqlctl.Mycnf) {
tabletenv.Init()
// Load current config after tabletenv.Init, because it changes it.
config := tabletenv.NewCurrentConfig()
Expand All @@ -75,16 +133,6 @@ func main() {
gotBytes, _ := yaml2.Marshal(config)
log.Infof("Loaded config file %s successfully:\n%s", *tabletConfig, gotBytes)

servenv.Init()

if *tabletPath == "" {
log.Exit("-tablet-path required")
}
tabletAlias, err := topoproto.ParseTabletAlias(*tabletPath)
if err != nil {
log.Exitf("failed to parse -tablet-path: %v", err)
}

var mycnf *mysqlctl.Mycnf
var socketFile string
// If no connection parameters were specified, load the mycnf file
Expand All @@ -108,54 +156,23 @@ func main() {
for _, cfg := range config.ExternalConnections {
cfg.InitWithSocket("")
}
return config, mycnf
}

func createTabletServer(config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias) *tabletserver.TabletServer {
if *tableACLConfig != "" {
// To override default simpleacl, other ACL plugins must set themselves to be default ACL factory
tableacl.Register("simpleacl", &simpleacl.Factory{})
} else if *enforceTableACLConfig {
log.Exit("table acl config has to be specified with table-acl-config flag because enforce-tableacl-config is set.")
}

// creates and registers the query service
ts := topo.Open()
qsc := tabletserver.NewTabletServer("", config, ts, *tabletAlias)
servenv.OnRun(func() {
qsc.Register()
addStatusParts(qsc)
})
servenv.OnClose(func() {
// We now leave the queryservice running during lameduck,
// so stop it in OnClose(), after lameduck is over.
qsc.StopService()
})

servenv.OnClose(qsc.StopService)
qsc.InitACL(*tableACLConfig, *enforceTableACLConfig, *tableACLConfigReloadInterval)

// Create mysqld and register the health reporter (needs to be done
// before initializing the agent, so the initial health check
// done by the agent has the right reporter)
mysqld := mysqlctl.NewMysqld(config.DB)
servenv.OnClose(mysqld.Close)

// Depends on both query and updateStream.
gRPCPort := int32(0)
if servenv.GRPCPort != nil {
gRPCPort = int32(*servenv.GRPCPort)
}
agent, err = tabletmanager.NewActionAgent(context.Background(), ts, mysqld, qsc, tabletAlias, config, mycnf, int32(*servenv.Port), gRPCPort)
if err != nil {
log.Exitf("NewActionAgent() failed: %v", err)
}

servenv.OnClose(func() {
// Close the agent so that our topo entry gets pruned properly and any
// background goroutines that use the topo connection are stopped.
agent.Close()

// We will still use the topo server during lameduck period
// to update our state, so closing it in OnClose()
ts.Close()
})

servenv.RunDefault()
return qsc
}
10 changes: 8 additions & 2 deletions go/test/endtoend/recovery/recovery_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ var (
dbPassword = "VtDbaPass"

// UseXb flag to use extra backup for recovery teseting.
UseXb = false
UseXb = false
// XbArgs are the arguments for specifying xtrabackup.
XbArgs = []string{
"-backup_engine_implementation", "xtrabackup",
"-xtrabackup_stream_mode=xbstream",
Expand All @@ -41,12 +42,14 @@ var (
}
)

// VerifyQueriesUsingVtgate verifies queries using vtgate.
func VerifyQueriesUsingVtgate(t *testing.T, session *vtgateconn.VTGateSession, query string, value string) {
qr, err := session.Execute(context.Background(), query, nil)
require.Nil(t, err)
assert.Equal(t, value, fmt.Sprintf("%v", qr.Rows[0][0]))
}

// RestoreTablet performs a PITR restore.
func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string) {
tablet.ValidateTabletRestart(t)
replicaTabletArgs := commonTabletArg
Expand All @@ -69,7 +72,9 @@ func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tabl
"-enable_replication_reporter=false",
"-init_tablet_type", "replica",
"-init_keyspace", restoreKSName,
"-init_shard", shardName)
"-init_shard", shardName,
"-init_db_name_override", "vt_"+keyspaceName,
)
tablet.VttabletProcess.SupportsBackup = true
tablet.VttabletProcess.ExtraArgs = replicaTabletArgs

Expand All @@ -81,6 +86,7 @@ func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tabl
require.Nil(t, err)
}

// InsertData inserts data.
func InsertData(t *testing.T, tablet *cluster.Vttablet, index int, keyspaceName string) {
_, err := tablet.VttabletProcess.QueryTablet(fmt.Sprintf("insert into vt_insert_test (id, msg) values (%d, 'test %d')", index, index), keyspaceName, true)
require.Nil(t, err)
Expand Down
23 changes: 1 addition & 22 deletions go/test/endtoend/reparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,6 @@ func TestReparentCrossCell(t *testing.T) {
}

func TestReparentGraceful(t *testing.T) {
reparentGraceful(t, false)
}

func TestReparentGracefulRecovery(t *testing.T) {
reparentGraceful(t, true)
}

func reparentGraceful(t *testing.T, confusedMaster bool) {
defer cluster.PanicHandler(t)
ctx := context.Background()

Expand Down Expand Up @@ -270,20 +262,7 @@ func reparentGraceful(t *testing.T, confusedMaster bool) {

checkMasterTablet(t, tablet62044)

// Simulate a master that forgets it's master and becomes replica.
deepthi marked this conversation as resolved.
Show resolved Hide resolved
// PlannedReparentShard should be able to recover by reparenting to the same master again,
// as long as all tablets are available to check that it's safe.
if confusedMaster {
tablet62044.Type = "replica"
err = clusterInstance.VtctlclientProcess.InitTablet(tablet62044, tablet62044.Cell, keyspaceName, hostname, shardName)
require.Nil(t, err)

err = clusterInstance.VtctlclientProcess.ExecuteCommand("RefreshState", tablet62044.Alias)
require.Nil(t, err)
}

// Perform a graceful reparent to the same master.
// It should be idempotent, and should fix any inconsistencies if necessary
// A graceful reparent to the same master should be idempotent.
err = clusterInstance.VtctlclientProcess.ExecuteCommand(
"PlannedReparentShard",
"-keyspace_shard", fmt.Sprintf("%s/%s", keyspaceName, shardName),
Expand Down
Loading