Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update osc processlist #48

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions session/osc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -46,6 +47,10 @@ import (
log "github.com/sirupsen/logrus"
)

var (
oscConnID uint32
)

type ChanOscData struct {
out string
p *util.OscProcessInfo
Expand Down Expand Up @@ -479,6 +484,8 @@ func (s *session) mysqlExecuteAlterTableGhost(r *Record) {
}

p := &util.OscProcessInfo{
ID: uint64(atomic.AddUint32(&oscConnID, 1)),
ConnID: s.sessionVars.ConnectionID,
Schema: r.TableInfo.Schema,
Table: r.TableInfo.Name,
Command: r.Sql,
Expand All @@ -490,11 +497,11 @@ func (s *session) mysqlExecuteAlterTableGhost(r *Record) {
}
s.sessionManager.AddOscProcess(p)

defer func() {
// 执行完成或中止后清理osc进程信息
pl := s.sessionManager.ShowOscProcessList()
delete(pl, p.Sqlsha1)
}()
// defer func() {
// // 执行完成或中止后清理osc进程信息
// pl := s.sessionManager.ShowOscProcessList()
// delete(pl, p.Sqlsha1)
// }()

done := false
buf := bytes.NewBufferString("")
Expand Down Expand Up @@ -597,6 +604,8 @@ func (s *session) execCommand(r *Record, commandName string, params []string) bo
}

p := &util.OscProcessInfo{
ID: uint64(atomic.AddUint32(&oscConnID, 1)),
ConnID: s.sessionVars.ConnectionID,
Schema: r.TableInfo.Schema,
Table: r.TableInfo.Name,
Command: r.Sql,
Expand All @@ -607,9 +616,11 @@ func (s *session) execCommand(r *Record, commandName string, params []string) bo
}
s.sessionManager.AddOscProcess(p)

var wg sync.WaitGroup
wg.Add(2)

// 消息
reader := bufio.NewReader(stdout)

// 进度
reader2 := bufio.NewReader(stderr)

Expand All @@ -620,6 +631,7 @@ func (s *session) execCommand(r *Record, commandName string, params []string) bo
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
wg.Done()
break
}
buf.WriteString(line)
Expand All @@ -639,6 +651,8 @@ func (s *session) execCommand(r *Record, commandName string, params []string) bo
go f(reader)
go f(reader2)

wg.Wait()

//阻塞直到该命令执行完成,该命令必须是被Start方法开始执行的
err = cmd.Wait()
if err != nil {
Expand All @@ -647,6 +661,8 @@ func (s *session) execCommand(r *Record, commandName string, params []string) bo
log.Error(err)
}
if p.Percent < 100 || s.hasError() {
s.recordSets.MaxLevel = 2
r.ErrLevel = 2
r.StageStatus = StatusExecFail
} else {
r.StageStatus = StatusExecOK
Expand All @@ -659,8 +675,8 @@ func (s *session) execCommand(r *Record, commandName string, params []string) bo
}

// 执行完成或中止后清理osc进程信息
pl := s.sessionManager.ShowOscProcessList()
delete(pl, p.Sqlsha1)
// pl := s.sessionManager.ShowOscProcessList()
// delete(pl, p.Sqlsha1)

return true
}
Expand Down
102 changes: 71 additions & 31 deletions session/session_inception.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,11 @@ func (s *session) executeCommit(ctx context.Context) {
return
}

defer func() {
// 执行结束后清理osc进程信息
s.cleanup()
}()

s.executeAllStatement(ctx)

// 只要有执行成功的,就添加备份
Expand Down Expand Up @@ -4924,30 +4929,30 @@ func (s *session) executeLocalShowProcesslist(node *ast.ShowStmt) ([]ast.RecordS
res := NewProcessListSets(len(pl))

for _, k := range keys {
pi := pl[uint64(k)]

var info string
if node.Full {
info = pi.Info
} else {
info = fmt.Sprintf("%.100v", pi.Info)
}
if pi, ok := pl[uint64(k)]; ok {
var info string
if node.Full {
info = pi.Info
} else {
info = fmt.Sprintf("%.100v", pi.Info)
}

data := []interface{}{
pi.ID,
pi.DestUser,
pi.DestHost,
pi.DestPort,
pi.Host,
pi.Command,
pi.OperState,
int64(time.Since(pi.Time) / time.Second),
info,
}
if pi.Percent > 0 {
data = append(data, fmt.Sprintf("%.2f%%", pi.Percent*100))
data := []interface{}{
pi.ID,
pi.DestUser,
pi.DestHost,
pi.DestPort,
pi.Host,
pi.Command,
pi.OperState,
int64(time.Since(pi.Time) / time.Second),
info,
}
if pi.Percent > 0 {
data = append(data, fmt.Sprintf("%.2f%%", pi.Percent*100))
}
res.appendRow(data)
}
res.appendRow(data)
}

s.sessionVars.StmtCtx.AddAffectedRows(uint64(res.rc.count))
Expand All @@ -4961,17 +4966,28 @@ func (s *session) executeLocalShowOscProcesslist(node *ast.ShowOscStmt) ([]ast.R
res := NewOscProcessListSets(len(pl), node.Sqlsha1 != "")

if node.Sqlsha1 == "" {

var keys []int
all := make(map[uint64]*util.OscProcessInfo, len(pl))
for _, pi := range pl {
data := []interface{}{
pi.Schema,
pi.Table,
pi.Command,
pi.Sqlsha1,
pi.Percent,
pi.RemainTime,
pi.Info,
keys = append(keys, int(pi.ID))
all[pi.ID] = pi
}
sort.Ints(keys)

for _, k := range keys {
if pi, ok := all[uint64(k)]; ok {
data := []interface{}{
pi.Schema,
pi.Table,
pi.Command,
pi.Sqlsha1,
pi.Percent,
pi.RemainTime,
pi.Info,
}
res.appendRow(data)
}
res.appendRow(data)
}
} else if pi, ok := pl[node.Sqlsha1]; ok {
data := []interface{}{
Expand Down Expand Up @@ -6548,3 +6564,27 @@ func (s *session) addNewSplitNode() {
s.splitSets.ddlflag = 0
s.splitSets.sqlBuf = new(bytes.Buffer)
}

// cleanup 清理变量,缓存,osc进程等
func (s *session) cleanup() {
if s.sessionManager == nil {
return
}
// 执行完成或中止后清理osc进程信息
pl := s.sessionManager.ShowOscProcessList()
if len(pl) == 0 {
return
}
oscList := []string{}
for _, pi := range pl {
if pi.ConnID == s.sessionVars.ConnectionID {
oscList = append(oscList, pi.Sqlsha1)
}
}

if len(oscList) > 0 {
for _, sha1 := range oscList {
delete(pl, sha1)
}
}
}
4 changes: 4 additions & 0 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type SessionManager interface {

// OscProcessInfo is a struct used for show osc processlist statement.
type OscProcessInfo struct {
ID uint64
// 连接ID
ConnID uint64

Schema string
Table string
Command string
Expand Down