From 15bea70256bcbefa87a0eae73911485e08323465 Mon Sep 17 00:00:00 2001 From: amecea Date: Thu, 21 Feb 2019 19:29:16 +0200 Subject: [PATCH 1/4] Refactor sidecar command, add configs --- cmd/mysql-operator-sidecar/main.go | 17 +- pkg/sidecar/app/configs.go | 234 ++++++++++++ pkg/sidecar/app/constants.go | 69 ++++ pkg/sidecar/app/util.go | 176 +++++++++ pkg/sidecar/appclone/appclone.go | 44 +-- pkg/sidecar/appconf/appconf.go | 29 +- pkg/sidecar/apphelper/apphelper.go | 77 ++-- pkg/sidecar/apphelper/server.go | 22 +- pkg/sidecar/apptakebackup/apptakebackup.go | 12 +- pkg/sidecar/util/util.go | 397 --------------------- 10 files changed, 579 insertions(+), 498 deletions(-) create mode 100644 pkg/sidecar/app/configs.go create mode 100644 pkg/sidecar/app/constants.go create mode 100644 pkg/sidecar/app/util.go delete mode 100644 pkg/sidecar/util/util.go diff --git a/cmd/mysql-operator-sidecar/main.go b/cmd/mysql-operator-sidecar/main.go index ca7a2607b..919c11e8c 100644 --- a/cmd/mysql-operator-sidecar/main.go +++ b/cmd/mysql-operator-sidecar/main.go @@ -25,6 +25,7 @@ import ( "github.com/spf13/cobra" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" + "github.com/presslabs/mysql-operator/pkg/sidecar/app" "github.com/presslabs/mysql-operator/pkg/sidecar/appclone" "github.com/presslabs/mysql-operator/pkg/sidecar/appconf" "github.com/presslabs/mysql-operator/pkg/sidecar/apphelper" @@ -59,11 +60,14 @@ func main() { // setup logging logf.SetLogger(logf.ZapLogger(debug)) + // init configs + cfg := app.NewBasicConfig(stopCh) + confCmd := &cobra.Command{ Use: "init-configs", Short: "Init subcommand, for init files.", Run: func(cmd *cobra.Command, args []string) { - err := appconf.RunConfigCommand(stopCh) + err := appconf.RunConfigCommand(cfg) if err != nil { log.Error(err, "init command failed") os.Exit(1) @@ -76,7 +80,7 @@ func main() { Use: "clone", Short: "Clone data from a bucket or prior node.", Run: func(cmd *cobra.Command, args []string) { - err := appclone.RunCloneCommand(stopCh) + err := appclone.RunCloneCommand(cfg) if err != nil { log.Error(err, "clone command failed") os.Exit(1) @@ -85,18 +89,19 @@ func main() { } cmd.AddCommand(cloneCmd) - helperCmd := &cobra.Command{ + sidecarCmd := &cobra.Command{ Use: "run", Short: "Configs mysql users, replication, and serve backups.", Run: func(cmd *cobra.Command, args []string) { - err := apphelper.RunRunCommand(stopCh) + mysqlCFG := app.NewMysqlConfig(cfg) + err := apphelper.RunRunCommand(mysqlCFG) if err != nil { log.Error(err, "run command failed") os.Exit(1) } }, } - cmd.AddCommand(helperCmd) + cmd.AddCommand(sidecarCmd) takeBackupCmd := &cobra.Command{ Use: "take-backup-to", @@ -108,7 +113,7 @@ func main() { return nil }, Run: func(cmd *cobra.Command, args []string) { - err := apptakebackup.RunTakeBackupCommand(stopCh, args[0], args[1]) + err := apptakebackup.RunTakeBackupCommand(cfg, args[0], args[1]) if err != nil { log.Error(err, "take backup command failed") os.Exit(1) diff --git a/pkg/sidecar/app/configs.go b/pkg/sidecar/app/configs.go new file mode 100644 index 000000000..2830354a4 --- /dev/null +++ b/pkg/sidecar/app/configs.go @@ -0,0 +1,234 @@ +/* +Copyright 2019 Pressinfra SRL + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "fmt" + "os" + "path" + "strconv" + "strings" + + "github.com/go-ini/ini" + // add mysql driver + _ "github.com/go-sql-driver/mysql" + + "github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster" + orc "github.com/presslabs/mysql-operator/pkg/orchestrator" +) + +// NodeRole represents the kind of the MySQL server +type NodeRole string + +const ( + // MasterNode represents the master role for MySQL server + MasterNode NodeRole = "master" + // SlaveNode represents the slave role for MySQL server + SlaveNode NodeRole = "slave" +) + +// BaseConfig contains information related with the pod. +type BaseConfig struct { + StopCh <-chan struct{} + + // Hostname represents the pod hostname + Hostname string + // ClusterName is the MySQL cluster name + ClusterName string + // Namespace represents the namespace where the pod is in + Namespace string + // ServiceName is the name of the headless service + ServiceName string + + // NodeRole represents the MySQL role of the node, can be on of: msater, slave + NodeRole NodeRole + // ServerID represents the MySQL server id + ServerID int + + // InitBucketURL represents the init bucket to initialize mysql + InitBucketURL *string + + // OrchestratorURL is the URL to connect to orchestrator + OrchestratorURL *string + + // MasterHost represents the cluster master hostname + MasterHost string + + // backup user and password for http endpoint + BackupUser string + BackupPassword string +} + +// GetHostFor returns the pod hostname for given MySQL server id +func (cfg *BaseConfig) GetHostFor(id int) string { + base := mysqlcluster.GetNameForResource(mysqlcluster.StatefulSet, cfg.ClusterName) + return fmt.Sprintf("%s-%d.%s.%s", base, id-100, cfg.ServiceName, cfg.Namespace) +} + +func (cfg *BaseConfig) getOrcClient() orc.Interface { + if cfg.OrchestratorURL == nil { + return nil + } + + return orc.NewFromURI(*cfg.OrchestratorURL) +} + +func (cfg *BaseConfig) getFQClusterName() string { + return fmt.Sprintf("%s.%s", cfg.ClusterName, cfg.Namespace) +} + +func (cfg *BaseConfig) getMasterHost() string { + if client := cfg.getOrcClient(); client != nil { + if master, err := client.Master(cfg.getFQClusterName()); err == nil { + return master.Key.Hostname + } + } + + log.V(-1).Info("failed to obtain master from orchestrator, go for default master", + "master", cfg.GetHostFor(100)) + return cfg.GetHostFor(100) + +} + +// NewBasicConfig returns a pointer to BaseConfig configured from environment variables +func NewBasicConfig(stop <-chan struct{}) *BaseConfig { + cfg := &BaseConfig{ + StopCh: stop, + Hostname: getEnvValue("HOSTNAME"), + ClusterName: getEnvValue("MY_CLUSTER_NAME"), + Namespace: getEnvValue("MY_NAMESPACE"), + ServiceName: getEnvValue("MY_SERVICE_NAME"), + + InitBucketURL: getEnvP("INIT_BUCKET_URI"), + OrchestratorURL: getEnvP("ORCHESTRATOR_URI"), + + BackupUser: getEnvValue("MYSQL_BACKUP_USER"), + BackupPassword: getEnvValue("MYSQL_BACKUP_PASSWORD"), + } + + // get master host + cfg.MasterHost = cfg.getMasterHost() + + // set node role + cfg.NodeRole = SlaveNode + if cfg.Hostname == cfg.MasterHost { + cfg.NodeRole = MasterNode + } + + // get server id + ordinal := getOrdinalFromHostname(cfg.Hostname) + cfg.ServerID = ordinal + 100 + + return cfg +} + +func getEnvValue(key string) string { + value := os.Getenv(key) + if len(value) == 0 { + log.Info("envirorment is not set", "key", key) + } + + return value +} + +func getEnvP(key string) *string { + if value := getEnvValue(key); len(value) != 0 { + return &value + } + return nil +} + +func getOrdinalFromHostname(hn string) int { + // mysql-master-1 + // or + // stateful-ceva-3 + l := strings.Split(hn, "-") + for i := len(l) - 1; i >= 0; i-- { + if o, err := strconv.ParseInt(l[i], 10, 8); err == nil { + return int(o) + } + } + + return 0 +} + +// MysqlConfig contains extra information to connect or configure MySQL server +type MysqlConfig struct { + // inherit from base config + BaseConfig + + MysqlDSN *string + + // replication user and password + ReplicationUser string + ReplicationPassword string + + // metrcis exporter user and password + MetricsUser string + MetricsPassword string + + // orchestrator credentials + OrchestratorUser string + OrchestratorPassword string +} + +// NewMysqlConfig returns a pointer to MysqlConfig +func NewMysqlConfig(cfg *BaseConfig) *MysqlConfig { + mycfg := &MysqlConfig{ + BaseConfig: *cfg, + + ReplicationUser: getEnvValue("MYSQL_REPLICATION_USER"), + ReplicationPassword: getEnvValue("MYSQL_REPLICATION_PASSWORD"), + + MetricsUser: getEnvValue("MYSQL_METRICS_EXPORTER_USER"), + MetricsPassword: getEnvValue("MYSQL_METRICS_EXPORTER_PASSWORD"), + + OrchestratorUser: getEnvValue("MYSQL_ORC_TOPOLOGY_USER"), + OrchestratorPassword: getEnvValue("MYSQL_ORC_TOPOLOGY_PASSWORD"), + } + + // set connection DSN to MySQL + var err error + if mycfg.MysqlDSN, err = getMySQLConnectionString(); err != nil { + log.Error(err, "get MySQL DSN") + } + + return mycfg +} + +// getMySQLConnectionString returns the mysql DSN +func getMySQLConnectionString() (*string, error) { + cnfPath := path.Join(ConfigDir, "client.cnf") + cfg, err := ini.Load(cnfPath) + if err != nil { + return nil, fmt.Errorf("Could not open %s: %s", cnfPath, err) + } + + client := cfg.Section("client") + host := client.Key("host").String() + user := client.Key("user").String() + password := client.Key("password").String() + port, err := client.Key("port").Int() + + if err != nil { + return nil, fmt.Errorf("Invalid port in %s: %s", cnfPath, err) + } + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=5s&multiStatements=true&interpolateParams=true", + user, password, host, port, + ) + return &dsn, nil +} diff --git a/pkg/sidecar/app/constants.go b/pkg/sidecar/app/constants.go new file mode 100644 index 000000000..0d94b94a0 --- /dev/null +++ b/pkg/sidecar/app/constants.go @@ -0,0 +1,69 @@ +/* +Copyright 2018 Pressinfra SRL + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "strconv" + + // add mysql driver + _ "github.com/go-sql-driver/mysql" + + "github.com/presslabs/mysql-operator/pkg/util/constants" +) + +var ( + // MysqlPort represents port on which mysql works + MysqlPort = strconv.Itoa(constants.MysqlPort) + + // ConfigDir is the mysql configs path, /etc/mysql + ConfigDir = constants.ConfVolumeMountPath + + // ConfDPath is /etc/mysql/conf.d + ConfDPath = constants.ConfDPath + + // MountConfigDir is the mounted configs that needs processing + MountConfigDir = constants.ConfMapVolumeMountPath + + // DataDir is the mysql data. /var/lib/mysql + DataDir = constants.DataVolumeMountPath + + // ToolsDbName is the name of the tools table + ToolsDbName = constants.HelperDbName + // ToolsInitTableName is the name of the init table + ToolsInitTableName = "init" + + // UtilityUser is the name of the percona utility user. + UtilityUser = "sys_utility_sidecar" + + // OrcTopologyDir contains the path where the secret with orc credentials is + // mounted. + OrcTopologyDir = constants.OrcTopologyDir + + // ServerPort http server port + ServerPort = constants.SidecarServerPort + // ServerProbeEndpoint is the http server endpoint for probe + ServerProbeEndpoint = constants.SidecarServerProbePath + // ServerBackupEndpoint is the http server endpoint for backups + ServerBackupEndpoint = "/xbackup" +) + +const ( + // RcloneConfigFile represents the path to the file that contains rclon + // configs. This path should be the same as defined in docker entrypoint + // script from mysql-operator-sidecar/docker-entrypoint.sh. /etc/rclone.conf + RcloneConfigFile = "/etc/rclone.conf" +) diff --git a/pkg/sidecar/app/util.go b/pkg/sidecar/app/util.go new file mode 100644 index 000000000..4d684302e --- /dev/null +++ b/pkg/sidecar/app/util.go @@ -0,0 +1,176 @@ +/* +Copyright 2018 Pressinfra SRL + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "bufio" + "database/sql" + "fmt" + "io" + "net/http" + "os" + + // add mysql driver + _ "github.com/go-sql-driver/mysql" + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" +) + +var log = logf.Log.WithName("sidecar.app") + +// RunQuery executes a query +func RunQuery(cfg *MysqlConfig, q string, args ...interface{}) error { + if cfg.MysqlDSN == nil { + log.Info("could not get mysql connection DSN") + return fmt.Errorf("no DSN specified") + } + + db, err := sql.Open("mysql", *cfg.MysqlDSN) + if err != nil { + return err + } + + log.V(4).Info("running query", "query", q, "args", args) + if _, err := db.Exec(q, args...); err != nil { + return err + } + + return nil +} + +// CopyFile the src file to dst. Any existing file will be overwritten and will not +// copy file attributes. +// nolint: gosec +func CopyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer func() { + if err1 := in.Close(); err1 != nil { + log.Error(err1, "failed to close source file", "src_file", src) + } + }() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer func() { + if err1 := out.Close(); err1 != nil { + log.Error(err1, "failed to close destination file", "dest_file", dst) + } + }() + + _, err = io.Copy(out, in) + if err != nil { + return err + } + return nil +} + +// MaxClients limit an http endpoint to allow just n max concurrent connections +func MaxClients(h http.Handler, n int) http.Handler { + sema := make(chan struct{}, n) + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sema <- struct{}{} + defer func() { <-sema }() + + h.ServeHTTP(w, r) + }) +} + +// RequestABackup connects to specified host and endpoint and gets the backup +func RequestABackup(cfg *BaseConfig, host, endpoint string) (io.Reader, error) { + log.Info("initialize a backup", "host", host, "endpoint", endpoint) + + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d%s", host, ServerPort, endpoint), nil) + if err != nil { + return nil, fmt.Errorf("fail to create request: %s", err) + } + + // set authentification user and password + req.SetBasicAuth(cfg.BackupUser, cfg.BackupPassword) + + client := &http.Client{} + + resp, err := client.Do(req) + if err != nil || resp.StatusCode != 200 { + status := "unknown" + if resp != nil { + status = resp.Status + } + return nil, fmt.Errorf("fail to get backup: %s, code: %s", err, status) + } + + return resp.Body, nil +} + +// ReadPurgedGTID returns the GTID from xtrabackup_binlog_info file +func ReadPurgedGTID() (string, error) { + file, err := os.Open(fmt.Sprintf("%s/xtrabackup_binlog_info", DataDir)) + if err != nil { + return "", err + } + + defer func() { + if err1 := file.Close(); err1 != nil { + log.Error(err1, "failed to close file") + } + }() + + return getGTIDFrom(file) +} + +// getGTIDFrom parse the content from xtrabackup_binlog_info file passed as +// io.Reader and extracts the GTID. +func getGTIDFrom(reader io.Reader) (string, error) { + scanner := bufio.NewScanner(reader) + scanner.Split(bufio.ScanWords) + + count := 0 + gtid := "" + for scanner.Scan() { + if count == 2 { + gtid = scanner.Text() + } + count++ + } + + if err := scanner.Err(); err != nil { + return "", err + } else if len(gtid) == 0 { + return "", fmt.Errorf("failed to read GTID reached EOF") + } + + return gtid, nil +} + +// ShouldBootstrapNode checks if the mysql data is at the first initialization +func ShouldBootstrapNode() bool { + _, err := os.Open(fmt.Sprintf("%s/%s/%s.CSV", DataDir, + ToolsDbName, ToolsInitTableName)) + if os.IsNotExist(err) { + return true + } else if err != nil { + log.Error(err, "first init check failed hard") + return true + } + + // maybe check csv init data and log it + return false +} diff --git a/pkg/sidecar/appclone/appclone.go b/pkg/sidecar/appclone/appclone.go index 9430098df..7ab384fc8 100644 --- a/pkg/sidecar/appclone/appclone.go +++ b/pkg/sidecar/appclone/appclone.go @@ -24,18 +24,18 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - "github.com/presslabs/mysql-operator/pkg/sidecar/util" + "github.com/presslabs/mysql-operator/pkg/sidecar/app" ) var log = logf.Log.WithName("sidecar.appclone") // RunCloneCommand clone the data from source. // nolint: gocyclo -func RunCloneCommand(stopCh <-chan struct{}) error { - log.Info("clonning command", "host", util.GetHostname()) +func RunCloneCommand(cfg *app.BaseConfig) error { + log.Info("clonning command", "host", cfg.Hostname) // skip cloning if data exists. - if !util.ShouldBootstrapNode() { + if !app.ShouldBootstrapNode() { log.Info("data exists and is initialized, skipping cloning.") return nil } @@ -49,22 +49,21 @@ func RunCloneCommand(stopCh <-chan struct{}) error { return fmt.Errorf("removing lost+found: %s", err) } - if util.NodeRole() == "master" { - initBucket := util.GetInitBucket() - if len(initBucket) == 0 { + if cfg.NodeRole == app.MasterNode { + if cfg.InitBucketURL == nil { log.Info("skip cloning init bucket uri is not set.") // let mysqld initialize data dir return nil } - err := cloneFromBucket(initBucket) + err := cloneFromBucket(*cfg.InitBucketURL) if err != nil { return fmt.Errorf("failed to clone from bucket, err: %s", err) } } else { // clonging from prior node - if util.GetServerID() > 100 { - sourceHost := util.GetHostFor(util.GetServerID() - 1) - err := cloneFromSource(sourceHost) + if cfg.ServerID > 100 { + sourceHost := cfg.GetHostFor(cfg.ServerID - 1) + err := cloneFromSource(cfg, sourceHost) if err != nil { return fmt.Errorf("failed to clone from %s, err: %s", sourceHost, err) } @@ -88,7 +87,7 @@ func cloneFromBucket(initBucket string) error { log.Info("cloning from bucket", "bucket", initBucket) - if _, err := os.Stat(util.RcloneConfigFile); os.IsNotExist(err) { + if _, err := os.Stat(app.RcloneConfigFile); os.IsNotExist(err) { log.Error(err, "rclone config file does not exists") return err } @@ -96,7 +95,7 @@ func cloneFromBucket(initBucket string) error { // writes to stdout the content of the bucket uri // nolint: gosec rclone := exec.Command("rclone", "-vv", - fmt.Sprintf("--config=%s", util.RcloneConfigFile), "cat", initBucket) + fmt.Sprintf("--config=%s", app.RcloneConfigFile), "cat", initBucket) // gzip reads from stdin decompress and then writes to stdout // nolint: gosec @@ -106,7 +105,7 @@ func cloneFromBucket(initBucket string) error { // extracts files from stdin (-x) and writes them to mysql // data target dir // nolint: gosec - xbstream := exec.Command("xbstream", "-x", "-C", util.DataDir) + xbstream := exec.Command("xbstream", "-x", "-C", app.DataDir) var err error // rclone | gzip | xbstream @@ -150,10 +149,10 @@ func cloneFromBucket(initBucket string) error { return nil } -func cloneFromSource(host string) error { +func cloneFromSource(cfg *app.BaseConfig, host string) error { log.Info("cloning from node", "host", host) - backupBody, err := util.RequestABackup(host, util.ServerBackupEndpoint) + backupBody, err := app.RequestABackup(cfg, host, app.ServerBackupEndpoint) if err != nil { return fmt.Errorf("fail to get backup: %s", err) } @@ -162,7 +161,7 @@ func cloneFromSource(host string) error { // extracts files from stdin (-x) and writes them to mysql // data target dir // nolint: gosec - xbstream := exec.Command("xbstream", "-x", "-C", util.DataDir) + xbstream := exec.Command("xbstream", "-x", "-C", app.DataDir) xbstream.Stdin = backupBody xbstream.Stderr = os.Stderr @@ -179,14 +178,9 @@ func cloneFromSource(host string) error { } func xtrabackupPreperData() error { - replUser := util.GetReplUser() - replPass := util.GetReplPass() - - // TODO: remove user and password for here, not needed. // nolint: gosec xtbkCmd := exec.Command("xtrabackup", "--prepare", - fmt.Sprintf("--target-dir=%s", util.DataDir), - fmt.Sprintf("--user=%s", replUser), fmt.Sprintf("--password=%s", replPass)) + fmt.Sprintf("--target-dir=%s", app.DataDir)) xtbkCmd.Stderr = os.Stderr @@ -195,7 +189,7 @@ func xtrabackupPreperData() error { // nolint: gosec func checkIfDataExists() bool { - path := fmt.Sprintf("%s/mysql", util.DataDir) + path := fmt.Sprintf("%s/mysql", app.DataDir) _, err := os.Open(path) if os.IsNotExist(err) { @@ -208,6 +202,6 @@ func checkIfDataExists() bool { } func deleteLostFound() error { - path := fmt.Sprintf("%s/lost+found", util.DataDir) + path := fmt.Sprintf("%s/lost+found", app.DataDir) return os.RemoveAll(path) } diff --git a/pkg/sidecar/appconf/appconf.go b/pkg/sidecar/appconf/appconf.go index 9f3adc620..a9e01aac5 100644 --- a/pkg/sidecar/appconf/appconf.go +++ b/pkg/sidecar/appconf/appconf.go @@ -24,7 +24,7 @@ import ( "github.com/go-ini/ini" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - "github.com/presslabs/mysql-operator/pkg/sidecar/util" + "github.com/presslabs/mysql-operator/pkg/sidecar/app" pkgutil "github.com/presslabs/mysql-operator/pkg/util" ) @@ -35,44 +35,43 @@ const ( ) // RunConfigCommand generates my.cnf, client.cnf and 10-dynamic.cnf files. -func RunConfigCommand(stopCh <-chan struct{}) error { - role := util.NodeRole() - log.Info("configuring server", "host", util.GetHostname(), "role", role) +func RunConfigCommand(cfg *app.BaseConfig) error { + log.Info("configuring server", "host", cfg.Hostname, "role", cfg.NodeRole) - if err := util.CopyFile(util.MountConfigDir+"/my.cnf", util.ConfigDir+"/my.cnf"); err != nil { + if err := app.CopyFile(app.MountConfigDir+"/my.cnf", app.ConfigDir+"/my.cnf"); err != nil { return fmt.Errorf("copy file my.cnf: %s", err) } uPass := pkgutil.RandomString(rStrLen) - reportHost := util.GetHostFor(util.GetServerID()) + reportHost := cfg.GetHostFor(cfg.ServerID) var dynCFG, utilityCFG, clientCFG *ini.File var err error - if dynCFG, err = getDynamicConfigs(util.GetServerID(), reportHost); err != nil { + if dynCFG, err = getDynamicConfigs(cfg.ServerID, reportHost); err != nil { return fmt.Errorf("failed to get dynamic configs: %s", err) } - if err = os.Mkdir(util.ConfDPath, os.FileMode(0755)); err != nil { + if err = os.Mkdir(app.ConfDPath, os.FileMode(0755)); err != nil { if !os.IsExist(err) { - return fmt.Errorf("error mkdir %s/conf.d: %s", util.ConfigDir, err) + return fmt.Errorf("error mkdir %s/conf.d: %s", app.ConfigDir, err) } } - if err = dynCFG.SaveTo(util.ConfDPath + "/10-dynamic.cnf"); err != nil { + if err = dynCFG.SaveTo(app.ConfDPath + "/10-dynamic.cnf"); err != nil { return fmt.Errorf("failed to save configs: %s", err) } - if utilityCFG, err = getUtilityUserConfigs(util.UtilityUser, uPass); err != nil { + if utilityCFG, err = getUtilityUserConfigs(app.UtilityUser, uPass); err != nil { return fmt.Errorf("failed to configure utility user: %s", err) } - if err = utilityCFG.SaveTo(util.ConfDPath + "/10-utility-user.cnf"); err != nil { + if err = utilityCFG.SaveTo(app.ConfDPath + "/10-utility-user.cnf"); err != nil { return fmt.Errorf("failed to configure utility user: %s", err) } - if clientCFG, err = getClientConfigs(util.UtilityUser, uPass); err != nil { + if clientCFG, err = getClientConfigs(app.UtilityUser, uPass); err != nil { return fmt.Errorf("failed to get client configs: %s", err) } - if err = clientCFG.SaveTo(util.ConfigDir + "/client.cnf"); err != nil { + if err = clientCFG.SaveTo(app.ConfigDir + "/client.cnf"); err != nil { return fmt.Errorf("failed to save configs: %s", err) } @@ -87,7 +86,7 @@ func getClientConfigs(user, pass string) (*ini.File, error) { if _, err := client.NewKey("host", "127.0.0.1"); err != nil { return nil, err } - if _, err := client.NewKey("port", util.MysqlPort); err != nil { + if _, err := client.NewKey("port", app.MysqlPort); err != nil { return nil, err } if _, err := client.NewKey("user", user); err != nil { diff --git a/pkg/sidecar/apphelper/apphelper.go b/pkg/sidecar/apphelper/apphelper.go index c6ff31182..4db7fce8d 100644 --- a/pkg/sidecar/apphelper/apphelper.go +++ b/pkg/sidecar/apphelper/apphelper.go @@ -22,7 +22,7 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - "github.com/presslabs/mysql-operator/pkg/sidecar/util" + "github.com/presslabs/mysql-operator/pkg/sidecar/app" ) var log = logf.Log.WithName("sidecar.apphelper") @@ -36,64 +36,62 @@ const ( // RunRunCommand is the main command, and represents the runtime helper that // configures the mysql server -func RunRunCommand(stopCh <-chan struct{}) error { +func RunRunCommand(cfg *app.MysqlConfig) error { log.Info("start initialization") // wait for mysql to be ready - if err := waitForMysqlReady(); err != nil { + if err := waitForMysqlReady(cfg); err != nil { return fmt.Errorf("mysql is not ready, err: %s", err) } // deactivate super read only log.Info("temporary disable SUPER_READ_ONLY") - if err := util.RunQuery("SET GLOBAL READ_ONLY = 1; SET GLOBAL SUPER_READ_ONLY = 0;"); err != nil { + if err := app.RunQuery(cfg, "SET GLOBAL READ_ONLY = 1; SET GLOBAL SUPER_READ_ONLY = 0;"); err != nil { return fmt.Errorf("failed to configure master node, err: %s", err) } // update orchestrator user and password if orchestrator is configured log.V(1).Info("configure orchestrator credentials") - if len(util.GetOrcUser()) > 0 { - if err := configureOrchestratorUser(); err != nil { - return err - } + if err := configureOrchestratorUser(cfg); err != nil { + return err } // update replication user and password log.V(1).Info("configure replication credentials") - if err := configureReplicationUser(); err != nil { + if err := configureReplicationUser(cfg); err != nil { return err } // update metrics exporter user and password log.V(1).Info("configure metrics exporter credentials") - if err := configureExporterUser(); err != nil { + if err := configureExporterUser(cfg); err != nil { return err } // if it's slave set replication source (master host) log.V(1).Info("configure topology") - if err := configTopology(); err != nil { + if err := configTopology(cfg); err != nil { return err } // mark setup as complete by writing a row in config table log.V(1).Info("flag setup as complet") - if err := markConfigurationDone(); err != nil { + if err := markConfigurationDone(cfg); err != nil { return err } // if it's master node then make it writtable else make it read only log.V(1).Info("configure read only flag") - if err := configReadOnly(); err != nil { + if err := configReadOnly(cfg); err != nil { return err } log.V(1).Info("start http server") - srv := newServer(stopCh) + srv := newServer(cfg) return srv.ListenAndServe() } -func configureOrchestratorUser() error { +func configureOrchestratorUser(cfg *app.MysqlConfig) error { query := ` SET @@SESSION.SQL_LOG_BIN = 0; GRANT SUPER, PROCESS, REPLICATION SLAVE, REPLICATION CLIENT, RELOAD ON *.* TO ?@'%%' IDENTIFIED BY ?; @@ -104,49 +102,50 @@ func configureOrchestratorUser() error { // insert toolsDBName, it's not user input so it's safe. Can't use // placeholders for table names, see: // https://github.com/golang/go/issues/18478 - query = fmt.Sprintf(query, util.ToolsDbName) + query = fmt.Sprintf(query, app.ToolsDbName) - if err := util.RunQuery(query, util.GetOrcUser(), util.GetOrcPass(), util.GetOrcUser(), util.GetOrcUser()); err != nil { + if err := app.RunQuery(cfg, query, cfg.OrchestratorUser, cfg.OrchestratorPassword, + cfg.OrchestratorUser, cfg.OrchestratorPassword); err != nil { return fmt.Errorf("failed to configure orchestrator (user/pass/access), err: %s", err) } return nil } -func configureReplicationUser() error { +func configureReplicationUser(cfg *app.MysqlConfig) error { query := ` SET @@SESSION.SQL_LOG_BIN = 0; GRANT SELECT, PROCESS, RELOAD, LOCK TABLES, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO ?@'%' IDENTIFIED BY ?; ` - if err := util.RunQuery(query, util.GetReplUser(), util.GetReplPass()); err != nil { + if err := app.RunQuery(cfg, query, cfg.ReplicationUser, cfg.ReplicationPassword); err != nil { return fmt.Errorf("failed to configure replication user: %s", err) } return nil } -func configureExporterUser() error { +func configureExporterUser(cfg *app.MysqlConfig) error { query := ` SET @@SESSION.SQL_LOG_BIN = 0; GRANT SELECT, PROCESS, REPLICATION CLIENT ON *.* TO ?@'127.0.0.1' IDENTIFIED BY ? WITH MAX_USER_CONNECTIONS 3; ` - if err := util.RunQuery(query, util.GetExporterUser(), util.GetExporterPass()); err != nil { + if err := app.RunQuery(cfg, query, cfg.MetricsUser, cfg.MetricsPassword); err != nil { return fmt.Errorf("failed to metrics exporter user: %s", err) } return nil } -func waitForMysqlReady() error { +func waitForMysqlReady(cfg *app.MysqlConfig) error { log.V(1).Info("wait for mysql to be ready") for i := 0; i < timeOut; i++ { time.Sleep(1 * time.Second) - if err := util.RunQuery("SELECT 1"); err == nil { + if err := app.RunQuery(cfg, "SELECT 1"); err == nil { break } } - if err := util.RunQuery("SELECT 1"); err != nil { + if err := app.RunQuery(cfg, "SELECT 1"); err != nil { log.V(1).Info("mysql is not ready", "error", err) return err } @@ -156,27 +155,27 @@ func waitForMysqlReady() error { } -func configReadOnly() error { +func configReadOnly(cfg *app.MysqlConfig) error { var query string - if util.NodeRole() == "master" { + if cfg.NodeRole == app.MasterNode { query = "SET GLOBAL READ_ONLY = 0" } else { query = "SET GLOBAL SUPER_READ_ONLY = 1" } - if err := util.RunQuery(query); err != nil { + if err := app.RunQuery(cfg, query); err != nil { return fmt.Errorf("failed to set read_only config, err: %s", err) } return nil } -func configTopology() error { - if util.NodeRole() == "slave" { +func configTopology(cfg *app.MysqlConfig) error { + if cfg.NodeRole == app.SlaveNode { log.Info("setting up as slave") - if util.ShouldBootstrapNode() { + if app.ShouldBootstrapNode() { log.Info("doing bootstrap") - if gtid, err := util.ReadPurgedGTID(); err == nil { + if gtid, err := app.ReadPurgedGTID(); err == nil { log.Info("RESET MASTER and setting GTID_PURGED", "gtid", gtid) - if errQ := util.RunQuery("RESET MASTER; SET GLOBAL GTID_PURGED=?", gtid); errQ != nil { + if errQ := app.RunQuery(cfg, "RESET MASTER; SET GLOBAL GTID_PURGED=?", gtid); errQ != nil { return errQ } } else { @@ -192,14 +191,14 @@ func configTopology() error { MASTER_PASSWORD=?, MASTER_CONNECT_RETRY=?; ` - if err := util.RunQuery(query, - util.GetMasterHost(), util.GetReplUser(), util.GetReplPass(), connRetry, + if err := app.RunQuery(cfg, query, + cfg.MasterHost, cfg.ReplicationUser, cfg.ReplicationPassword, connRetry, ); err != nil { return fmt.Errorf("failed to configure slave node, err: %s", err) } query = "START SLAVE;" - if err := util.RunQuery(query); err != nil { + if err := app.RunQuery(cfg, query); err != nil { log.Info("failed to start slave in the simple mode, trying a second method") // TODO: https://bugs.mysql.com/bug.php?id=83713 query2 := ` @@ -209,7 +208,7 @@ func configTopology() error { reset slave; start slave; ` - if err := util.RunQuery(query2); err != nil { + if err := app.RunQuery(cfg, query2); err != nil { return fmt.Errorf("failed to start slave node, err: %s", err) } } @@ -218,7 +217,7 @@ func configTopology() error { return nil } -func markConfigurationDone() error { +func markConfigurationDone(cfg *app.MysqlConfig) error { query := ` SET @@SESSION.SQL_LOG_BIN = 0; BEGIN; @@ -235,9 +234,9 @@ func markConfigurationDone() error { // insert tables and databases names. It's safe because is not user input. // see: https://github.com/golang/go/issues/18478 - query = fmt.Sprintf(query, util.ToolsDbName, util.ToolsInitTableName) + query = fmt.Sprintf(query, app.ToolsDbName, app.ToolsInitTableName) - if err := util.RunQuery(query, util.GetHostname()); err != nil { + if err := app.RunQuery(cfg, query, cfg.Hostname); err != nil { return fmt.Errorf("failed to mark configuration done, err: %s", err) } diff --git a/pkg/sidecar/apphelper/server.go b/pkg/sidecar/apphelper/server.go index 3a0e4397c..73b9a09f4 100644 --- a/pkg/sidecar/apphelper/server.go +++ b/pkg/sidecar/apphelper/server.go @@ -24,29 +24,31 @@ import ( "os" "os/exec" - "github.com/presslabs/mysql-operator/pkg/sidecar/util" + "github.com/presslabs/mysql-operator/pkg/sidecar/app" ) type server struct { + cfg *app.MysqlConfig http.Server } -func newServer(stop <-chan struct{}) *server { +func newServer(cfg *app.MysqlConfig) *server { mux := http.NewServeMux() srv := &server{ + cfg: cfg, Server: http.Server{ - Addr: fmt.Sprintf(":%d", util.ServerPort), + Addr: fmt.Sprintf(":%d", app.ServerPort), Handler: mux, }, } // Add handle functions - mux.HandleFunc(util.ServerProbeEndpoint, srv.healthHandler) - mux.Handle(util.ServerBackupEndpoint, util.MaxClients(http.HandlerFunc(srv.backupHandler), 1)) + mux.HandleFunc(app.ServerProbeEndpoint, srv.healthHandler) + mux.Handle(app.ServerBackupEndpoint, app.MaxClients(http.HandlerFunc(srv.backupHandler), 1)) // Shutdown gracefully the http server go func() { - <-stop // wait for stop signal + <-cfg.StopCh // wait for stop signal if err := srv.Shutdown(context.Background()); err != nil { log.Error(err, "failed to stop http server") @@ -82,9 +84,9 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) { // nolint: gosec xtrabackup := exec.Command("xtrabackup", "--backup", "--slave-info", "--stream=xbstream", - fmt.Sprintf("--tables-exclude=%s.%s", util.ToolsDbName, util.ToolsInitTableName), - "--host=127.0.0.1", fmt.Sprintf("--user=%s", util.GetReplUser()), - fmt.Sprintf("--password=%s", util.GetReplPass())) + fmt.Sprintf("--tables-exclude=%s.%s", app.ToolsDbName, app.ToolsInitTableName), + "--host=127.0.0.1", fmt.Sprintf("--user=%s", s.cfg.ReplicationUser), + fmt.Sprintf("--password=%s", s.cfg.ReplicationPassword)) xtrabackup.Stderr = os.Stderr @@ -123,5 +125,5 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) { func (s *server) isAuthenticated(r *http.Request) bool { user, pass, ok := r.BasicAuth() - return ok && user == util.GetBackupUser() && pass == util.GetBackupPass() + return ok && user == s.cfg.BackupUser && pass == s.cfg.BackupPassword } diff --git a/pkg/sidecar/apptakebackup/apptakebackup.go b/pkg/sidecar/apptakebackup/apptakebackup.go index 4487467fc..e23a01b56 100644 --- a/pkg/sidecar/apptakebackup/apptakebackup.go +++ b/pkg/sidecar/apptakebackup/apptakebackup.go @@ -24,22 +24,22 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - "github.com/presslabs/mysql-operator/pkg/sidecar/util" + "github.com/presslabs/mysql-operator/pkg/sidecar/app" ) var log = logf.Log.WithName("sidecar.apptakebackup") // RunTakeBackupCommand starts a backup command // nolint: unparam -func RunTakeBackupCommand(stopCh <-chan struct{}, srcHost, destBucket string) error { +func RunTakeBackupCommand(cfg *app.BaseConfig, srcHost, destBucket string) error { log.Info("take a backup", "host", srcHost, "bucket", destBucket) destBucket = normalizeBucketURI(destBucket) - return pushBackupFromTo(srcHost, destBucket) + return pushBackupFromTo(cfg, srcHost, destBucket) } -func pushBackupFromTo(srcHost, destBucket string) error { +func pushBackupFromTo(cfg *app.BaseConfig, srcHost, destBucket string) error { - backupBody, err := util.RequestABackup(srcHost, util.ServerBackupEndpoint) + backupBody, err := app.RequestABackup(cfg, srcHost, app.ServerBackupEndpoint) if err != nil { return fmt.Errorf("getting backup: %s", err) } @@ -49,7 +49,7 @@ func pushBackupFromTo(srcHost, destBucket string) error { // nolint: gosec rclone := exec.Command("rclone", - fmt.Sprintf("--config=%s", util.RcloneConfigFile), "rcat", destBucket) + fmt.Sprintf("--config=%s", app.RcloneConfigFile), "rcat", destBucket) gzip.Stdin = backupBody gzip.Stderr = os.Stderr diff --git a/pkg/sidecar/util/util.go b/pkg/sidecar/util/util.go deleted file mode 100644 index 9068f85d2..000000000 --- a/pkg/sidecar/util/util.go +++ /dev/null @@ -1,397 +0,0 @@ -/* -Copyright 2018 Pressinfra SRL - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "bufio" - "database/sql" - "fmt" - "io" - "net/http" - "os" - "path" - "strconv" - "strings" - - "github.com/go-ini/ini" - // add mysql driver - _ "github.com/go-sql-driver/mysql" - logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - - "github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster" - orc "github.com/presslabs/mysql-operator/pkg/orchestrator" - "github.com/presslabs/mysql-operator/pkg/util/constants" -) - -var log = logf.Log.WithName("sidecar.util") - -var ( - // MysqlPort represents port on which mysql works - MysqlPort = strconv.Itoa(constants.MysqlPort) - - // ConfigDir is the mysql configs path, /etc/mysql - ConfigDir = constants.ConfVolumeMountPath - - // ConfDPath is /etc/mysql/conf.d - ConfDPath = constants.ConfDPath - - // MountConfigDir is the mounted configs that needs processing - MountConfigDir = constants.ConfMapVolumeMountPath - - // DataDir is the mysql data. /var/lib/mysql - DataDir = constants.DataVolumeMountPath - - // ToolsDbName is the name of the tools table - ToolsDbName = constants.HelperDbName - // ToolsInitTableName is the name of the init table - ToolsInitTableName = "init" - - // UtilityUser is the name of the percona utility user. - UtilityUser = "sys_utility_sidecar" - - // OrcTopologyDir contains the path where the secret with orc credentials is - // mounted. - OrcTopologyDir = constants.OrcTopologyDir - - // ServerPort http server port - ServerPort = constants.SidecarServerPort - // ServerProbeEndpoint is the http server endpoint for probe - ServerProbeEndpoint = constants.SidecarServerProbePath - // ServerBackupEndpoint is the http server endpoint for backups - ServerBackupEndpoint = "/xbackup" -) - -const ( - // RcloneConfigFile represents the path to the file that contains rclon - // configs. This path should be the same as defined in docker entrypoint - // script from mysql-operator-sidecar/docker-entrypoint.sh. /etc/rclone.conf - RcloneConfigFile = "/etc/rclone.conf" -) - -// GetHostname returns the pod hostname from env HOSTNAME -func GetHostname() string { - return os.Getenv("HOSTNAME") -} - -// GetClusterName returns the mysql cluster name from env MY_CLUSTER_NAME -func GetClusterName() string { - return getEnvValue("MY_CLUSTER_NAME") -} - -// GetNamespace returns the namespace of the pod from env MY_NAMESPACE -func GetNamespace() string { - return getEnvValue("MY_NAMESPACE") -} - -// GetServiceName returns the headless service name from env MY_SERVICE_NAME -func GetServiceName() string { - return getEnvValue("MY_SERVICE_NAME") -} - -// NodeRole returns the node mysql role: master or slave -func NodeRole() string { - if GetMasterHost() == GetHostFor(GetServerID()) { - return "master" - } - return "slave" -} - -func getOrdinal() int { - hn := GetHostname() - // mysql-master-1 - // or - // stateful-ceva-3 - l := strings.Split(hn, "-") - for i := len(l) - 1; i >= 0; i-- { - if o, err := strconv.ParseInt(l[i], 10, 8); err == nil { - return int(o) - } - } - - return 0 -} - -// GetServerID returns the mysql node ID -func GetServerID() int { - return 100 + getOrdinal() -} - -// GetHostFor returns the host for given server id -func GetHostFor(id int) string { - base := mysqlcluster.GetNameForResource(mysqlcluster.StatefulSet, GetClusterName()) - govSVC := GetServiceName() - namespace := GetNamespace() - return fmt.Sprintf("%s-%d.%s.%s", base, id-100, govSVC, namespace) -} - -func getEnvValue(key string) string { - value := os.Getenv(key) - if len(value) == 0 { - log.Info("envirorment is not set", "key", key) - } - - return value -} - -// GetReplUser returns the replication user name from env variable -// MYSQL_REPLICATION_USER -func GetReplUser() string { - return getEnvValue("MYSQL_REPLICATION_USER") -} - -// GetReplPass returns the replication password from env variable -// MYSQL_REPLICATION_PASSWORD -func GetReplPass() string { - return getEnvValue("MYSQL_REPLICATION_PASSWORD") -} - -// GetExporterUser returns the replication user name from env variable -// MYSQL_METRICS_EXPORTER_USER -func GetExporterUser() string { - return getEnvValue("MYSQL_METRICS_EXPORTER_USER") -} - -// GetExporterPass returns the replication password from env variable -// MYSQL_METRICS_EXPORTER_PASSWORD -func GetExporterPass() string { - return getEnvValue("MYSQL_METRICS_EXPORTER_PASSWORD") -} - -// GetInitBucket returns the bucket uri from env variable -// INIT_BUCKET_URI -func GetInitBucket() string { - return getEnvValue("INIT_BUCKET_URI") -} - -// GetBackupUser returns the basic auth credentials to access backup -func GetBackupUser() string { - return getEnvValue("MYSQL_BACKUP_USER") -} - -// GetBackupPass returns the basic auth credentials to access backup -func GetBackupPass() string { - return getEnvValue("MYSQL_BACKUP_PASSWORD") -} - -// GetMasterHost returns the master host -func GetMasterHost() string { - orcURI := getOrcURI() - if len(orcURI) == 0 { - log.Info("orchestrator is not used") - return GetHostFor(100) - } - - fqClusterName := fmt.Sprintf("%s.%s", GetClusterName(), GetNamespace()) - - client := orc.NewFromURI(orcURI) - inst, err := client.Master(fqClusterName) - if err != nil { - log.V(-1).Info("failed to obtain master from orchestrator, go for default master", "master", GetHostFor(100)) - return GetHostFor(100) - } - - return inst.Key.Hostname -} - -// GetOrcUser returns the orchestrator topology user from env variable -// MYSQL_ORC_TOPOLOGY_USER -func GetOrcUser() string { - return getEnvValue("MYSQL_ORC_TOPOLOGY_USER") -} - -// GetOrcPass returns the orchestrator topology password from env variable -// MYSQL_ORC_TOPOLOGY_PASSWORD -func GetOrcPass() string { - return getEnvValue("MYSQL_ORC_TOPOLOGY_PASSWORD") -} - -// GetMySQLConnectionString returns the mysql DSN -func GetMySQLConnectionString() (string, error) { - cnfPath := path.Join(ConfigDir, "client.cnf") - cfg, err := ini.Load(cnfPath) - if err != nil { - return "", fmt.Errorf("Could not open %s: %s", cnfPath, err) - } - - client := cfg.Section("client") - host := client.Key("host").String() - user := client.Key("user").String() - password := client.Key("password").String() - port, err := client.Key("port").Int() - - if err != nil { - return "", fmt.Errorf("Invalid port in %s: %s", cnfPath, err) - } - dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=5s&multiStatements=true&interpolateParams=true", - user, password, host, port, - ) - return dsn, nil -} - -func getDbConnection() (*sql.DB, error) { - dsn, err := GetMySQLConnectionString() - if err != nil { - log.Error(err, "could not get mysql connection DSN") - return nil, err - } - - return sql.Open("mysql", dsn) -} - -// RunQuery executes a query -func RunQuery(q string, args ...interface{}) error { - db, err := getDbConnection() - if err != nil { - log.Error(err, "could not open mysql connection") - return err - } - - log.V(4).Info("running query", "query", q, "args", args) - if _, err := db.Exec(q, args...); err != nil { - return err - } - - return nil -} - -func getOrcURI() string { - return getEnvValue("ORCHESTRATOR_URI") -} - -// CopyFile the src file to dst. Any existing file will be overwritten and will not -// copy file attributes. -// nolint: gosec -func CopyFile(src, dst string) error { - in, err := os.Open(src) - if err != nil { - return err - } - defer func() { - if err1 := in.Close(); err1 != nil { - log.Error(err1, "failed to close source file", "src_file", src) - } - }() - - out, err := os.Create(dst) - if err != nil { - return err - } - defer func() { - if err1 := out.Close(); err1 != nil { - log.Error(err1, "failed to close destination file", "dest_file", dst) - } - }() - - _, err = io.Copy(out, in) - if err != nil { - return err - } - return nil -} - -// MaxClients limit an http endpoint to allow just n max concurrent connections -func MaxClients(h http.Handler, n int) http.Handler { - sema := make(chan struct{}, n) - - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - sema <- struct{}{} - defer func() { <-sema }() - - h.ServeHTTP(w, r) - }) -} - -// RequestABackup connects to specified host and endpoint and gets the backup -func RequestABackup(host, endpoint string) (io.Reader, error) { - log.Info("initialize a backup", "host", host, "endpoint", endpoint) - - req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d%s", host, ServerPort, endpoint), nil) - if err != nil { - return nil, fmt.Errorf("fail to create request: %s", err) - } - - // set authentification user and password - req.SetBasicAuth(GetBackupUser(), GetBackupPass()) - - client := &http.Client{} - - resp, err := client.Do(req) - if err != nil || resp.StatusCode != 200 { - status := "unknown" - if resp != nil { - status = resp.Status - } - return nil, fmt.Errorf("fail to get backup: %s, code: %s", err, status) - } - - return resp.Body, nil -} - -// ReadPurgedGTID returns the GTID from xtrabackup_binlog_info file -func ReadPurgedGTID() (string, error) { - file, err := os.Open(fmt.Sprintf("%s/xtrabackup_binlog_info", DataDir)) - if err != nil { - return "", err - } - - defer func() { - if err1 := file.Close(); err1 != nil { - log.Error(err1, "failed to close file") - } - }() - - return getGTIDFrom(file) -} - -// getGTIDFrom parse the content from xtrabackup_binlog_info file passed as -// io.Reader and extracts the GTID. -func getGTIDFrom(reader io.Reader) (string, error) { - scanner := bufio.NewScanner(reader) - scanner.Split(bufio.ScanWords) - - count := 0 - gtid := "" - for scanner.Scan() { - if count == 2 { - gtid = scanner.Text() - } - count++ - } - - if err := scanner.Err(); err != nil { - return "", err - } else if len(gtid) == 0 { - return "", fmt.Errorf("failed to read GTID reached EOF") - } - - return gtid, nil -} - -// ShouldBootstrapNode checks if the mysql data is at the first initialization -func ShouldBootstrapNode() bool { - _, err := os.Open(fmt.Sprintf("%s/%s/%s.CSV", DataDir, - ToolsDbName, ToolsInitTableName)) - if os.IsNotExist(err) { - return true - } else if err != nil { - log.Error(err, "first init check failed hard") - return true - } - - // maybe check csv init data and log it - return false -} From ba730d227efc4a93b3df1676427ade627c92f5e2 Mon Sep 17 00:00:00 2001 From: amecea Date: Tue, 5 Mar 2019 17:35:18 +0200 Subject: [PATCH 2/4] Small changes to improve the code --- pkg/sidecar/app/configs.go | 80 ++++++++++++++---------------- pkg/sidecar/app/util.go | 4 +- pkg/sidecar/appclone/appclone.go | 8 +-- pkg/sidecar/appconf/appconf.go | 4 +- pkg/sidecar/apphelper/apphelper.go | 6 +-- pkg/sidecar/apphelper/server.go | 2 +- 6 files changed, 48 insertions(+), 56 deletions(-) diff --git a/pkg/sidecar/app/configs.go b/pkg/sidecar/app/configs.go index 2830354a4..ffd9c3b7e 100644 --- a/pkg/sidecar/app/configs.go +++ b/pkg/sidecar/app/configs.go @@ -43,7 +43,8 @@ const ( // BaseConfig contains information related with the pod. type BaseConfig struct { - StopCh <-chan struct{} + // Stop represents the shutdown channel + Stop <-chan struct{} // Hostname represents the pod hostname Hostname string @@ -54,81 +55,79 @@ type BaseConfig struct { // ServiceName is the name of the headless service ServiceName string - // NodeRole represents the MySQL role of the node, can be on of: msater, slave - NodeRole NodeRole // ServerID represents the MySQL server id ServerID int // InitBucketURL represents the init bucket to initialize mysql - InitBucketURL *string + InitBucketURL string // OrchestratorURL is the URL to connect to orchestrator - OrchestratorURL *string - - // MasterHost represents the cluster master hostname - MasterHost string + OrchestratorURL string // backup user and password for http endpoint BackupUser string BackupPassword string } -// GetHostFor returns the pod hostname for given MySQL server id -func (cfg *BaseConfig) GetHostFor(id int) string { +// FQDNForServer returns the pod hostname for given MySQL server id +func (cfg *BaseConfig) FQDNForServer(id int) string { base := mysqlcluster.GetNameForResource(mysqlcluster.StatefulSet, cfg.ClusterName) return fmt.Sprintf("%s-%d.%s.%s", base, id-100, cfg.ServiceName, cfg.Namespace) } -func (cfg *BaseConfig) getOrcClient() orc.Interface { - if cfg.OrchestratorURL == nil { +func (cfg *BaseConfig) newOrcClient() orc.Interface { + if len(cfg.OrchestratorURL) == 0 { + log.Info("OrchestratorURL not set") return nil } - return orc.NewFromURI(*cfg.OrchestratorURL) + return orc.NewFromURI(cfg.OrchestratorURL) } -func (cfg *BaseConfig) getFQClusterName() string { +// ClusterFQDN returns the cluster FQ Name of the cluster from which the node belongs +func (cfg *BaseConfig) ClusterFQDN() string { return fmt.Sprintf("%s.%s", cfg.ClusterName, cfg.Namespace) } -func (cfg *BaseConfig) getMasterHost() string { - if client := cfg.getOrcClient(); client != nil { - if master, err := client.Master(cfg.getFQClusterName()); err == nil { +// MasterFQDN the FQ Name of the cluster's master +func (cfg *BaseConfig) MasterFQDN() string { + if client := cfg.newOrcClient(); client != nil { + if master, err := client.Master(cfg.ClusterFQDN()); err == nil { return master.Key.Hostname } } log.V(-1).Info("failed to obtain master from orchestrator, go for default master", - "master", cfg.GetHostFor(100)) - return cfg.GetHostFor(100) + "master", cfg.FQDNForServer(100)) + return cfg.FQDNForServer(100) } +// NodeRole returns the role of the current node +func (cfg *BaseConfig) NodeRole() NodeRole { + if cfg.Hostname == cfg.MasterFQDN() { + return MasterNode + } + + return SlaveNode +} + // NewBasicConfig returns a pointer to BaseConfig configured from environment variables func NewBasicConfig(stop <-chan struct{}) *BaseConfig { cfg := &BaseConfig{ - StopCh: stop, + Stop: stop, Hostname: getEnvValue("HOSTNAME"), ClusterName: getEnvValue("MY_CLUSTER_NAME"), Namespace: getEnvValue("MY_NAMESPACE"), ServiceName: getEnvValue("MY_SERVICE_NAME"), - InitBucketURL: getEnvP("INIT_BUCKET_URI"), - OrchestratorURL: getEnvP("ORCHESTRATOR_URI"), + InitBucketURL: getEnvValue("INIT_BUCKET_URI"), + OrchestratorURL: getEnvValue("ORCHESTRATOR_URI"), BackupUser: getEnvValue("MYSQL_BACKUP_USER"), BackupPassword: getEnvValue("MYSQL_BACKUP_PASSWORD"), } - // get master host - cfg.MasterHost = cfg.getMasterHost() - - // set node role - cfg.NodeRole = SlaveNode - if cfg.Hostname == cfg.MasterHost { - cfg.NodeRole = MasterNode - } - // get server id ordinal := getOrdinalFromHostname(cfg.Hostname) cfg.ServerID = ordinal + 100 @@ -145,13 +144,6 @@ func getEnvValue(key string) string { return value } -func getEnvP(key string) *string { - if value := getEnvValue(key); len(value) != 0 { - return &value - } - return nil -} - func getOrdinalFromHostname(hn string) int { // mysql-master-1 // or @@ -171,7 +163,7 @@ type MysqlConfig struct { // inherit from base config BaseConfig - MysqlDSN *string + MysqlDSN string // replication user and password ReplicationUser string @@ -211,11 +203,11 @@ func NewMysqlConfig(cfg *BaseConfig) *MysqlConfig { } // getMySQLConnectionString returns the mysql DSN -func getMySQLConnectionString() (*string, error) { +func getMySQLConnectionString() (string, error) { cnfPath := path.Join(ConfigDir, "client.cnf") cfg, err := ini.Load(cnfPath) if err != nil { - return nil, fmt.Errorf("Could not open %s: %s", cnfPath, err) + return "", fmt.Errorf("Could not open %s: %s", cnfPath, err) } client := cfg.Section("client") @@ -223,12 +215,12 @@ func getMySQLConnectionString() (*string, error) { user := client.Key("user").String() password := client.Key("password").String() port, err := client.Key("port").Int() - if err != nil { - return nil, fmt.Errorf("Invalid port in %s: %s", cnfPath, err) + return "", fmt.Errorf("Invalid port in %s: %s", cnfPath, err) } + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=5s&multiStatements=true&interpolateParams=true", user, password, host, port, ) - return &dsn, nil + return dsn, nil } diff --git a/pkg/sidecar/app/util.go b/pkg/sidecar/app/util.go index 4d684302e..fbf0c3659 100644 --- a/pkg/sidecar/app/util.go +++ b/pkg/sidecar/app/util.go @@ -33,12 +33,12 @@ var log = logf.Log.WithName("sidecar.app") // RunQuery executes a query func RunQuery(cfg *MysqlConfig, q string, args ...interface{}) error { - if cfg.MysqlDSN == nil { + if len(cfg.MysqlDSN) == 0 { log.Info("could not get mysql connection DSN") return fmt.Errorf("no DSN specified") } - db, err := sql.Open("mysql", *cfg.MysqlDSN) + db, err := sql.Open("mysql", cfg.MysqlDSN) if err != nil { return err } diff --git a/pkg/sidecar/appclone/appclone.go b/pkg/sidecar/appclone/appclone.go index 7ab384fc8..85f2b1883 100644 --- a/pkg/sidecar/appclone/appclone.go +++ b/pkg/sidecar/appclone/appclone.go @@ -49,20 +49,20 @@ func RunCloneCommand(cfg *app.BaseConfig) error { return fmt.Errorf("removing lost+found: %s", err) } - if cfg.NodeRole == app.MasterNode { - if cfg.InitBucketURL == nil { + if cfg.NodeRole() == app.MasterNode { + if len(cfg.InitBucketURL) == 0 { log.Info("skip cloning init bucket uri is not set.") // let mysqld initialize data dir return nil } - err := cloneFromBucket(*cfg.InitBucketURL) + err := cloneFromBucket(cfg.InitBucketURL) if err != nil { return fmt.Errorf("failed to clone from bucket, err: %s", err) } } else { // clonging from prior node if cfg.ServerID > 100 { - sourceHost := cfg.GetHostFor(cfg.ServerID - 1) + sourceHost := cfg.FQDNForServer(cfg.ServerID - 1) err := cloneFromSource(cfg, sourceHost) if err != nil { return fmt.Errorf("failed to clone from %s, err: %s", sourceHost, err) diff --git a/pkg/sidecar/appconf/appconf.go b/pkg/sidecar/appconf/appconf.go index a9e01aac5..4c1d56b99 100644 --- a/pkg/sidecar/appconf/appconf.go +++ b/pkg/sidecar/appconf/appconf.go @@ -36,14 +36,14 @@ const ( // RunConfigCommand generates my.cnf, client.cnf and 10-dynamic.cnf files. func RunConfigCommand(cfg *app.BaseConfig) error { - log.Info("configuring server", "host", cfg.Hostname, "role", cfg.NodeRole) + log.Info("configuring server", "host", cfg.Hostname, "role", cfg.NodeRole()) if err := app.CopyFile(app.MountConfigDir+"/my.cnf", app.ConfigDir+"/my.cnf"); err != nil { return fmt.Errorf("copy file my.cnf: %s", err) } uPass := pkgutil.RandomString(rStrLen) - reportHost := cfg.GetHostFor(cfg.ServerID) + reportHost := cfg.FQDNForServer(cfg.ServerID) var dynCFG, utilityCFG, clientCFG *ini.File var err error diff --git a/pkg/sidecar/apphelper/apphelper.go b/pkg/sidecar/apphelper/apphelper.go index 4db7fce8d..48e44a33d 100644 --- a/pkg/sidecar/apphelper/apphelper.go +++ b/pkg/sidecar/apphelper/apphelper.go @@ -157,7 +157,7 @@ func waitForMysqlReady(cfg *app.MysqlConfig) error { func configReadOnly(cfg *app.MysqlConfig) error { var query string - if cfg.NodeRole == app.MasterNode { + if cfg.NodeRole() == app.MasterNode { query = "SET GLOBAL READ_ONLY = 0" } else { query = "SET GLOBAL SUPER_READ_ONLY = 1" @@ -169,7 +169,7 @@ func configReadOnly(cfg *app.MysqlConfig) error { } func configTopology(cfg *app.MysqlConfig) error { - if cfg.NodeRole == app.SlaveNode { + if cfg.NodeRole() == app.SlaveNode { log.Info("setting up as slave") if app.ShouldBootstrapNode() { log.Info("doing bootstrap") @@ -192,7 +192,7 @@ func configTopology(cfg *app.MysqlConfig) error { MASTER_CONNECT_RETRY=?; ` if err := app.RunQuery(cfg, query, - cfg.MasterHost, cfg.ReplicationUser, cfg.ReplicationPassword, connRetry, + cfg.MasterFQDN(), cfg.ReplicationUser, cfg.ReplicationPassword, connRetry, ); err != nil { return fmt.Errorf("failed to configure slave node, err: %s", err) } diff --git a/pkg/sidecar/apphelper/server.go b/pkg/sidecar/apphelper/server.go index 73b9a09f4..48901a6b8 100644 --- a/pkg/sidecar/apphelper/server.go +++ b/pkg/sidecar/apphelper/server.go @@ -48,7 +48,7 @@ func newServer(cfg *app.MysqlConfig) *server { // Shutdown gracefully the http server go func() { - <-cfg.StopCh // wait for stop signal + <-cfg.Stop // wait for stop signal if err := srv.Shutdown(context.Background()); err != nil { log.Error(err, "failed to stop http server") From 4c6d42b7678680891b1a540b88f2be90eed879a4 Mon Sep 17 00:00:00 2001 From: amecea Date: Wed, 6 Mar 2019 15:06:51 +0200 Subject: [PATCH 3/4] Refactor sidecar package --- cmd/mysql-operator-sidecar/main.go | 17 +-- pkg/sidecar/{appclone => }/appclone.go | 32 +++--- pkg/sidecar/{appconf => }/appconf.go | 26 ++--- pkg/sidecar/{apphelper => }/apphelper.go | 105 ++++++++++++------ .../{apptakebackup => }/apptakebackup.go | 17 +-- pkg/sidecar/{app => }/configs.go | 97 +++++++--------- pkg/sidecar/{app => }/constants.go | 30 +++-- pkg/sidecar/{apphelper => }/server.go | 28 +++-- pkg/sidecar/{app => }/util.go | 79 +++---------- 9 files changed, 192 insertions(+), 239 deletions(-) rename pkg/sidecar/{appclone => }/appclone.go (83%) rename pkg/sidecar/{appconf => }/appconf.go (77%) rename pkg/sidecar/{apphelper => }/apphelper.go (68%) rename pkg/sidecar/{apptakebackup => }/apptakebackup.go (75%) rename pkg/sidecar/{app => }/configs.go (83%) rename pkg/sidecar/{app => }/constants.go (70%) rename pkg/sidecar/{apphelper => }/server.go (81%) rename pkg/sidecar/{app => }/util.go (56%) diff --git a/cmd/mysql-operator-sidecar/main.go b/cmd/mysql-operator-sidecar/main.go index 919c11e8c..4def3e9bf 100644 --- a/cmd/mysql-operator-sidecar/main.go +++ b/cmd/mysql-operator-sidecar/main.go @@ -25,11 +25,7 @@ import ( "github.com/spf13/cobra" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" - "github.com/presslabs/mysql-operator/pkg/sidecar/app" - "github.com/presslabs/mysql-operator/pkg/sidecar/appclone" - "github.com/presslabs/mysql-operator/pkg/sidecar/appconf" - "github.com/presslabs/mysql-operator/pkg/sidecar/apphelper" - "github.com/presslabs/mysql-operator/pkg/sidecar/apptakebackup" + "github.com/presslabs/mysql-operator/pkg/sidecar" ) var log = logf.Log.WithName("sidecar") @@ -61,13 +57,13 @@ func main() { logf.SetLogger(logf.ZapLogger(debug)) // init configs - cfg := app.NewBasicConfig(stopCh) + cfg := sidecar.NewConfig(stopCh) confCmd := &cobra.Command{ Use: "init-configs", Short: "Init subcommand, for init files.", Run: func(cmd *cobra.Command, args []string) { - err := appconf.RunConfigCommand(cfg) + err := sidecar.RunConfigCommand(cfg) if err != nil { log.Error(err, "init command failed") os.Exit(1) @@ -80,7 +76,7 @@ func main() { Use: "clone", Short: "Clone data from a bucket or prior node.", Run: func(cmd *cobra.Command, args []string) { - err := appclone.RunCloneCommand(cfg) + err := sidecar.RunCloneCommand(cfg) if err != nil { log.Error(err, "clone command failed") os.Exit(1) @@ -93,8 +89,7 @@ func main() { Use: "run", Short: "Configs mysql users, replication, and serve backups.", Run: func(cmd *cobra.Command, args []string) { - mysqlCFG := app.NewMysqlConfig(cfg) - err := apphelper.RunRunCommand(mysqlCFG) + err := sidecar.RunSidecarCommand(cfg) if err != nil { log.Error(err, "run command failed") os.Exit(1) @@ -113,7 +108,7 @@ func main() { return nil }, Run: func(cmd *cobra.Command, args []string) { - err := apptakebackup.RunTakeBackupCommand(cfg, args[0], args[1]) + err := sidecar.RunTakeBackupCommand(cfg, args[0], args[1]) if err != nil { log.Error(err, "take backup command failed") os.Exit(1) diff --git a/pkg/sidecar/appclone/appclone.go b/pkg/sidecar/appclone.go similarity index 83% rename from pkg/sidecar/appclone/appclone.go rename to pkg/sidecar/appclone.go index 85f2b1883..1831e9191 100644 --- a/pkg/sidecar/appclone/appclone.go +++ b/pkg/sidecar/appclone.go @@ -14,28 +14,22 @@ See the License for the specific language governing permissions and limitations under the License. */ -package appclone +package sidecar import ( "fmt" "os" "os/exec" "strings" - - logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - - "github.com/presslabs/mysql-operator/pkg/sidecar/app" ) -var log = logf.Log.WithName("sidecar.appclone") - // RunCloneCommand clone the data from source. // nolint: gocyclo -func RunCloneCommand(cfg *app.BaseConfig) error { +func RunCloneCommand(cfg *Config) error { log.Info("clonning command", "host", cfg.Hostname) // skip cloning if data exists. - if !app.ShouldBootstrapNode() { + if !shouldBootstrapNode() { log.Info("data exists and is initialized, skipping cloning.") return nil } @@ -49,7 +43,7 @@ func RunCloneCommand(cfg *app.BaseConfig) error { return fmt.Errorf("removing lost+found: %s", err) } - if cfg.NodeRole() == app.MasterNode { + if cfg.NodeRole() == MasterNode { if len(cfg.InitBucketURL) == 0 { log.Info("skip cloning init bucket uri is not set.") // let mysqld initialize data dir @@ -87,7 +81,7 @@ func cloneFromBucket(initBucket string) error { log.Info("cloning from bucket", "bucket", initBucket) - if _, err := os.Stat(app.RcloneConfigFile); os.IsNotExist(err) { + if _, err := os.Stat(rcloneConfigFile); os.IsNotExist(err) { log.Error(err, "rclone config file does not exists") return err } @@ -95,7 +89,7 @@ func cloneFromBucket(initBucket string) error { // writes to stdout the content of the bucket uri // nolint: gosec rclone := exec.Command("rclone", "-vv", - fmt.Sprintf("--config=%s", app.RcloneConfigFile), "cat", initBucket) + fmt.Sprintf("--config=%s", rcloneConfigFile), "cat", initBucket) // gzip reads from stdin decompress and then writes to stdout // nolint: gosec @@ -105,7 +99,7 @@ func cloneFromBucket(initBucket string) error { // extracts files from stdin (-x) and writes them to mysql // data target dir // nolint: gosec - xbstream := exec.Command("xbstream", "-x", "-C", app.DataDir) + xbstream := exec.Command("xbstream", "-x", "-C", dataDir) var err error // rclone | gzip | xbstream @@ -149,10 +143,10 @@ func cloneFromBucket(initBucket string) error { return nil } -func cloneFromSource(cfg *app.BaseConfig, host string) error { +func cloneFromSource(cfg *Config, host string) error { log.Info("cloning from node", "host", host) - backupBody, err := app.RequestABackup(cfg, host, app.ServerBackupEndpoint) + backupBody, err := requestABackup(cfg, host, serverBackupEndpoint) if err != nil { return fmt.Errorf("fail to get backup: %s", err) } @@ -161,7 +155,7 @@ func cloneFromSource(cfg *app.BaseConfig, host string) error { // extracts files from stdin (-x) and writes them to mysql // data target dir // nolint: gosec - xbstream := exec.Command("xbstream", "-x", "-C", app.DataDir) + xbstream := exec.Command("xbstream", "-x", "-C", dataDir) xbstream.Stdin = backupBody xbstream.Stderr = os.Stderr @@ -180,7 +174,7 @@ func cloneFromSource(cfg *app.BaseConfig, host string) error { func xtrabackupPreperData() error { // nolint: gosec xtbkCmd := exec.Command("xtrabackup", "--prepare", - fmt.Sprintf("--target-dir=%s", app.DataDir)) + fmt.Sprintf("--target-dir=%s", dataDir)) xtbkCmd.Stderr = os.Stderr @@ -189,7 +183,7 @@ func xtrabackupPreperData() error { // nolint: gosec func checkIfDataExists() bool { - path := fmt.Sprintf("%s/mysql", app.DataDir) + path := fmt.Sprintf("%s/mysql", dataDir) _, err := os.Open(path) if os.IsNotExist(err) { @@ -202,6 +196,6 @@ func checkIfDataExists() bool { } func deleteLostFound() error { - path := fmt.Sprintf("%s/lost+found", app.DataDir) + path := fmt.Sprintf("%s/lost+found", dataDir) return os.RemoveAll(path) } diff --git a/pkg/sidecar/appconf/appconf.go b/pkg/sidecar/appconf.go similarity index 77% rename from pkg/sidecar/appconf/appconf.go rename to pkg/sidecar/appconf.go index 4c1d56b99..7e704afb8 100644 --- a/pkg/sidecar/appconf/appconf.go +++ b/pkg/sidecar/appconf.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package appconf +package sidecar import ( "fmt" @@ -22,23 +22,19 @@ import ( "strconv" "github.com/go-ini/ini" - logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - "github.com/presslabs/mysql-operator/pkg/sidecar/app" pkgutil "github.com/presslabs/mysql-operator/pkg/util" ) -var log = logf.Log.WithName("sidecar.appconf") - const ( rStrLen = 18 ) // RunConfigCommand generates my.cnf, client.cnf and 10-dynamic.cnf files. -func RunConfigCommand(cfg *app.BaseConfig) error { +func RunConfigCommand(cfg *Config) error { log.Info("configuring server", "host", cfg.Hostname, "role", cfg.NodeRole()) - if err := app.CopyFile(app.MountConfigDir+"/my.cnf", app.ConfigDir+"/my.cnf"); err != nil { + if err := copyFile(mountConfigDir+"/my.cnf", configDir+"/my.cnf"); err != nil { return fmt.Errorf("copy file my.cnf: %s", err) } @@ -52,26 +48,26 @@ func RunConfigCommand(cfg *app.BaseConfig) error { return fmt.Errorf("failed to get dynamic configs: %s", err) } - if err = os.Mkdir(app.ConfDPath, os.FileMode(0755)); err != nil { + if err = os.Mkdir(confDPath, os.FileMode(0755)); err != nil { if !os.IsExist(err) { - return fmt.Errorf("error mkdir %s/conf.d: %s", app.ConfigDir, err) + return fmt.Errorf("error mkdir %s/conf.d: %s", configDir, err) } } - if err = dynCFG.SaveTo(app.ConfDPath + "/10-dynamic.cnf"); err != nil { + if err = dynCFG.SaveTo(confDPath + "/10-dynamic.cnf"); err != nil { return fmt.Errorf("failed to save configs: %s", err) } - if utilityCFG, err = getUtilityUserConfigs(app.UtilityUser, uPass); err != nil { + if utilityCFG, err = getUtilityUserConfigs(utilityUser, uPass); err != nil { return fmt.Errorf("failed to configure utility user: %s", err) } - if err = utilityCFG.SaveTo(app.ConfDPath + "/10-utility-user.cnf"); err != nil { + if err = utilityCFG.SaveTo(confDPath + "/10-utility-user.cnf"); err != nil { return fmt.Errorf("failed to configure utility user: %s", err) } - if clientCFG, err = getClientConfigs(app.UtilityUser, uPass); err != nil { + if clientCFG, err = getClientConfigs(utilityUser, uPass); err != nil { return fmt.Errorf("failed to get client configs: %s", err) } - if err = clientCFG.SaveTo(app.ConfigDir + "/client.cnf"); err != nil { + if err = clientCFG.SaveTo(configDir + "/client.cnf"); err != nil { return fmt.Errorf("failed to save configs: %s", err) } @@ -86,7 +82,7 @@ func getClientConfigs(user, pass string) (*ini.File, error) { if _, err := client.NewKey("host", "127.0.0.1"); err != nil { return nil, err } - if _, err := client.NewKey("port", app.MysqlPort); err != nil { + if _, err := client.NewKey("port", mysqlPort); err != nil { return nil, err } if _, err := client.NewKey("user", user); err != nil { diff --git a/pkg/sidecar/apphelper/apphelper.go b/pkg/sidecar/apphelper.go similarity index 68% rename from pkg/sidecar/apphelper/apphelper.go rename to pkg/sidecar/apphelper.go index 48e44a33d..d2fb4528d 100644 --- a/pkg/sidecar/apphelper/apphelper.go +++ b/pkg/sidecar/apphelper.go @@ -14,19 +14,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apphelper +package sidecar import ( + "bufio" "fmt" + "io" + "os" "time" - - logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - - "github.com/presslabs/mysql-operator/pkg/sidecar/app" ) -var log = logf.Log.WithName("sidecar.apphelper") - const ( // timeOut represents the number of tries to check mysql to be ready. timeOut = 60 @@ -34,9 +31,9 @@ const ( connRetry = 10 ) -// RunRunCommand is the main command, and represents the runtime helper that +// RunSidecarCommand is the main command, and represents the runtime helper that // configures the mysql server -func RunRunCommand(cfg *app.MysqlConfig) error { +func RunSidecarCommand(cfg *Config) error { log.Info("start initialization") // wait for mysql to be ready @@ -46,7 +43,7 @@ func RunRunCommand(cfg *app.MysqlConfig) error { // deactivate super read only log.Info("temporary disable SUPER_READ_ONLY") - if err := app.RunQuery(cfg, "SET GLOBAL READ_ONLY = 1; SET GLOBAL SUPER_READ_ONLY = 0;"); err != nil { + if err := runQuery(cfg, "SET GLOBAL READ_ONLY = 1; SET GLOBAL SUPER_READ_ONLY = 0;"); err != nil { return fmt.Errorf("failed to configure master node, err: %s", err) } @@ -91,7 +88,7 @@ func RunRunCommand(cfg *app.MysqlConfig) error { return srv.ListenAndServe() } -func configureOrchestratorUser(cfg *app.MysqlConfig) error { +func configureOrchestratorUser(cfg *Config) error { query := ` SET @@SESSION.SQL_LOG_BIN = 0; GRANT SUPER, PROCESS, REPLICATION SLAVE, REPLICATION CLIENT, RELOAD ON *.* TO ?@'%%' IDENTIFIED BY ?; @@ -102,9 +99,9 @@ func configureOrchestratorUser(cfg *app.MysqlConfig) error { // insert toolsDBName, it's not user input so it's safe. Can't use // placeholders for table names, see: // https://github.com/golang/go/issues/18478 - query = fmt.Sprintf(query, app.ToolsDbName) + query = fmt.Sprintf(query, toolsDbName) - if err := app.RunQuery(cfg, query, cfg.OrchestratorUser, cfg.OrchestratorPassword, + if err := runQuery(cfg, query, cfg.OrchestratorUser, cfg.OrchestratorPassword, cfg.OrchestratorUser, cfg.OrchestratorPassword); err != nil { return fmt.Errorf("failed to configure orchestrator (user/pass/access), err: %s", err) } @@ -112,40 +109,40 @@ func configureOrchestratorUser(cfg *app.MysqlConfig) error { return nil } -func configureReplicationUser(cfg *app.MysqlConfig) error { +func configureReplicationUser(cfg *Config) error { query := ` SET @@SESSION.SQL_LOG_BIN = 0; GRANT SELECT, PROCESS, RELOAD, LOCK TABLES, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO ?@'%' IDENTIFIED BY ?; ` - if err := app.RunQuery(cfg, query, cfg.ReplicationUser, cfg.ReplicationPassword); err != nil { + if err := runQuery(cfg, query, cfg.ReplicationUser, cfg.ReplicationPassword); err != nil { return fmt.Errorf("failed to configure replication user: %s", err) } return nil } -func configureExporterUser(cfg *app.MysqlConfig) error { +func configureExporterUser(cfg *Config) error { query := ` SET @@SESSION.SQL_LOG_BIN = 0; GRANT SELECT, PROCESS, REPLICATION CLIENT ON *.* TO ?@'127.0.0.1' IDENTIFIED BY ? WITH MAX_USER_CONNECTIONS 3; ` - if err := app.RunQuery(cfg, query, cfg.MetricsUser, cfg.MetricsPassword); err != nil { + if err := runQuery(cfg, query, cfg.MetricsUser, cfg.MetricsPassword); err != nil { return fmt.Errorf("failed to metrics exporter user: %s", err) } return nil } -func waitForMysqlReady(cfg *app.MysqlConfig) error { +func waitForMysqlReady(cfg *Config) error { log.V(1).Info("wait for mysql to be ready") for i := 0; i < timeOut; i++ { time.Sleep(1 * time.Second) - if err := app.RunQuery(cfg, "SELECT 1"); err == nil { + if err := runQuery(cfg, "SELECT 1"); err == nil { break } } - if err := app.RunQuery(cfg, "SELECT 1"); err != nil { + if err := runQuery(cfg, "SELECT 1"); err != nil { log.V(1).Info("mysql is not ready", "error", err) return err } @@ -155,27 +152,27 @@ func waitForMysqlReady(cfg *app.MysqlConfig) error { } -func configReadOnly(cfg *app.MysqlConfig) error { +func configReadOnly(cfg *Config) error { var query string - if cfg.NodeRole() == app.MasterNode { + if cfg.NodeRole() == MasterNode { query = "SET GLOBAL READ_ONLY = 0" } else { query = "SET GLOBAL SUPER_READ_ONLY = 1" } - if err := app.RunQuery(cfg, query); err != nil { + if err := runQuery(cfg, query); err != nil { return fmt.Errorf("failed to set read_only config, err: %s", err) } return nil } -func configTopology(cfg *app.MysqlConfig) error { - if cfg.NodeRole() == app.SlaveNode { +func configTopology(cfg *Config) error { + if cfg.NodeRole() == SlaveNode { log.Info("setting up as slave") - if app.ShouldBootstrapNode() { + if shouldBootstrapNode() { log.Info("doing bootstrap") - if gtid, err := app.ReadPurgedGTID(); err == nil { + if gtid, err := readPurgedGTID(); err == nil { log.Info("RESET MASTER and setting GTID_PURGED", "gtid", gtid) - if errQ := app.RunQuery(cfg, "RESET MASTER; SET GLOBAL GTID_PURGED=?", gtid); errQ != nil { + if errQ := runQuery(cfg, "RESET MASTER; SET GLOBAL GTID_PURGED=?", gtid); errQ != nil { return errQ } } else { @@ -191,14 +188,14 @@ func configTopology(cfg *app.MysqlConfig) error { MASTER_PASSWORD=?, MASTER_CONNECT_RETRY=?; ` - if err := app.RunQuery(cfg, query, + if err := runQuery(cfg, query, cfg.MasterFQDN(), cfg.ReplicationUser, cfg.ReplicationPassword, connRetry, ); err != nil { return fmt.Errorf("failed to configure slave node, err: %s", err) } query = "START SLAVE;" - if err := app.RunQuery(cfg, query); err != nil { + if err := runQuery(cfg, query); err != nil { log.Info("failed to start slave in the simple mode, trying a second method") // TODO: https://bugs.mysql.com/bug.php?id=83713 query2 := ` @@ -208,7 +205,7 @@ func configTopology(cfg *app.MysqlConfig) error { reset slave; start slave; ` - if err := app.RunQuery(cfg, query2); err != nil { + if err := runQuery(cfg, query2); err != nil { return fmt.Errorf("failed to start slave node, err: %s", err) } } @@ -217,7 +214,7 @@ func configTopology(cfg *app.MysqlConfig) error { return nil } -func markConfigurationDone(cfg *app.MysqlConfig) error { +func markConfigurationDone(cfg *Config) error { query := ` SET @@SESSION.SQL_LOG_BIN = 0; BEGIN; @@ -234,11 +231,51 @@ func markConfigurationDone(cfg *app.MysqlConfig) error { // insert tables and databases names. It's safe because is not user input. // see: https://github.com/golang/go/issues/18478 - query = fmt.Sprintf(query, app.ToolsDbName, app.ToolsInitTableName) + query = fmt.Sprintf(query, toolsDbName, toolsInitTableName) - if err := app.RunQuery(cfg, query, cfg.Hostname); err != nil { + if err := runQuery(cfg, query, cfg.Hostname); err != nil { return fmt.Errorf("failed to mark configuration done, err: %s", err) } return nil } + +// readPurgedGTID returns the GTID from xtrabackup_binlog_info file +func readPurgedGTID() (string, error) { + file, err := os.Open(fmt.Sprintf("%s/xtrabackup_binlog_info", dataDir)) + if err != nil { + return "", err + } + + defer func() { + if err1 := file.Close(); err1 != nil { + log.Error(err1, "failed to close file") + } + }() + + return getGTIDFrom(file) +} + +// getGTIDFrom parse the content from xtrabackup_binlog_info file passed as +// io.Reader and extracts the GTID. +func getGTIDFrom(reader io.Reader) (string, error) { + scanner := bufio.NewScanner(reader) + scanner.Split(bufio.ScanWords) + + count := 0 + gtid := "" + for scanner.Scan() { + if count == 2 { + gtid = scanner.Text() + } + count++ + } + + if err := scanner.Err(); err != nil { + return "", err + } else if len(gtid) == 0 { + return "", fmt.Errorf("failed to read GTID reached EOF") + } + + return gtid, nil +} diff --git a/pkg/sidecar/apptakebackup/apptakebackup.go b/pkg/sidecar/apptakebackup.go similarity index 75% rename from pkg/sidecar/apptakebackup/apptakebackup.go rename to pkg/sidecar/apptakebackup.go index e23a01b56..79dd72b9c 100644 --- a/pkg/sidecar/apptakebackup/apptakebackup.go +++ b/pkg/sidecar/apptakebackup.go @@ -14,32 +14,25 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apptakebackup +package sidecar import ( "fmt" "os" "os/exec" "strings" - - logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - - "github.com/presslabs/mysql-operator/pkg/sidecar/app" ) -var log = logf.Log.WithName("sidecar.apptakebackup") - // RunTakeBackupCommand starts a backup command -// nolint: unparam -func RunTakeBackupCommand(cfg *app.BaseConfig, srcHost, destBucket string) error { +func RunTakeBackupCommand(cfg *Config, srcHost, destBucket string) error { log.Info("take a backup", "host", srcHost, "bucket", destBucket) destBucket = normalizeBucketURI(destBucket) return pushBackupFromTo(cfg, srcHost, destBucket) } -func pushBackupFromTo(cfg *app.BaseConfig, srcHost, destBucket string) error { +func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error { - backupBody, err := app.RequestABackup(cfg, srcHost, app.ServerBackupEndpoint) + backupBody, err := requestABackup(cfg, srcHost, serverBackupEndpoint) if err != nil { return fmt.Errorf("getting backup: %s", err) } @@ -49,7 +42,7 @@ func pushBackupFromTo(cfg *app.BaseConfig, srcHost, destBucket string) error { // nolint: gosec rclone := exec.Command("rclone", - fmt.Sprintf("--config=%s", app.RcloneConfigFile), "rcat", destBucket) + fmt.Sprintf("--config=%s", rcloneConfigFile), "rcat", destBucket) gzip.Stdin = backupBody gzip.Stderr = os.Stderr diff --git a/pkg/sidecar/app/configs.go b/pkg/sidecar/configs.go similarity index 83% rename from pkg/sidecar/app/configs.go rename to pkg/sidecar/configs.go index ffd9c3b7e..737a85fb6 100644 --- a/pkg/sidecar/app/configs.go +++ b/pkg/sidecar/configs.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package app +package sidecar import ( "fmt" @@ -41,8 +41,8 @@ const ( SlaveNode NodeRole = "slave" ) -// BaseConfig contains information related with the pod. -type BaseConfig struct { +// Config contains information related with the pod. +type Config struct { // Stop represents the shutdown channel Stop <-chan struct{} @@ -67,15 +67,30 @@ type BaseConfig struct { // backup user and password for http endpoint BackupUser string BackupPassword string + + // MysqlDSN represents the connection string to connect to MySQL + MysqlDSN string + + // replication user and password + ReplicationUser string + ReplicationPassword string + + // metrcis exporter user and password + MetricsUser string + MetricsPassword string + + // orchestrator credentials + OrchestratorUser string + OrchestratorPassword string } // FQDNForServer returns the pod hostname for given MySQL server id -func (cfg *BaseConfig) FQDNForServer(id int) string { +func (cfg *Config) FQDNForServer(id int) string { base := mysqlcluster.GetNameForResource(mysqlcluster.StatefulSet, cfg.ClusterName) return fmt.Sprintf("%s-%d.%s.%s", base, id-100, cfg.ServiceName, cfg.Namespace) } -func (cfg *BaseConfig) newOrcClient() orc.Interface { +func (cfg *Config) newOrcClient() orc.Interface { if len(cfg.OrchestratorURL) == 0 { log.Info("OrchestratorURL not set") return nil @@ -85,12 +100,12 @@ func (cfg *BaseConfig) newOrcClient() orc.Interface { } // ClusterFQDN returns the cluster FQ Name of the cluster from which the node belongs -func (cfg *BaseConfig) ClusterFQDN() string { +func (cfg *Config) ClusterFQDN() string { return fmt.Sprintf("%s.%s", cfg.ClusterName, cfg.Namespace) } // MasterFQDN the FQ Name of the cluster's master -func (cfg *BaseConfig) MasterFQDN() string { +func (cfg *Config) MasterFQDN() string { if client := cfg.newOrcClient(); client != nil { if master, err := client.Master(cfg.ClusterFQDN()); err == nil { return master.Key.Hostname @@ -104,7 +119,7 @@ func (cfg *BaseConfig) MasterFQDN() string { } // NodeRole returns the role of the current node -func (cfg *BaseConfig) NodeRole() NodeRole { +func (cfg *Config) NodeRole() NodeRole { if cfg.Hostname == cfg.MasterFQDN() { return MasterNode } @@ -112,9 +127,9 @@ func (cfg *BaseConfig) NodeRole() NodeRole { return SlaveNode } -// NewBasicConfig returns a pointer to BaseConfig configured from environment variables -func NewBasicConfig(stop <-chan struct{}) *BaseConfig { - cfg := &BaseConfig{ +// NewConfig returns a pointer to Config configured from environment variables +func NewConfig(stop <-chan struct{}) *Config { + cfg := &Config{ Stop: stop, Hostname: getEnvValue("HOSTNAME"), ClusterName: getEnvValue("MY_CLUSTER_NAME"), @@ -126,12 +141,26 @@ func NewBasicConfig(stop <-chan struct{}) *BaseConfig { BackupUser: getEnvValue("MYSQL_BACKUP_USER"), BackupPassword: getEnvValue("MYSQL_BACKUP_PASSWORD"), + + ReplicationUser: getEnvValue("MYSQL_REPLICATION_USER"), + ReplicationPassword: getEnvValue("MYSQL_REPLICATION_PASSWORD"), + + MetricsUser: getEnvValue("MYSQL_METRICS_EXPORTER_USER"), + MetricsPassword: getEnvValue("MYSQL_METRICS_EXPORTER_PASSWORD"), + + OrchestratorUser: getEnvValue("MYSQL_ORC_TOPOLOGY_USER"), + OrchestratorPassword: getEnvValue("MYSQL_ORC_TOPOLOGY_PASSWORD"), } // get server id ordinal := getOrdinalFromHostname(cfg.Hostname) cfg.ServerID = ordinal + 100 + var err error + if cfg.MysqlDSN, err = getMySQLConnectionString(); err != nil { + log.Error(err, "get mysql dsn") + } + return cfg } @@ -158,53 +187,9 @@ func getOrdinalFromHostname(hn string) int { return 0 } -// MysqlConfig contains extra information to connect or configure MySQL server -type MysqlConfig struct { - // inherit from base config - BaseConfig - - MysqlDSN string - - // replication user and password - ReplicationUser string - ReplicationPassword string - - // metrcis exporter user and password - MetricsUser string - MetricsPassword string - - // orchestrator credentials - OrchestratorUser string - OrchestratorPassword string -} - -// NewMysqlConfig returns a pointer to MysqlConfig -func NewMysqlConfig(cfg *BaseConfig) *MysqlConfig { - mycfg := &MysqlConfig{ - BaseConfig: *cfg, - - ReplicationUser: getEnvValue("MYSQL_REPLICATION_USER"), - ReplicationPassword: getEnvValue("MYSQL_REPLICATION_PASSWORD"), - - MetricsUser: getEnvValue("MYSQL_METRICS_EXPORTER_USER"), - MetricsPassword: getEnvValue("MYSQL_METRICS_EXPORTER_PASSWORD"), - - OrchestratorUser: getEnvValue("MYSQL_ORC_TOPOLOGY_USER"), - OrchestratorPassword: getEnvValue("MYSQL_ORC_TOPOLOGY_PASSWORD"), - } - - // set connection DSN to MySQL - var err error - if mycfg.MysqlDSN, err = getMySQLConnectionString(); err != nil { - log.Error(err, "get MySQL DSN") - } - - return mycfg -} - // getMySQLConnectionString returns the mysql DSN func getMySQLConnectionString() (string, error) { - cnfPath := path.Join(ConfigDir, "client.cnf") + cnfPath := path.Join(configDir, "client.cnf") cfg, err := ini.Load(cnfPath) if err != nil { return "", fmt.Errorf("Could not open %s: %s", cnfPath, err) diff --git a/pkg/sidecar/app/constants.go b/pkg/sidecar/constants.go similarity index 70% rename from pkg/sidecar/app/constants.go rename to pkg/sidecar/constants.go index 0d94b94a0..442d9d87c 100644 --- a/pkg/sidecar/app/constants.go +++ b/pkg/sidecar/constants.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package app +package sidecar import ( "strconv" @@ -27,43 +27,39 @@ import ( var ( // MysqlPort represents port on which mysql works - MysqlPort = strconv.Itoa(constants.MysqlPort) + mysqlPort = strconv.Itoa(constants.MysqlPort) // ConfigDir is the mysql configs path, /etc/mysql - ConfigDir = constants.ConfVolumeMountPath + configDir = constants.ConfVolumeMountPath // ConfDPath is /etc/mysql/conf.d - ConfDPath = constants.ConfDPath + confDPath = constants.ConfDPath // MountConfigDir is the mounted configs that needs processing - MountConfigDir = constants.ConfMapVolumeMountPath + mountConfigDir = constants.ConfMapVolumeMountPath // DataDir is the mysql data. /var/lib/mysql - DataDir = constants.DataVolumeMountPath + dataDir = constants.DataVolumeMountPath // ToolsDbName is the name of the tools table - ToolsDbName = constants.HelperDbName + toolsDbName = constants.HelperDbName // ToolsInitTableName is the name of the init table - ToolsInitTableName = "init" + toolsInitTableName = "init" // UtilityUser is the name of the percona utility user. - UtilityUser = "sys_utility_sidecar" - - // OrcTopologyDir contains the path where the secret with orc credentials is - // mounted. - OrcTopologyDir = constants.OrcTopologyDir + utilityUser = "sys_utility_sidecar" // ServerPort http server port - ServerPort = constants.SidecarServerPort + serverPort = constants.SidecarServerPort // ServerProbeEndpoint is the http server endpoint for probe - ServerProbeEndpoint = constants.SidecarServerProbePath + serverProbeEndpoint = constants.SidecarServerProbePath // ServerBackupEndpoint is the http server endpoint for backups - ServerBackupEndpoint = "/xbackup" + serverBackupEndpoint = "/xbackup" ) const ( // RcloneConfigFile represents the path to the file that contains rclon // configs. This path should be the same as defined in docker entrypoint // script from mysql-operator-sidecar/docker-entrypoint.sh. /etc/rclone.conf - RcloneConfigFile = "/etc/rclone.conf" + rcloneConfigFile = "/etc/rclone.conf" ) diff --git a/pkg/sidecar/apphelper/server.go b/pkg/sidecar/server.go similarity index 81% rename from pkg/sidecar/apphelper/server.go rename to pkg/sidecar/server.go index 48901a6b8..91e319d48 100644 --- a/pkg/sidecar/apphelper/server.go +++ b/pkg/sidecar/server.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apphelper +package sidecar import ( "context" @@ -23,28 +23,26 @@ import ( "net/http" "os" "os/exec" - - "github.com/presslabs/mysql-operator/pkg/sidecar/app" ) type server struct { - cfg *app.MysqlConfig + cfg *Config http.Server } -func newServer(cfg *app.MysqlConfig) *server { +func newServer(cfg *Config) *server { mux := http.NewServeMux() srv := &server{ cfg: cfg, Server: http.Server{ - Addr: fmt.Sprintf(":%d", app.ServerPort), + Addr: fmt.Sprintf(":%d", serverPort), Handler: mux, }, } // Add handle functions - mux.HandleFunc(app.ServerProbeEndpoint, srv.healthHandler) - mux.Handle(app.ServerBackupEndpoint, app.MaxClients(http.HandlerFunc(srv.backupHandler), 1)) + mux.HandleFunc(serverProbeEndpoint, srv.healthHandler) + mux.Handle(serverBackupEndpoint, maxClients(http.HandlerFunc(srv.backupHandler), 1)) // Shutdown gracefully the http server go func() { @@ -84,7 +82,7 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) { // nolint: gosec xtrabackup := exec.Command("xtrabackup", "--backup", "--slave-info", "--stream=xbstream", - fmt.Sprintf("--tables-exclude=%s.%s", app.ToolsDbName, app.ToolsInitTableName), + fmt.Sprintf("--tables-exclude=%s.%s", toolsDbName, toolsInitTableName), "--host=127.0.0.1", fmt.Sprintf("--user=%s", s.cfg.ReplicationUser), fmt.Sprintf("--password=%s", s.cfg.ReplicationPassword)) @@ -127,3 +125,15 @@ func (s *server) isAuthenticated(r *http.Request) bool { user, pass, ok := r.BasicAuth() return ok && user == s.cfg.BackupUser && pass == s.cfg.BackupPassword } + +// maxClients limit an http endpoint to allow just n max concurrent connections +func maxClients(h http.Handler, n int) http.Handler { + sema := make(chan struct{}, n) + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sema <- struct{}{} + defer func() { <-sema }() + + h.ServeHTTP(w, r) + }) +} diff --git a/pkg/sidecar/app/util.go b/pkg/sidecar/util.go similarity index 56% rename from pkg/sidecar/app/util.go rename to pkg/sidecar/util.go index fbf0c3659..7c32f1bcf 100644 --- a/pkg/sidecar/app/util.go +++ b/pkg/sidecar/util.go @@ -14,10 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. */ -package app +package sidecar import ( - "bufio" "database/sql" "fmt" "io" @@ -29,10 +28,10 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" ) -var log = logf.Log.WithName("sidecar.app") +var log = logf.Log.WithName("sidecar") -// RunQuery executes a query -func RunQuery(cfg *MysqlConfig, q string, args ...interface{}) error { +// runQuery executes a query +func runQuery(cfg *Config, q string, args ...interface{}) error { if len(cfg.MysqlDSN) == 0 { log.Info("could not get mysql connection DSN") return fmt.Errorf("no DSN specified") @@ -51,10 +50,10 @@ func RunQuery(cfg *MysqlConfig, q string, args ...interface{}) error { return nil } -// CopyFile the src file to dst. Any existing file will be overwritten and will not +// copyFile the src file to dst. Any existing file will be overwritten and will not // copy file attributes. // nolint: gosec -func CopyFile(src, dst string) error { +func copyFile(src, dst string) error { in, err := os.Open(src) if err != nil { return err @@ -82,23 +81,11 @@ func CopyFile(src, dst string) error { return nil } -// MaxClients limit an http endpoint to allow just n max concurrent connections -func MaxClients(h http.Handler, n int) http.Handler { - sema := make(chan struct{}, n) - - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - sema <- struct{}{} - defer func() { <-sema }() - - h.ServeHTTP(w, r) - }) -} - -// RequestABackup connects to specified host and endpoint and gets the backup -func RequestABackup(cfg *BaseConfig, host, endpoint string) (io.Reader, error) { +// requestABackup connects to specified host and endpoint and gets the backup +func requestABackup(cfg *Config, host, endpoint string) (io.Reader, error) { log.Info("initialize a backup", "host", host, "endpoint", endpoint) - req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d%s", host, ServerPort, endpoint), nil) + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d%s", host, serverPort, endpoint), nil) if err != nil { return nil, fmt.Errorf("fail to create request: %s", err) } @@ -120,50 +107,10 @@ func RequestABackup(cfg *BaseConfig, host, endpoint string) (io.Reader, error) { return resp.Body, nil } -// ReadPurgedGTID returns the GTID from xtrabackup_binlog_info file -func ReadPurgedGTID() (string, error) { - file, err := os.Open(fmt.Sprintf("%s/xtrabackup_binlog_info", DataDir)) - if err != nil { - return "", err - } - - defer func() { - if err1 := file.Close(); err1 != nil { - log.Error(err1, "failed to close file") - } - }() - - return getGTIDFrom(file) -} - -// getGTIDFrom parse the content from xtrabackup_binlog_info file passed as -// io.Reader and extracts the GTID. -func getGTIDFrom(reader io.Reader) (string, error) { - scanner := bufio.NewScanner(reader) - scanner.Split(bufio.ScanWords) - - count := 0 - gtid := "" - for scanner.Scan() { - if count == 2 { - gtid = scanner.Text() - } - count++ - } - - if err := scanner.Err(); err != nil { - return "", err - } else if len(gtid) == 0 { - return "", fmt.Errorf("failed to read GTID reached EOF") - } - - return gtid, nil -} - -// ShouldBootstrapNode checks if the mysql data is at the first initialization -func ShouldBootstrapNode() bool { - _, err := os.Open(fmt.Sprintf("%s/%s/%s.CSV", DataDir, - ToolsDbName, ToolsInitTableName)) +// shouldBootstrapNode checks if the mysql data is at the first initialization +func shouldBootstrapNode() bool { + _, err := os.Open(fmt.Sprintf("%s/%s/%s.CSV", dataDir, + toolsDbName, toolsInitTableName)) if os.IsNotExist(err) { return true } else if err != nil { From 0e47237ee58b932960b2906027b82e4943d125ec Mon Sep 17 00:00:00 2001 From: amecea Date: Mon, 11 Mar 2019 13:34:40 +0200 Subject: [PATCH 4/4] Small changes of code --- cmd/mysql-operator-sidecar/main.go | 6 +++--- pkg/sidecar/appconf.go | 33 +++++++++++++++++------------- pkg/sidecar/apphelper.go | 12 +++++------ pkg/sidecar/configs.go | 6 +----- pkg/sidecar/server.go | 4 ++-- 5 files changed, 30 insertions(+), 31 deletions(-) diff --git a/cmd/mysql-operator-sidecar/main.go b/cmd/mysql-operator-sidecar/main.go index 4def3e9bf..79b639fa0 100644 --- a/cmd/mysql-operator-sidecar/main.go +++ b/cmd/mysql-operator-sidecar/main.go @@ -31,7 +31,7 @@ import ( var log = logf.Log.WithName("sidecar") func main() { - stopCh := signals.SetupSignalHandler() + stop := signals.SetupSignalHandler() cmd := &cobra.Command{ Use: "mysql-operator-sidecar", @@ -57,7 +57,7 @@ func main() { logf.SetLogger(logf.ZapLogger(debug)) // init configs - cfg := sidecar.NewConfig(stopCh) + cfg := sidecar.NewConfig() confCmd := &cobra.Command{ Use: "init-configs", @@ -89,7 +89,7 @@ func main() { Use: "run", Short: "Configs mysql users, replication, and serve backups.", Run: func(cmd *cobra.Command, args []string) { - err := sidecar.RunSidecarCommand(cfg) + err := sidecar.RunSidecarCommand(cfg, stop) if err != nil { log.Error(err, "run command failed") os.Exit(1) diff --git a/pkg/sidecar/appconf.go b/pkg/sidecar/appconf.go index 7e704afb8..9216b3d88 100644 --- a/pkg/sidecar/appconf.go +++ b/pkg/sidecar/appconf.go @@ -19,6 +19,7 @@ package sidecar import ( "fmt" "os" + "path" "strconv" "github.com/go-ini/ini" @@ -33,41 +34,45 @@ const ( // RunConfigCommand generates my.cnf, client.cnf and 10-dynamic.cnf files. func RunConfigCommand(cfg *Config) error { log.Info("configuring server", "host", cfg.Hostname, "role", cfg.NodeRole()) + var err error - if err := copyFile(mountConfigDir+"/my.cnf", configDir+"/my.cnf"); err != nil { + if err = copyFile(mountConfigDir+"/my.cnf", configDir+"/my.cnf"); err != nil { return fmt.Errorf("copy file my.cnf: %s", err) } + if err = os.Mkdir(confDPath, os.FileMode(0755)); err != nil { + if !os.IsExist(err) { + return fmt.Errorf("error mkdir %s/conf.d: %s", configDir, err) + } + } + uPass := pkgutil.RandomString(rStrLen) reportHost := cfg.FQDNForServer(cfg.ServerID) - var dynCFG, utilityCFG, clientCFG *ini.File - var err error + var identityCFG, utilityCFG, clientCFG *ini.File - if dynCFG, err = getDynamicConfigs(cfg.ServerID, reportHost); err != nil { + // mysql server identity configs + if identityCFG, err = getIdentityConfigs(cfg.ServerID, reportHost); err != nil { return fmt.Errorf("failed to get dynamic configs: %s", err) } - - if err = os.Mkdir(confDPath, os.FileMode(0755)); err != nil { - if !os.IsExist(err) { - return fmt.Errorf("error mkdir %s/conf.d: %s", configDir, err) - } - } - if err = dynCFG.SaveTo(confDPath + "/10-dynamic.cnf"); err != nil { + if err = identityCFG.SaveTo(path.Join(confDPath, "10-identity.cnf")); err != nil { return fmt.Errorf("failed to save configs: %s", err) } + + // mysql server utility user configs if utilityCFG, err = getUtilityUserConfigs(utilityUser, uPass); err != nil { return fmt.Errorf("failed to configure utility user: %s", err) } - if err = utilityCFG.SaveTo(confDPath + "/10-utility-user.cnf"); err != nil { + if err = utilityCFG.SaveTo(path.Join(confDPath, "10-utility-user.cnf")); err != nil { return fmt.Errorf("failed to configure utility user: %s", err) } + // mysql client connect credentials if clientCFG, err = getClientConfigs(utilityUser, uPass); err != nil { return fmt.Errorf("failed to get client configs: %s", err) } - if err = clientCFG.SaveTo(configDir + "/client.cnf"); err != nil { + if err = clientCFG.SaveTo(path.Join(configDir, "client.cnf")); err != nil { return fmt.Errorf("failed to save configs: %s", err) } @@ -95,7 +100,7 @@ func getClientConfigs(user, pass string) (*ini.File, error) { return cfg, nil } -func getDynamicConfigs(id int, reportHost string) (*ini.File, error) { +func getIdentityConfigs(id int, reportHost string) (*ini.File, error) { cfg := ini.Empty() mysqld := cfg.Section("mysqld") diff --git a/pkg/sidecar/apphelper.go b/pkg/sidecar/apphelper.go index d2fb4528d..028fd0724 100644 --- a/pkg/sidecar/apphelper.go +++ b/pkg/sidecar/apphelper.go @@ -33,7 +33,7 @@ const ( // RunSidecarCommand is the main command, and represents the runtime helper that // configures the mysql server -func RunSidecarCommand(cfg *Config) error { +func RunSidecarCommand(cfg *Config, stop <-chan struct{}) error { log.Info("start initialization") // wait for mysql to be ready @@ -84,7 +84,7 @@ func RunSidecarCommand(cfg *Config) error { } log.V(1).Info("start http server") - srv := newServer(cfg) + srv := newServer(cfg, stop) return srv.ListenAndServe() } @@ -262,19 +262,17 @@ func getGTIDFrom(reader io.Reader) (string, error) { scanner := bufio.NewScanner(reader) scanner.Split(bufio.ScanWords) - count := 0 gtid := "" - for scanner.Scan() { - if count == 2 { + for i := 0; scanner.Scan(); i++ { + if i == 2 { gtid = scanner.Text() } - count++ } if err := scanner.Err(); err != nil { return "", err } else if len(gtid) == 0 { - return "", fmt.Errorf("failed to read GTID reached EOF") + return "", io.EOF } return gtid, nil diff --git a/pkg/sidecar/configs.go b/pkg/sidecar/configs.go index 737a85fb6..c3db1b938 100644 --- a/pkg/sidecar/configs.go +++ b/pkg/sidecar/configs.go @@ -43,9 +43,6 @@ const ( // Config contains information related with the pod. type Config struct { - // Stop represents the shutdown channel - Stop <-chan struct{} - // Hostname represents the pod hostname Hostname string // ClusterName is the MySQL cluster name @@ -128,9 +125,8 @@ func (cfg *Config) NodeRole() NodeRole { } // NewConfig returns a pointer to Config configured from environment variables -func NewConfig(stop <-chan struct{}) *Config { +func NewConfig() *Config { cfg := &Config{ - Stop: stop, Hostname: getEnvValue("HOSTNAME"), ClusterName: getEnvValue("MY_CLUSTER_NAME"), Namespace: getEnvValue("MY_NAMESPACE"), diff --git a/pkg/sidecar/server.go b/pkg/sidecar/server.go index 91e319d48..4e7bad43d 100644 --- a/pkg/sidecar/server.go +++ b/pkg/sidecar/server.go @@ -30,7 +30,7 @@ type server struct { http.Server } -func newServer(cfg *Config) *server { +func newServer(cfg *Config, stop <-chan struct{}) *server { mux := http.NewServeMux() srv := &server{ cfg: cfg, @@ -46,7 +46,7 @@ func newServer(cfg *Config) *server { // Shutdown gracefully the http server go func() { - <-cfg.Stop // wait for stop signal + <-stop // wait for stop signal if err := srv.Shutdown(context.Background()); err != nil { log.Error(err, "failed to stop http server")