Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix backup under stress #260

Merged
merged 4 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions hack/dev-values.yaml

This file was deleted.

118 changes: 118 additions & 0 deletions hack/development/stress_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#!/usr/bin/python3

# Description:
# A simple python script that generates lots of queries to stress the MySQL server.
#
# Notes:
# pip install mysql-connector-python
#
# Examples:
# ./stress_test.py mysql://root:not-so-secure@127.0.0.1:33066/test --times=100

import time
import sys
import argparse
from urllib.parse import urlparse

import mysql.connector


class Base:
def __init__(self, db, args):
self.db = db
self.args = args

def __call__(self):
self.init_sql()

rows = self.args.rows
total_time = time.time()
for i in range(int(self.args.times)):
start_time = time.time()
self.test(i)
print ('{}: round: {} rows: {} to {} time: {}'.format(self.__class__.__name__,
i, i*rows, (i+1)*rows, time.time()-start_time))

def init_sql(self):
pass

def test(self, epoch):
pass


class InsertStress(Base):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.table_name = 'test'

def init_sql(self):
cursor = self.db.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS {} '
'(f1 VARCHAR(30), f2 VARCHAR(30), f3 VARCHAR(30))'.format(self.table_name))
self.db.commit()

def test(self, epoch):
sql = "INSERT INTO {} (f1, f2, f3) VALUES (%s, %s, %s)".format(self.table_name)
val = []
for i in range(self.args.rows):
val.append(("l_"+str(i), "f_"+str(i), "a_"+str(i)))


mycursor = self.db.cursor()
mycursor.executemany(sql, val)
self.db.commit()


def mysql_connection(dsn):
dsn = urlparse(dsn)
db_name = dsn.path.strip('/')
if len(db_name)==0:
raise RuntimeError('Database name not corectly specifyied: {}'.format(db_name))

try:
db = mysql.connector.connect(
host=dsn.hostname,
port=dsn.port,
user=dsn.username,
passwd=dsn.password,
database=db_name,
)
except mysql.connector.errors.ProgrammingError as e:
if 'Unknown database' in e.msg:
db = mysql.connector.connect(
host=dsn.hostname,
port=dsn.port,
user=dsn.username,
passwd=dsn.password,
)
cursor = db.cursor()
cursor.execute('CREATE DATABASE IF NOT EXISTS {}'.format(db_name))
cursor.execute('USE {}'.format(db_name))
db.commit()
else:
raise

return db


def parse_args():
parser = argparse.ArgumentParser(description='Stress test for mysql.')
parser.add_argument('DSN', type=str,
help='Data source name that contains user and password.')
parser.add_argument('--rows', type=int, default=100000, help='The number of rows to execute')
parser.add_argument('--times', type=int, default=100,
help='The number of times to execute the queries')

return parser.parse_args()


def main():
args = parse_args()
db = mysql_connection(args.DSN)

# run insert stress tests
InsertStress(db, args)()


