Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: 添加选项max_execution_time设置会话级DML语句的最大执行时间 #319

Merged
merged 1 commit into from
Mar 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,11 @@ type Inc struct {

// 支持的存储引擎,多个时以分号分隔
SupportEngine string `toml:"support_engine" json:"support_engine"`
// 远端数据库等待超时时间单位:秒
// 远端数据库等待超时时间, 单位:秒
WaitTimeout int `toml:"wait_timeout" json:"wait_timeout"`

// 远端数据库最大执行时间, 单位:秒
MaxExecutionTime int `toml:"max_execution_time" json:"max_execution_time"`
// 版本信息
Version string `toml:"version" json:"version"`
}
Expand Down
24 changes: 24 additions & 0 deletions session/session_inception.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ func (s *session) executeCommit(ctx context.Context) {
}

s.modifyWaitTimeout()
s.modifyMaxExecutionTime()

if s.opt.Backup {
if !s.checkBinlogIsOn() {
Expand Down Expand Up @@ -1864,6 +1865,29 @@ func (s *session) modifyWaitTimeout() {
}
}

func (s *session) modifyMaxExecutionTime() {
if s.inc.MaxExecutionTime <= 0 {
return
}
log.Debug("modifyMaxExecutionTime")

var sql string
if s.dbVersion < 50708 || s.dbType == DBTypeMariaDB {
sql = fmt.Sprintf("set session max_statement_time=%d;", s.inc.MaxExecutionTime)
} else {
sql = fmt.Sprintf("set session max_execution_time=%d;", s.inc.MaxExecutionTime)
}

if _, err := s.exec(sql, true); err != nil {
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.appendErrorMessage(myErr.Message)
} else {
s.appendErrorMessage(err.Error())
}
}
}

func (s *session) modifyBinlogRowImageFull() {
log.Debug("modifyBinlogRowImageFull")

Expand Down
25 changes: 25 additions & 0 deletions session/session_inception_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,3 +1069,28 @@ func (s *testSessionIncExecSuite) TestExecAnyStatement(c *C) {
s.mustRunExec(c, "create user test1@'127.0.0.1' identified by '123';")
s.mustRunExec(c, "drop user test1@'127.0.0.1';")
}

func (s *testSessionIncExecSuite) TestMaxExecutionTime(c *C) {
saved := config.GetGlobalConfig().Inc
savedOsc := config.GetGlobalConfig().Osc
defer func() {
config.GetGlobalConfig().Inc = saved
config.GetGlobalConfig().Osc = savedOsc
}()

config.GetGlobalConfig().Inc.CheckColumnComment = false
config.GetGlobalConfig().Inc.CheckTableComment = false
config.GetGlobalConfig().Inc.EnableDropTable = true
config.GetGlobalConfig().Osc.OscOn = true
config.GetGlobalConfig().Ghost.GhostOn = false
config.GetGlobalConfig().Osc.OscMinTableSize = 0
config.GetGlobalConfig().Inc.MaxExecutionTime = 1

sql := "drop table if exists t1;create table t1(id int auto_increment primary key,c1 int);"
s.mustRunExec(c, sql)

// 删除后添加列
sql = `# 这是一条注释
alter table t1 drop column c1;alter table t1 add column c1 varchar(20);`
s.testErrorCode(c, sql)
}