Skip to content

Commit

Permalink
update: 重新设计kill操作支持,支持远端数据库kill和inception kill
Browse files Browse the repository at this point in the history
  • Loading branch information
hanchuanchuan committed May 11, 2019
1 parent c90f37d commit 3e3336b
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 152 deletions.
262 changes: 128 additions & 134 deletions session/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,165 +18,159 @@
package session

import (
"fmt"
// "math"
// "strings"
"database/sql"
"time"

// "github.com/hanchuanchuan/goInception/ast"
// "github.com/hanchuanchuan/goInception/expression"
// "github.com/hanchuanchuan/goInception/mysql"
// "github.com/hanchuanchuan/goInception/sessionctx/stmtctx"
mysqlDriver "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
log "github.com/sirupsen/logrus"
"fmt"
// "math"
// "strings"
"database/sql"
"time"

// "github.com/hanchuanchuan/goInception/ast"
// "github.com/hanchuanchuan/goInception/expression"
// "github.com/hanchuanchuan/goInception/mysql"
// "github.com/hanchuanchuan/goInception/sessionctx/stmtctx"
mysqlDriver "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
log "github.com/sirupsen/logrus"
)

const maxBadConnRetries = 1
const maxBadConnRetries = 2

// createNewConnection 用来创建新的连接
// 注意: 该方法可能导致driver: bad connection异常
func (s *session) createNewConnection(dbName string) {
addr := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local&maxAllowedPacket=4194304",
s.opt.user, s.opt.password, s.opt.host, s.opt.port, dbName)
addr := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local&maxAllowedPacket=4194304",
s.opt.user, s.opt.password, s.opt.host, s.opt.port, dbName)

db, err := gorm.Open("mysql", addr)
db, err := gorm.Open("mysql", addr)

if err != nil {
log.Error(err)
s.AppendErrorMessage(err.Error())
return
}
if err != nil {
log.Error(err)
s.AppendErrorMessage(err.Error())
return
}

if s.db != nil {
s.db.Close()
}
if s.db != nil {
s.db.Close()
}

// 禁用日志记录器,不显示任何日志
db.LogMode(false)
// 禁用日志记录器,不显示任何日志
db.LogMode(false)

// 为保证连接成功关闭,此处等待10ms
time.Sleep(10 * time.Millisecond)
// 为保证连接成功关闭,此处等待10ms
time.Sleep(10 * time.Millisecond)

s.db = db
s.db = db
}

// Raw 执行sql语句,连接失败时自动重连,自动重置当前数据库
func (s *session) Raw(sqlStr string) (rows *sql.Rows, err error) {

// 连接断开无效时,自动重试
for i := 0; i < maxBadConnRetries; i++ {
// rows, err = s.db.Raw(sqlStr).Rows()
rows, err = s.db.DB().Query(sqlStr)
if err == nil {
return
} else {
log.Error(err)
if err == mysqlDriver.ErrInvalidConn {
err = s.initConnection()
if err != nil {
return
}
} else {
return
}
}
}

return
// 连接断开无效时,自动重试
for i := 0; i < maxBadConnRetries; i++ {
rows, err = s.db.DB().Query(sqlStr)
if err == nil {
return
} else {
log.Error(err)
if err == mysqlDriver.ErrInvalidConn {
err1 := s.initConnection()
if err1 != nil {
return rows, err1
}
s.AppendErrorMessage(mysqlDriver.ErrInvalidConn.Error())
continue
} else {
return
}
}
}
return
}

