*: add succ field to slow log and fix shallow copy problem when parsing slow log file. #11417

Merged 13 commits on Jul 24, 2019
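
In short, the diff below does three things: it threads a succ flag through SlowLogFormat so every slow-log entry records whether the statement succeeded (surfaced as a new Succ column, type Tiny, in INFORMATION_SCHEMA.SLOW_QUERY); it rewrites getOneLine to deep-copy the bytes returned by bufio.Reader.ReadLine, fixing the shallow-copy bug in slow-log parsing; and it widens the Query column from VARCHAR(4096) to LongBlob so long statements are no longer truncated.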
4 changes: 2 additions & 2 deletions executor/adapter.go
@@ -650,10 +650,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
-		logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql))
+		logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, succ, sql))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
-		logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql))
+		logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, succ, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
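Both call sites change in lockstep: the Debug path for statements under the slow-log threshold and the Warn path above it now pass succ, so every formatted entry carries the statement's outcome.
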
144 changes: 50 additions & 94 deletions infoschema/slow_log.go
@@ -29,7 +29,6 @@ import (
	"github.com/pingcap/tidb/util/execdetails"
	"github.com/pingcap/tidb/util/hack"
	"github.com/pingcap/tidb/util/logutil"
-	"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)

@@ -60,7 +59,8 @@ var slowQueryCols = []columnInfo{
{variable.SlowLogCopWaitMax, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopWaitAddr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogMemMax, mysql.TypeLonglong, 20, 0, nil, nil},
-	{variable.SlowLogQuerySQLStr, mysql.TypeVarchar, 4096, 0, nil, nil},
+	{variable.SlowLogSucc, mysql.TypeTiny, 1, 0, nil, nil},
+	{variable.SlowLogQuerySQLStr, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
}
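
Two column changes land here: the new Succ column is a Tiny holding 1 or 0, and Query moves from VARCHAR(4096) to LongBlob with an unspecified length, so statements longer than 4 KB survive the round trip through INFORMATION_SCHEMA.SLOW_QUERY (the long-query test added to tables_test.go below exercises exactly this).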

func dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
@@ -139,24 +139,34 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, err
}

func getOneLine(reader *bufio.Reader) ([]byte, error) {
+	var resByte []byte
lineByte, isPrefix, err := reader.ReadLine()
+	if isPrefix {
+		// Need to read more data.
+		resByte = make([]byte, len(lineByte), len(lineByte)*2)
+	} else {
+		resByte = make([]byte, len(lineByte))
+	}
+	// Use copy here to avoid shallow copy problem.
+	copy(resByte, lineByte)
if err != nil {
-		return lineByte, err
+		return resByte, err
}

var tempLine []byte
for isPrefix {
tempLine, isPrefix, err = reader.ReadLine()
-		lineByte = append(lineByte, tempLine...)
+		resByte = append(resByte, tempLine...)

// Use the max value of max_allowed_packet to check the single line length.
-		if len(lineByte) > int(variable.MaxOfMaxAllowedPacket) {
-			return lineByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
+		if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
+			return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
}
if err != nil {
-			return lineByte, err
+			return resByte, err
}
}
-	return lineByte, err
+	return resByte, err
}
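
The copy above is the heart of the shallow-copy fix: bufio.Reader.ReadLine returns a slice that aliases the reader's internal buffer, and a later refill can overwrite those bytes in place. getOneLine previously returned the aliased slice directly, so a parsed line could silently mutate into bytes from a later line. A minimal standalone sketch of the pitfall (not part of this PR; the 16-byte buffer and inputs are chosen only to force a refill between the two reads):

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// Two 12-byte lines; 16 bytes is bufio's minimum buffer size, so the
	// second ReadLine must refill (and internally shift) the buffer.
	input := strings.Repeat("a", 12) + "\n" + strings.Repeat("b", 12) + "\n"
	r := bufio.NewReaderSize(strings.NewReader(input), 16)

	first, _, _ := r.ReadLine() // aliases the reader's internal buffer
	saved := make([]byte, len(first))
	copy(saved, first) // deep copy, as the fixed getOneLine does

	r.ReadLine() // refilling may rewrite first's backing array

	fmt.Printf("aliased: %s\n", first) // typically prints "bbbbbbbbbbbb"
	fmt.Printf("copied:  %s\n", saved) // always prints "aaaaaaaaaaaa"
}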

type slowQueryTuple struct {
@@ -186,27 +196,23 @@ type slowQueryTuple struct {
maxWaitTime float64
maxWaitAddress string
memMax int64
+	succ bool
sql string
}

func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string) error {
-	value = stringutil.Copy(value)
+	var err error
switch field {
case variable.SlowLogTimeStr:
-		t, err := ParseTime(value)
+		st.time, err = ParseTime(value)
if err != nil {
-			return err
+			break
}
-		if t.Location() != tz {
-			t = t.In(tz)
+		if st.time.Location() != tz {
+			st.time = st.time.In(tz)
}
-		st.time = t
case variable.SlowLogTxnStartTSStr:
-		num, err := strconv.ParseUint(value, 10, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.txnStartTs = num
+		st.txnStartTs, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogUserStr:
fields := strings.SplitN(value, "@", 2)
if len(field) > 0 {
@@ -216,53 +222,21 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string)
st.host = fields[1]
}
case variable.SlowLogConnIDStr:
-		num, err := strconv.ParseUint(value, 10, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.connID = num
+		st.connID, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogQueryTimeStr:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.queryTime = num
+		st.queryTime, err = strconv.ParseFloat(value, 64)
case execdetails.ProcessTimeStr:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.processTime = num
+		st.processTime, err = strconv.ParseFloat(value, 64)
case execdetails.WaitTimeStr:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.waitTime = num
+		st.waitTime, err = strconv.ParseFloat(value, 64)
case execdetails.BackoffTimeStr:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.backOffTime = num
+		st.backOffTime, err = strconv.ParseFloat(value, 64)
case execdetails.RequestCountStr:
-		num, err := strconv.ParseUint(value, 10, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.requestCount = num
+		st.requestCount, err = strconv.ParseUint(value, 10, 64)
case execdetails.TotalKeysStr:
-		num, err := strconv.ParseUint(value, 10, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.totalKeys = num
+		st.totalKeys, err = strconv.ParseUint(value, 10, 64)
case execdetails.ProcessKeysStr:
-		num, err := strconv.ParseUint(value, 10, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.processKeys = num
+		st.processKeys, err = strconv.ParseUint(value, 10, 64)
case variable.SlowLogDBStr:
st.db = value
case variable.SlowLogIndexIDsStr:
@@ -274,54 +248,31 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string)
case variable.SlowLogStatsInfoStr:
st.statsInfo = value
case variable.SlowLogCopProcAvg:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.avgProcessTime = num
+		st.avgProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopProcP90:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.p90ProcessTime = num
+		st.p90ProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopProcMax:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.maxProcessTime = num
+		st.maxProcessTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopProcAddr:
st.maxProcessAddress = value
case variable.SlowLogCopWaitAvg:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.avgWaitTime = num
+		st.avgWaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitP90:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.p90WaitTime = num
+		st.p90WaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitMax:
-		num, err := strconv.ParseFloat(value, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.maxWaitTime = num
+		st.maxWaitTime, err = strconv.ParseFloat(value, 64)
case variable.SlowLogCopWaitAddr:
st.maxWaitAddress = value
case variable.SlowLogMemMax:
-		num, err := strconv.ParseInt(value, 10, 64)
-		if err != nil {
-			return errors.AddStack(err)
-		}
-		st.memMax = num
+		st.memMax, err = strconv.ParseInt(value, 10, 64)
+	case variable.SlowLogSucc:
+		st.succ, err = strconv.ParseBool(value)
case variable.SlowLogQuerySQLStr:
st.sql = value
}
+	if err != nil {
+		return errors.Wrap(err, "parse slow log failed `"+field+"` error")
+	}
return nil
}
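
The rewrite of setFieldValue collapses each parse/check/assign triplet into a direct assignment and funnels every case into one shared error check, which is also what lets the new message name the offending field; note the Time case uses break instead of return so even its error reaches that check. A condensed sketch of the resulting shape (the struct and field names here are invented for illustration, and fmt.Errorf stands in for errors.Wrap from pingcap/errors):

package main

import (
	"fmt"
	"strconv"
)

// record is a stand-in for slowQueryTuple in this sketch.
type record struct {
	connID    uint64
	queryTime float64
	succ      bool
}

// set mirrors the refactored setFieldValue: every case only assigns,
// and a single check at the end wraps any parse error with the field name.
func (r *record) set(field, value string) error {
	var err error
	switch field {
	case "Conn_ID":
		r.connID, err = strconv.ParseUint(value, 10, 64)
	case "Query_time":
		r.queryTime, err = strconv.ParseFloat(value, 64)
	case "Succ":
		r.succ, err = strconv.ParseBool(value)
	}
	if err != nil {
		return fmt.Errorf("parse slow log failed `%s` error: %v", field, err)
	}
	return nil
}

func main() {
	var r record
	fmt.Println(r.set("Succ", "true")) // nil (no error)
	fmt.Println(r.set("Succ", "abc"))  // parse slow log failed `Succ` error: ...
}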

@@ -357,6 +308,11 @@ func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
record = append(record, types.NewFloat64Datum(st.maxWaitTime))
record = append(record, types.NewStringDatum(st.maxWaitAddress))
record = append(record, types.NewIntDatum(st.memMax))
+	if st.succ {
+		record = append(record, types.NewIntDatum(1))
+	} else {
+		record = append(record, types.NewIntDatum(0))
+	}
record = append(record, types.NewStringDatum(st.sql))
return record
}
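
There is no boolean Datum constructor to pair with the Tiny column, so succ is materialized as an int Datum holding 1 or 0.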
29 changes: 28 additions & 1 deletion infoschema/slow_log_test.go
@@ -37,6 +37,7 @@ func (s *testSuite) TestParseSlowLogFile(c *C) {
# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 Cop_proc_addr: 127.0.0.1:20160
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160
# Mem_max: 70724
+# Succ: false
select * from t;`)
reader := bufio.NewReader(slowLog)
loc, err := time.LoadLocation("Asia/Shanghai")
@@ -53,7 +54,7 @@ select * from t;`)
}
recordString += str
}
-	expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,select * from t;"
+	expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,0,select * from t;"
c.Assert(expectRecordString, Equals, recordString)

// fix sql contain '# ' bug
@@ -67,6 +68,7 @@ select a# from t;
# Is_internal: true
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Stats: t1:1,t2:2
+# Succ: false
select * from t;
`)
reader = bufio.NewReader(slowLog)
@@ -110,6 +112,17 @@ select * from t;
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, IsNil)

+	// Add parse error check.
+	slowLog = bytes.NewBufferString(
+		`# Time: 2019-04-28T15:24:04.309074+08:00
+# Succ: abc
+select * from t;
+`)
+	reader = bufio.NewReader(slowLog)
+	_, err = infoschema.ParseSlowLog(loc, reader)
+	c.Assert(err, NotNil)
+	c.Assert(err.Error(), Equals, "parse slow log failed `Succ` error: strconv.ParseBool: parsing \"abc\": invalid syntax")
}
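
This negative case pins down the error contract introduced in setFieldValue: the wrapped message embeds the field name, so a malformed log line reports exactly which field failed to parse.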

func (s *testSuite) TestSlowLogParseTime(c *C) {
@@ -159,3 +172,17 @@ select * from t;`)
_, err = infoschema.ParseSlowLog(loc, scanner)
c.Assert(err, IsNil)
}

+func (s *testSuite) TestSlowLogLongQuery(c *C) {

[Inline review comment from the author] Sorry, this test is redundant; I will remove it.

+	t1Str := "2019-01-24T22:32:29.313255+08:00"
+	t2Str := "2019-01-24T22:32:29.313255"
+	t1, err := infoschema.ParseTime(t1Str)
+	c.Assert(err, IsNil)
+	loc, err := time.LoadLocation("Asia/Shanghai")
+	c.Assert(err, IsNil)
+	t2, err := time.ParseInLocation("2006-01-02T15:04:05.999999999", t2Str, loc)
+	c.Assert(err, IsNil)
+	c.Assert(t1.Unix(), Equals, t2.Unix())
+	t1Format := t1.In(loc).Format(logutil.SlowLogTimeFormat)
+	c.Assert(t1Format, Equals, t1Str)
+}
24 changes: 21 additions & 3 deletions infoschema/tables_test.go
@@ -458,18 +458,36 @@ func (s *testTableSuite) TestSlowQuery(c *C) {
# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 Cop_proc_addr: 127.0.0.1:20160
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160
# Mem_max: 70724
+# Succ: true
select * from t_slim;`))
-	c.Assert(f.Close(), IsNil)
+	c.Assert(f.Sync(), IsNil)
c.Assert(err, IsNil)

tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", slowLogFileName))
tk.MustExec("set time_zone = '+08:00';")
re := tk.MustQuery("select * from information_schema.slow_query")
	re.Check(testutil.RowsWithSep("|",
-		"2019-02-12 19:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|select * from t_slim;"))
+		"2019-02-12 19:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|1|select * from t_slim;"))
tk.MustExec("set time_zone = '+00:00';")
re = tk.MustQuery("select * from information_schema.slow_query")
-	re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|select * from t_slim;"))
+	re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|1|select * from t_slim;"))

+	// Test for long query.
+	_, err = f.Write([]byte(`
+# Time: 2019-02-13T19:33:56.571953+08:00
+`))
+	c.Assert(err, IsNil)
+	sql := "select * from "
+	for len(sql) < 5000 {
+		sql += "abcdefghijklmnopqrstuvwxyz_1234567890_qwertyuiopasdfghjklzxcvbnm"
+	}
+	sql += ";"
+	_, err = f.Write([]byte(sql))
+	c.Assert(err, IsNil)
+	c.Assert(f.Close(), IsNil)
+	re = tk.MustQuery("select query from information_schema.slow_query order by time desc limit 1")
+	rows := re.Rows()
+	c.Assert(rows[0][0], Equals, sql)
}
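
The generated statement is over 5,000 bytes, well past bufio.Reader's default 4,096-byte buffer, so reading it back drives getOneLine through its isPrefix loop and round-trips the full text through the new LongBlob Query column; one test covers both halves of the PR.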

func (s *testTableSuite) TestForAnalyzeStatus(c *C) {
6 changes: 5 additions & 1 deletion sessionctx/variable/session.go
@@ -1019,6 +1021,8 @@ const (
SlowLogCopWaitAddr = "Cop_wait_addr"
// SlowLogMemMax is the max number bytes of memory used in this statement.
SlowLogMemMax = "Mem_max"
+	// SlowLogSucc is used to indicate whether this sql executed successfully.
+	SlowLogSucc = "Succ"
)

// SlowLogFormat is used for formatting the slow log.
@@ -1038,9 +1040,10 @@
// # Cop_process: Avg_time: 1s P90_time: 2s Max_time: 3s Max_addr: 10.6.131.78
// # Cop_wait: Avg_time: 10ms P90_time: 20ms Max_time: 30ms Max_Addr: 10.6.131.79
// # Memory_max: 4096
+// # Succ: true
// select * from t_slim;
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string,
-	statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, memMax int64, sql string) string {
+	statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, memMax int64, succ bool, sql string) string {
var buf bytes.Buffer
execDetailStr := execDetail.String()
buf.WriteString(SlowLogRowPrefixStr + SlowLogTxnStartTSStr + SlowLogSpaceMarkStr + strconv.FormatUint(txnTS, 10) + "\n")
@@ -1100,6 +1103,7 @@ func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDe
if memMax > 0 {
buf.WriteString(SlowLogRowPrefixStr + SlowLogMemMax + SlowLogSpaceMarkStr + strconv.FormatInt(memMax, 10) + "\n")
}
+	buf.WriteString(SlowLogRowPrefixStr + SlowLogSucc + SlowLogSpaceMarkStr + strconv.FormatBool(succ) + "\n")
if len(sql) == 0 {
sql = ";"
}
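Note that, unlike Mem_max just above (written only when memMax > 0), the Succ line is emitted unconditionally, so consumers parsing new logs can rely on its presence.
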
3 changes: 2 additions & 1 deletion sessionctx/variable/session_test.go
@@ -166,9 +166,10 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3 Cop_proc_addr: 10.6.131.78
# Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03 Cop_wait_addr: 10.6.131.79
# Mem_max: 2333
+# Succ: true
select * from t;`
sql := "select * from t"
digest := parser.DigestHash(sql)
-	logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, sql)
+	logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, true, sql)
c.Assert(logString, Equals, resultString)
}