if __name__ == "__main__":
main()
3 changes: 3 additions & 0 deletions pkg/controller/mysqlcluster/internal/syncer/statefullset.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container {
"--database", HelperDbName,
"--table", "heartbeat",
"--defaults-file", fmt.Sprintf("%s/client.cnf", ConfVolumeMountPath),
// it's important to exit when exceeding more than 20 failed attempts otherwise
// pt-heartbeat will run forever using old connection.
"--fail-successive-errors=20",
},
)
heartbeat.Resources = ensureResources(containerHeartBeatName)
Expand Down
8 changes: 6 additions & 2 deletions pkg/sidecar/appclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func cloneFromBucket(initBucket string) error {
func cloneFromSource(cfg *Config, host string) error {
log.Info("cloning from node", "host", host)

backupBody, err := requestABackup(cfg, host, serverBackupEndpoint)
response, err := requestABackup(cfg, host, serverBackupEndpoint)
if err != nil {
return fmt.Errorf("fail to get backup: %s", err)
}
Expand All @@ -164,7 +164,7 @@ func cloneFromSource(cfg *Config, host string) error {
// nolint: gosec
xbstream := exec.Command("xbstream", "-x", "-C", dataDir)

xbstream.Stdin = backupBody
xbstream.Stdin = response.Body
xbstream.Stderr = os.Stderr

if err := xbstream.Start(); err != nil {
Expand All @@ -175,6 +175,10 @@ func cloneFromSource(cfg *Config, host string) error {
return fmt.Errorf("xbstream wait error: %s", err)
}

if err := checkBackupTrailers(response); err != nil {
return err
}

return nil
}

Expand Down
31 changes: 27 additions & 4 deletions pkg/sidecar/apptakebackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ func RunTakeBackupCommand(cfg *Config, srcHost, destBucket string) error {
}

func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error {
tmpDestBucket := fmt.Sprintf("%s.tmp", destBucket)

backupBody, err := requestABackup(cfg, srcHost, serverBackupEndpoint)
response, err := requestABackup(cfg, srcHost, serverBackupEndpoint)
if err != nil {
return fmt.Errorf("getting backup: %s", err)
}
Expand All @@ -42,9 +43,9 @@ func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error {

// nolint: gosec
rclone := exec.Command("rclone",
fmt.Sprintf("--config=%s", rcloneConfigFile), "rcat", destBucket)
fmt.Sprintf("--config=%s", rcloneConfigFile), "rcat", tmpDestBucket)

gzip.Stdin = backupBody
gzip.Stdin = response.Body
gzip.Stderr = os.Stderr
rclone.Stderr = os.Stderr

Expand All @@ -66,11 +67,33 @@ func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error {

// wait for both commands to finish successful
for i := 1; i <= 2; i++ {
if err := <-errChan; err != nil {
if err = <-errChan; err != nil {
return err
}
}

if err = checkBackupTrailers(response); err != nil {
// backup failed so delete it from remote
log.Info("backup was partially taken", "trailers", response.Trailer)
return err
}

log.Info("backup was taken successfully now move, now move it to permanent URL")

// the backup was a success
// remove .tmp extension
// nolint: gosec
rcMove := exec.Command("rclone",
fmt.Sprintf("--config=%s", rcloneConfigFile), "moveto", tmpDestBucket, destBucket)

if err = rcMove.Start(); err != nil {
return fmt.Errorf("final move failed: %s", err)
}

if err = rcMove.Wait(); err != nil {
return fmt.Errorf("final move failed: %s", err)
}

return nil
}

Expand Down
60 changes: 58 additions & 2 deletions pkg/sidecar/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"os/exec"
)

const (
backupStatusTrailer = "X-Backup-Status"
backupSuccessfull = "Success"
backupFailed = "Failed"
)

type server struct {
cfg *Config
http.Server
Expand Down Expand Up @@ -79,6 +85,7 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) {

w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Trailer", backupStatusTrailer)

// nolint: gosec
xtrabackup := exec.Command("xtrabackup", "--backup", "--slave-info", "--stream=xbstream",
Expand Down Expand Up @@ -112,13 +119,16 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) {
return
}

flusher.Flush()

if err := xtrabackup.Wait(); err != nil {
log.Error(err, "failed waiting for xtrabackup to finish")
w.Header().Set(backupStatusTrailer, backupFailed)
http.Error(w, "xtrabackup failed", http.StatusInternalServerError)
return
}

// success
w.Header().Set(backupStatusTrailer, backupSuccessfull)
flusher.Flush()
}

func (s *server) isAuthenticated(r *http.Request) bool {
Expand All @@ -137,3 +147,49 @@ func maxClients(h http.Handler, n int) http.Handler {
h.ServeHTTP(w, r)
})
}

// requestABackup connects to specified host and endpoint and gets the backup
func requestABackup(cfg *Config, host, endpoint string) (*http.Response, error) {
log.Info("initialize a backup", "host", host, "endpoint", endpoint)

req, err := http.NewRequest("GET", fmt.Sprintf(
"http://%s:%d%s", host, serverPort, endpoint), nil)

if err != nil {
return nil, fmt.Errorf("fail to create request: %s", err)
}

// set authentification user and password
req.SetBasicAuth(cfg.BackupUser, cfg.BackupPassword)

client := &http.Client{}

resp, err := client.Do(req)
if err != nil || resp.StatusCode != 200 {
status := "unknown"
if resp != nil {
status = resp.Status
}
return nil, fmt.Errorf("fail to get backup: %s, code: %s", err, status)
}

return resp, nil
}

func checkBackupTrailers(resp *http.Response) error {
if values, ok := resp.Trailer[backupStatusTrailer]; !ok || !stringInSlice(backupSuccessfull, values) {
// backup is failed, remove from remote
return fmt.Errorf("backup failed to be taken: no 'Success' trailer found")
}

return nil
}

func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
29 changes: 1 addition & 28 deletions pkg/sidecar/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"database/sql"
"fmt"
"io"
"net/http"
"os"

// add mysql driver
Expand All @@ -47,7 +46,7 @@ func runQuery(cfg *Config, q string, args ...interface{}) error {
}
}()

log.V(1).Info("running query", "query", q, "args", args)
log.V(1).Info("running query", "query", q)
if _, err := db.Exec(q, args...); err != nil {
return err
}
Expand Down Expand Up @@ -86,32 +85,6 @@ func copyFile(src, dst string) error {
return nil
}

// requestABackup connects to specified host and endpoint and gets the backup
func requestABackup(cfg *Config, host, endpoint string) (io.Reader, error) {
log.Info("initialize a backup", "host", host, "endpoint", endpoint)

req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d%s", host, serverPort, endpoint), nil)
if err != nil {
return nil, fmt.Errorf("fail to create request: %s", err)
}

// set authentification user and password
req.SetBasicAuth(cfg.BackupUser, cfg.BackupPassword)

client := &http.Client{}

resp, err := client.Do(req)
if err != nil || resp.StatusCode != 200 {
status := "unknown"
if resp != nil {
status = resp.Status
}
return nil, fmt.Errorf("fail to get backup: %s, code: %s", err, status)
}

return resp.Body, nil
}

// shouldBootstrapNode checks if the mysql data is at the first initialization
func shouldBootstrapNode() bool {
_, err := os.Open(fmt.Sprintf("%s/%s/%s.CSV", dataDir,
Expand Down