// Raw 执行sql语句,连接失败时自动重连,自动重置当前数据库
func (s *session) Exec(sqlStr string) (res sql.Result, err error) {

// 连接断开无效时,自动重试
for i := 0; i < maxBadConnRetries; i++ {
res, err = s.db.DB().Exec(sqlStr)
// err = res.Error
if err == nil {
return
} else {
if err == mysqlDriver.ErrInvalidConn {
log.Error(err)
err = s.initConnection()
if err != nil {
return
}
} else {
return
// if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
// s.AppendErrorMessage(myErr.Message)
// } else {
// s.AppendErrorMessage(err.Error())
// }
// break
}
}
}

return
func (s *session) Exec(sqlStr string, retry bool) (res sql.Result, err error) {
// 连接断开无效时,自动重试
for i := 0; i < maxBadConnRetries; i++ {
res, err = s.db.DB().Exec(sqlStr)
if err == nil {
return
} else {
log.Error(err)
if err == mysqlDriver.ErrInvalidConn {
err1 := s.initConnection()
if err1 != nil {
return res, err1
}
if retry {
s.AppendErrorMessage(mysqlDriver.ErrInvalidConn.Error())
continue
} else {
return
}
}
return
}
}
return
}

// Raw 执行sql语句,连接失败时自动重连,自动重置当前数据库
func (s *session) RawScan(sqlStr string, dest interface{}) (err error) {

// 连接断开无效时,自动重试
for i := 0; i < maxBadConnRetries; i++ {
err = s.db.Raw(sqlStr).Scan(dest).Error
if err == nil {
return
} else {
if err == mysqlDriver.ErrInvalidConn {
log.Error(err)
err = s.initConnection()
if err != nil {
return
}
} else {
return
}
}
}

return
// 连接断开无效时,自动重试
for i := 0; i < maxBadConnRetries; i++ {
err = s.db.Raw(sqlStr).Scan(dest).Error
if err == nil {
return
} else {
log.Error(err)
if err == mysqlDriver.ErrInvalidConn {
err1 := s.initConnection()
if err1 != nil {
return err1
}
s.AppendErrorMessage(mysqlDriver.ErrInvalidConn.Error())
continue
} else {
return
}
}
}
return
}

// initConnection 连接失败时自动重连,重连后重置当前数据库
func (s *session) initConnection() (err error) {

name := s.DBName
if name == "" {
name = "mysql"
}

// 连接断开无效时,自动重试
for i := 0; i < maxBadConnRetries; i++ {
if err = s.db.Exec(fmt.Sprintf("USE `%s`", name)).Error; err == nil {
// 连接重连时,清除线程ID缓存
s.threadID = 0
return
} else {
if err != mysqlDriver.ErrInvalidConn {
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.AppendErrorMessage(myErr.Message)
} else {
s.AppendErrorMessage(err.Error())
}
return
}
log.Error(err)
}
}

if err != nil {
log.Error(err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.AppendErrorMessage(myErr.Message)
} else {
s.AppendErrorMessage(err.Error())
}
}

return
name := s.DBName
if name == "" {
name = "mysql"
}

// 连接断开无效时,自动重试
for i := 0; i < maxBadConnRetries; i++ {
if err = s.db.Exec(fmt.Sprintf("USE `%s`", name)).Error; err == nil {
// 连接重连时,清除线程ID缓存
// s.threadID = 0
log.Info("数据库断开重连")
return
} else {
log.Error(err)
if err != mysqlDriver.ErrInvalidConn {
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.AppendErrorMessage(myErr.Message)
} else {
s.AppendErrorMessage(err.Error())
}
return
}
}
}

if err != nil {
log.Error(err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.AppendErrorMessage(myErr.Message)
} else {
s.AppendErrorMessage(err.Error())
}
}
return
}
34 changes: 27 additions & 7 deletions session/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *session) GetNextBackupRecord() *Record {

continue

} else if r.AffectedRows > 0 && s.checkSqlIsDML(r) {
} else if (r.AffectedRows > 0 || r.StageStatus == StatusExecFail) && s.checkSqlIsDML(r) {

// if s.opt.middlewareExtend != "" {
// continue
Expand All @@ -99,7 +99,10 @@ func (s *session) GetNextBackupRecord() *Record {
}

// 先置默认值为备份失败,在备份完成后置为成功
r.StageStatus = StatusBackupFail
// if r.AffectedRows > 0 {
if r.StageStatus != StatusExecFail {
r.StageStatus = StatusBackupFail
}
clearDeleteColumns(r.TableInfo)

return r
Expand Down Expand Up @@ -291,7 +294,14 @@ func (s *session) Parser(ctx context.Context) {
ENDCHECK:
// 如果操作已超过binlog范围,切换到下一日志
if currentPosition.Compare(stopPosition) > -1 {
record.StageStatus = StatusBackupOK
// sql被kill后,如果备份时可以检测到行,则认为执行成功
// 工单只有执行成功,才允许标记为备份成功
// if (record.StageStatus == StatusExecFail && record.AffectedRows > 0) ||
// record.StageStatus == StatusExecOK || record.StageStatus == StatusBackupFail {
if record.AffectedRows > 0 {
record.StageStatus = StatusBackupOK
}

record.BackupCostTime = fmt.Sprintf("%.3f", time.Since(startTime).Seconds())

next := s.GetNextBackupRecord()
Expand All @@ -307,10 +317,14 @@ func (s *session) Parser(ctx context.Context) {
}
}

// // 进程Killed
// if err := checkClose(ctx); err != nil {
// log.Warn("Killed: ", err)
// s.AppendErrorMessage("Operation has been killed!")
// 进程Killed
if err := checkClose(ctx); err != nil {
log.Warn("Killed: ", err)
s.AppendErrorMessage("Operation has been killed!")
break
}

// if s.hasErrorBefore() {
// break
// }
}
Expand Down Expand Up @@ -401,6 +415,12 @@ func (s *session) checkError(e error) {
}

func (s *session) write(b []byte, binEvent *replication.BinlogEvent) {
// 此处执行状态不确定的记录
if s.myRecord.StageStatus == StatusExecFail {
log.Info("auto fix record:", s.myRecord.OPID)
s.myRecord.AffectedRows += 1
s.TotalChangeRows += 1
}
s.ch <- &ChanData{sql: b, e: binEvent, opid: s.myRecord.OPID,
table: s.lastBackupTable, record: s.myRecord}
}
Expand Down
1 change: 1 addition & 0 deletions session/session_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (s *session) flushBackupRecord(dbname string, record *Record) {
if err != nil {
log.Error(err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.recordSets.MaxLevel = 2
record.StageStatus = StatusBackupFail
record.AppendErrorMessage(myErr.Message)
}
Expand Down
Loading

0 comments on commit 3e3336b

Please sign in to comment.