diff --git a/hack/dev-values.yaml b/hack/dev-values.yaml deleted file mode 100644 index 11422f1f8..000000000 --- a/hack/dev-values.yaml +++ /dev/null @@ -1,9 +0,0 @@ -extraArgs: - - --debug - -installCRDs: false - -orchestrator: - topologyPassword: password1 - antiAffinity: soft - replicas: 1 diff --git a/hack/development/stress_test.py b/hack/development/stress_test.py new file mode 100755 index 000000000..419cd61fa --- /dev/null +++ b/hack/development/stress_test.py @@ -0,0 +1,118 @@ +#!/usr/bin/python3 + +# Description: +# A simple python script that generates lots of queries to stress the MySQL server. +# +# Notes: +# pip install mysql-connector-python +# +# Examples: +# ./stress_test.py mysql://root:not-so-secure@127.0.0.1:33066/test --times=100 + +import time +import sys +import argparse +from urllib.parse import urlparse + +import mysql.connector + + +class Base: + def __init__(self, db, args): + self.db = db + self.args = args + + def __call__(self): + self.init_sql() + + rows = self.args.rows + total_time = time.time() + for i in range(int(self.args.times)): + start_time = time.time() + self.test(i) + print ('{}: round: {} rows: {} to {} time: {}'.format(self.__class__.__name__, + i, i*rows, (i+1)*rows, time.time()-start_time)) + + def init_sql(self): + pass + + def test(self, epoch): + pass + + +class InsertStress(Base): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.table_name = 'test' + + def init_sql(self): + cursor = self.db.cursor() + cursor.execute('CREATE TABLE IF NOT EXISTS {} ' + '(f1 VARCHAR(30), f2 VARCHAR(30), f3 VARCHAR(30))'.format(self.table_name)) + self.db.commit() + + def test(self, epoch): + sql = "INSERT INTO {} (f1, f2, f3) VALUES (%s, %s, %s)".format(self.table_name) + val = [] + for i in range(self.args.rows): + val.append(("l_"+str(i), "f_"+str(i), "a_"+str(i))) + + + mycursor = self.db.cursor() + mycursor.executemany(sql, val) + self.db.commit() + + +def mysql_connection(dsn): + dsn = urlparse(dsn) + db_name = dsn.path.strip('/') + if len(db_name)==0: + raise RuntimeError('Database name not corectly specifyied: {}'.format(db_name)) + + try: + db = mysql.connector.connect( + host=dsn.hostname, + port=dsn.port, + user=dsn.username, + passwd=dsn.password, + database=db_name, + ) + except mysql.connector.errors.ProgrammingError as e: + if 'Unknown database' in e.msg: + db = mysql.connector.connect( + host=dsn.hostname, + port=dsn.port, + user=dsn.username, + passwd=dsn.password, + ) + cursor = db.cursor() + cursor.execute('CREATE DATABASE IF NOT EXISTS {}'.format(db_name)) + cursor.execute('USE {}'.format(db_name)) + db.commit() + else: + raise + + return db + + +def parse_args(): + parser = argparse.ArgumentParser(description='Stress test for mysql.') + parser.add_argument('DSN', type=str, + help='Data source name that contains user and password.') + parser.add_argument('--rows', type=int, default=100000, help='The number of rows to execute') + parser.add_argument('--times', type=int, default=100, + help='The number of times to execute the queries') + + return parser.parse_args() + + +def main(): + args = parse_args() + db = mysql_connection(args.DSN) + + # run insert stress tests + InsertStress(db, args)() + + +if __name__ == "__main__": + main() diff --git a/pkg/controller/mysqlcluster/internal/syncer/statefullset.go b/pkg/controller/mysqlcluster/internal/syncer/statefullset.go index 1e2549f3f..7e9bccfa0 100644 --- a/pkg/controller/mysqlcluster/internal/syncer/statefullset.go +++ b/pkg/controller/mysqlcluster/internal/syncer/statefullset.go @@ -407,6 +407,9 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container { "--database", HelperDbName, "--table", "heartbeat", "--defaults-file", fmt.Sprintf("%s/client.cnf", ConfVolumeMountPath), + // it's important to exit when exceeding more than 20 failed attempts otherwise + // pt-heartbeat will run forever using old connection. + "--fail-successive-errors=20", }, ) heartbeat.Resources = ensureResources(containerHeartBeatName) diff --git a/pkg/sidecar/appclone.go b/pkg/sidecar/appclone.go index 3921f325c..9aa2cd9c8 100644 --- a/pkg/sidecar/appclone.go +++ b/pkg/sidecar/appclone.go @@ -153,7 +153,7 @@ func cloneFromBucket(initBucket string) error { func cloneFromSource(cfg *Config, host string) error { log.Info("cloning from node", "host", host) - backupBody, err := requestABackup(cfg, host, serverBackupEndpoint) + response, err := requestABackup(cfg, host, serverBackupEndpoint) if err != nil { return fmt.Errorf("fail to get backup: %s", err) } @@ -164,7 +164,7 @@ func cloneFromSource(cfg *Config, host string) error { // nolint: gosec xbstream := exec.Command("xbstream", "-x", "-C", dataDir) - xbstream.Stdin = backupBody + xbstream.Stdin = response.Body xbstream.Stderr = os.Stderr if err := xbstream.Start(); err != nil { @@ -175,6 +175,10 @@ func cloneFromSource(cfg *Config, host string) error { return fmt.Errorf("xbstream wait error: %s", err) } + if err := checkBackupTrailers(response); err != nil { + return err + } + return nil } diff --git a/pkg/sidecar/apptakebackup.go b/pkg/sidecar/apptakebackup.go index 79dd72b9c..776ebf209 100644 --- a/pkg/sidecar/apptakebackup.go +++ b/pkg/sidecar/apptakebackup.go @@ -31,8 +31,9 @@ func RunTakeBackupCommand(cfg *Config, srcHost, destBucket string) error { } func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error { + tmpDestBucket := fmt.Sprintf("%s.tmp", destBucket) - backupBody, err := requestABackup(cfg, srcHost, serverBackupEndpoint) + response, err := requestABackup(cfg, srcHost, serverBackupEndpoint) if err != nil { return fmt.Errorf("getting backup: %s", err) } @@ -42,9 +43,9 @@ func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error { // nolint: gosec rclone := exec.Command("rclone", - fmt.Sprintf("--config=%s", rcloneConfigFile), "rcat", destBucket) + fmt.Sprintf("--config=%s", rcloneConfigFile), "rcat", tmpDestBucket) - gzip.Stdin = backupBody + gzip.Stdin = response.Body gzip.Stderr = os.Stderr rclone.Stderr = os.Stderr @@ -66,11 +67,33 @@ func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error { // wait for both commands to finish successful for i := 1; i <= 2; i++ { - if err := <-errChan; err != nil { + if err = <-errChan; err != nil { return err } } + if err = checkBackupTrailers(response); err != nil { + // backup failed so delete it from remote + log.Info("backup was partially taken", "trailers", response.Trailer) + return err + } + + log.Info("backup was taken successfully now move, now move it to permanent URL") + + // the backup was a success + // remove .tmp extension + // nolint: gosec + rcMove := exec.Command("rclone", + fmt.Sprintf("--config=%s", rcloneConfigFile), "moveto", tmpDestBucket, destBucket) + + if err = rcMove.Start(); err != nil { + return fmt.Errorf("final move failed: %s", err) + } + + if err = rcMove.Wait(); err != nil { + return fmt.Errorf("final move failed: %s", err) + } + return nil } diff --git a/pkg/sidecar/server.go b/pkg/sidecar/server.go index 4e7bad43d..4f7c607da 100644 --- a/pkg/sidecar/server.go +++ b/pkg/sidecar/server.go @@ -25,6 +25,12 @@ import ( "os/exec" ) +const ( + backupStatusTrailer = "X-Backup-Status" + backupSuccessfull = "Success" + backupFailed = "Failed" +) + type server struct { cfg *Config http.Server @@ -79,6 +85,7 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Connection", "keep-alive") + w.Header().Set("Trailer", backupStatusTrailer) // nolint: gosec xtrabackup := exec.Command("xtrabackup", "--backup", "--slave-info", "--stream=xbstream", @@ -112,13 +119,16 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) { return } - flusher.Flush() - if err := xtrabackup.Wait(); err != nil { log.Error(err, "failed waiting for xtrabackup to finish") + w.Header().Set(backupStatusTrailer, backupFailed) http.Error(w, "xtrabackup failed", http.StatusInternalServerError) return } + + // success + w.Header().Set(backupStatusTrailer, backupSuccessfull) + flusher.Flush() } func (s *server) isAuthenticated(r *http.Request) bool { @@ -137,3 +147,49 @@ func maxClients(h http.Handler, n int) http.Handler { h.ServeHTTP(w, r) }) } + +// requestABackup connects to specified host and endpoint and gets the backup +func requestABackup(cfg *Config, host, endpoint string) (*http.Response, error) { + log.Info("initialize a backup", "host", host, "endpoint", endpoint) + + req, err := http.NewRequest("GET", fmt.Sprintf( + "http://%s:%d%s", host, serverPort, endpoint), nil) + + if err != nil { + return nil, fmt.Errorf("fail to create request: %s", err) + } + + // set authentification user and password + req.SetBasicAuth(cfg.BackupUser, cfg.BackupPassword) + + client := &http.Client{} + + resp, err := client.Do(req) + if err != nil || resp.StatusCode != 200 { + status := "unknown" + if resp != nil { + status = resp.Status + } + return nil, fmt.Errorf("fail to get backup: %s, code: %s", err, status) + } + + return resp, nil +} + +func checkBackupTrailers(resp *http.Response) error { + if values, ok := resp.Trailer[backupStatusTrailer]; !ok || !stringInSlice(backupSuccessfull, values) { + // backup is failed, remove from remote + return fmt.Errorf("backup failed to be taken: no 'Success' trailer found") + } + + return nil +} + +func stringInSlice(a string, list []string) bool { + for _, b := range list { + if b == a { + return true + } + } + return false +} diff --git a/pkg/sidecar/util.go b/pkg/sidecar/util.go index 49393d39d..9ac33bb12 100644 --- a/pkg/sidecar/util.go +++ b/pkg/sidecar/util.go @@ -20,7 +20,6 @@ import ( "database/sql" "fmt" "io" - "net/http" "os" // add mysql driver @@ -47,7 +46,7 @@ func runQuery(cfg *Config, q string, args ...interface{}) error { } }() - log.V(1).Info("running query", "query", q, "args", args) + log.V(1).Info("running query", "query", q) if _, err := db.Exec(q, args...); err != nil { return err } @@ -86,32 +85,6 @@ func copyFile(src, dst string) error { return nil } -// requestABackup connects to specified host and endpoint and gets the backup -func requestABackup(cfg *Config, host, endpoint string) (io.Reader, error) { - log.Info("initialize a backup", "host", host, "endpoint", endpoint) - - req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d%s", host, serverPort, endpoint), nil) - if err != nil { - return nil, fmt.Errorf("fail to create request: %s", err) - } - - // set authentification user and password - req.SetBasicAuth(cfg.BackupUser, cfg.BackupPassword) - - client := &http.Client{} - - resp, err := client.Do(req) - if err != nil || resp.StatusCode != 200 { - status := "unknown" - if resp != nil { - status = resp.Status - } - return nil, fmt.Errorf("fail to get backup: %s, code: %s", err, status) - } - - return resp.Body, nil -} - // shouldBootstrapNode checks if the mysql data is at the first initialization func shouldBootstrapNode() bool { _, err := os.Open(fmt.Sprintf("%s/%s/%s.CSV", dataDir,