Skip to content

Commit

Permalink
PR #118 添加新参数hex_blob以支持回滚时解析二进制类型
Browse files Browse the repository at this point in the history
feature: 添加新参数hex_blob以支持回滚时解析二进制类型
  • Loading branch information
hanchuanchuan authored Nov 19, 2019
2 parents 5280e21 + feed312 commit a2eb1d8
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 252 deletions.
6 changes: 4 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,10 @@ type Inc struct {

// 全量日志
GeneralLog bool `toml:"general_log" json:"general_log"`

Lang string `toml:"lang" json:"lang"`
// 使用十六进制表示法转储二进制列
// 受影响的数据类型为BINARY,VARBINARY,BLOB类型
HexBlob bool `toml:"hex_blob" json:"hex_blob"`
Lang string `toml:"lang" json:"lang"`
// 连接服务器允许的最大包大小,以字节为单位 默认值为4194304(即4MB)
MaxAllowedPacket uint `toml:"max_allowed_packet" json:"max_allowed_packet"`
MaxCharLength uint `toml:"max_char_length" json:"max_char_length"`
Expand Down
50 changes: 40 additions & 10 deletions session/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package session
import (
"context"
"database/sql/driver"
"encoding/hex"
"fmt"
"math"
"reflect"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"

mysqlDriver "github.com/go-sql-driver/mysql"
"github.com/juju/errors"
Expand Down Expand Up @@ -414,6 +416,8 @@ func (s *session) flush(table string, record *Record) {
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
record.StageStatus = StatusBackupFail
record.AppendErrorMessage(myErr.Message)
// log.Error(fmt.Sprintf(sql, table, values))
// log.Error(s.insertBuffer)
log.Error(myErr)
}
}
Expand Down Expand Up @@ -486,7 +490,7 @@ func (s *session) generateInsertSql(t *TableInfo, e *replication.RowsEvent,
// }
}

r, err := InterpolateParams(sql, vv)
r, err := InterpolateParams(sql, vv, s.Inc.HexBlob)
s.checkError(err)

s.write(r, binEvent)
Expand Down Expand Up @@ -549,7 +553,7 @@ func (s *session) generateDeleteSql(t *TableInfo, e *replication.RowsEvent,
}
newSql := strings.Join([]string{sql, strings.Join(columnNames, " AND")}, "")

r, err := InterpolateParams(newSql, vv)
r, err := InterpolateParams(newSql, vv, s.Inc.HexBlob)
s.checkError(err)

s.write(r, binEvent)
Expand Down Expand Up @@ -729,7 +733,7 @@ func (s *session) generateUpdateSql(t *TableInfo, e *replication.RowsEvent,
}
newSql = strings.Join([]string{sql, strings.Join(columnNames, " AND")}, "")
newValues = append(newValues, oldValues...)
r, err := InterpolateParams(newSql, newValues)
r, err := InterpolateParams(newSql, newValues, s.Inc.HexBlob)
s.checkError(err)

s.write(r, binEvent)
Expand All @@ -742,7 +746,7 @@ func (s *session) generateUpdateSql(t *TableInfo, e *replication.RowsEvent,
return string(buf), nil
}

func InterpolateParams(query string, args []driver.Value) ([]byte, error) {
func InterpolateParams(query string, args []driver.Value, hexBlob bool) ([]byte, error) {
// Number of ? should be same to len(args)
if strings.Count(query, "?") != len(args) {
log.Error("sql", query, "需要参数", strings.Count(query, "?"),
Expand Down Expand Up @@ -772,6 +776,9 @@ func InterpolateParams(query string, args []driver.Value) ([]byte, error) {
continue
}

// log.Info(arg)
// log.Infof("%T", arg)

switch v := arg.(type) {
case int8:
buf = strconv.AppendInt(buf, int64(v), 10)
Expand Down Expand Up @@ -843,18 +850,41 @@ func InterpolateParams(query string, args []driver.Value) ([]byte, error) {
buf = append(buf, '\'')
}
case string:
buf = append(buf, '\'')
buf = escapeBytesBackslash(buf, []byte(v))
if hexBlob {
if utf8.ValidString(v) {
buf = append(buf, '\'')
buf = escapeBytesBackslash(buf, []byte(v))
} else {
buf = append(buf, 'X')
buf = append(buf, '\'')
b := hex.EncodeToString([]byte(v))
buf = append(buf, b...)
}
} else {
buf = append(buf, '\'')
buf = escapeBytesBackslash(buf, []byte(v))
}

buf = append(buf, '\'')
case []byte:
if v == nil {
buf = append(buf, "NULL"...)
} else {
// buf = append(buf, "_binary'"...)
buf = append(buf, '\'')

buf = escapeBytesBackslash(buf, v)

if hexBlob {
if utf8.Valid(v) {
buf = append(buf, '\'')
buf = escapeBytesBackslash(buf, v)
} else {
buf = append(buf, 'X')
buf = append(buf, '\'')
b := hex.EncodeToString(v)
buf = append(buf, b...)
}
} else {
buf = append(buf, '\'')
buf = escapeBytesBackslash(buf, v)
}
buf = append(buf, '\'')
}
default:
Expand Down
2 changes: 1 addition & 1 deletion session/session_inception.go
Original file line number Diff line number Diff line change
Expand Up @@ -6159,7 +6159,7 @@ func (s *session) executeInceptionShow(sql string) ([]sqlexec.RecordSet, error)
vv = append(vv, *val)
}

res, err := InterpolateParams(paramValues, vv)
res, err := InterpolateParams(paramValues, vv, s.Inc.HexBlob)
if err != nil {
s.AppendErrorMessage(err.Error())
return nil, nil
Expand Down
Loading

0 comments on commit a2eb1d8

Please sign in to comment.