Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add succ filed to slow log and fix shallow copy problem when parse slow log file. #11417

Merged
merged 13 commits into from
Jul 24, 2019
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,10 +650,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql))
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, succ, sql))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql))
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, succ, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
Expand Down
45 changes: 33 additions & 12 deletions infoschema/slow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -60,7 +59,8 @@ var slowQueryCols = []columnInfo{
{variable.SlowLogCopWaitMax, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopWaitAddr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogMemMax, mysql.TypeLonglong, 20, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeVarchar, 4096, 0, nil, nil},
{variable.SlowLogSucc, mysql.TypeTiny, 1, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
}

func dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
Expand Down Expand Up @@ -102,7 +102,7 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, err
st = &slowQueryTuple{}
err = st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):])
if err != nil {
return rows, err
return rows, errors.Wrap(err, "parse slow log filed `"+variable.SlowLogTimeStr+"` error")
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
startFlag = true
continue
Expand All @@ -120,14 +120,14 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, err
}
err = st.setFieldValue(tz, field, fieldValues[i+1])
if err != nil {
return rows, err
return rows, errors.Wrap(err, "parse slow log filed `"+field+"` error")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}
} else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
// Get the sql string, and mark the start flag to false.
err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)))
if err != nil {
return rows, err
return rows, errors.Wrap(err, "parse slow log filed `"+variable.SlowLogQuerySQLStr+"` error")
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
rows = append(rows, st.convertToDatumRow())
startFlag = false
Expand All @@ -139,24 +139,34 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader) ([][]types.Datum, err
}

func getOneLine(reader *bufio.Reader) ([]byte, error) {
var resByte []byte
lineByte, isPrefix, err := reader.ReadLine()
if isPrefix {
// Need to read more data.
resByte = make([]byte, len(lineByte), len(lineByte)*2)
} else {
resByte = make([]byte, len(lineByte))
}
// Use copy here to avoid shallow copy problem.
copy(resByte, lineByte)
if err != nil {
return lineByte, err
return resByte, err
}

var tempLine []byte
for isPrefix {
tempLine, isPrefix, err = reader.ReadLine()
lineByte = append(lineByte, tempLine...)
resByte = append(resByte, tempLine...)

// Use the max value of max_allowed_packet to check the single line length.
if len(lineByte) > int(variable.MaxOfMaxAllowedPacket) {
return lineByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
}
if err != nil {
return lineByte, err
return resByte, err
}
}
return lineByte, err
return resByte, err
}

type slowQueryTuple struct {
Expand Down Expand Up @@ -186,11 +196,11 @@ type slowQueryTuple struct {
maxWaitTime float64
maxWaitAddress string
memMax int64
succ bool
sql string
}

func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string) error {
value = stringutil.Copy(value)
switch field {
case variable.SlowLogTimeStr:
t, err := ParseTime(value)
Expand Down Expand Up @@ -319,6 +329,12 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string)
return errors.AddStack(err)
}
st.memMax = num
case variable.SlowLogSucc:
succ, err := strconv.ParseBool(value)
if err != nil {
return errors.AddStack(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use errors.Wrap() to explain which operation causes this error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks, Done.
I add errors.Wrap() outside of this function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to wrap it here? Why delaying it to the outside?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}
st.succ = succ
case variable.SlowLogQuerySQLStr:
st.sql = value
}
Expand Down Expand Up @@ -357,6 +373,11 @@ func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
record = append(record, types.NewFloat64Datum(st.maxWaitTime))
record = append(record, types.NewStringDatum(st.maxWaitAddress))
record = append(record, types.NewIntDatum(st.memMax))
if st.succ {
record = append(record, types.NewIntDatum(1))
} else {
record = append(record, types.NewIntDatum(0))
}
record = append(record, types.NewStringDatum(st.sql))
return record
}
Expand Down
29 changes: 28 additions & 1 deletion infoschema/slow_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *testSuite) TestParseSlowLogFile(c *C) {
# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 Cop_proc_addr: 127.0.0.1:20160
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160
# Mem_max: 70724
# Succ: false
select * from t;`)
reader := bufio.NewReader(slowLog)
loc, err := time.LoadLocation("Asia/Shanghai")
Expand All @@ -53,7 +54,7 @@ select * from t;`)
}
recordString += str
}
expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,select * from t;"
expectRecordString := "2019-04-28 15:24:04.309074,405888132465033227,,,0,0.216905,0.021,0,0,1,637,0,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,0,select * from t;"
c.Assert(expectRecordString, Equals, recordString)

// fix sql contain '# ' bug
Expand All @@ -67,6 +68,7 @@ select a# from t;
# Is_internal: true
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Stats: t1:1,t2:2
# Succ: false
select * from t;
`)
reader = bufio.NewReader(slowLog)
Expand Down Expand Up @@ -110,6 +112,17 @@ select * from t;
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, IsNil)

// Add parse error check.
slowLog = bytes.NewBufferString(
`# Time: 2019-04-28T15:24:04.309074+08:00
# Succ: abc
select * from t;
`)
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "parse slow log filed `Succ` error: strconv.ParseBool: parsing \"abc\": invalid syntax")
}

func (s *testSuite) TestSlowLogParseTime(c *C) {
Expand Down Expand Up @@ -159,3 +172,17 @@ select * from t;`)
_, err = infoschema.ParseSlowLog(loc, scanner)
c.Assert(err, IsNil)
}

func (s *testSuite) TestSlowLogLongQuery(c *C) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, This test is redundant, I will remove this test.

t1Str := "2019-01-24T22:32:29.313255+08:00"
t2Str := "2019-01-24T22:32:29.313255"
t1, err := infoschema.ParseTime(t1Str)
c.Assert(err, IsNil)
loc, err := time.LoadLocation("Asia/Shanghai")
c.Assert(err, IsNil)
t2, err := time.ParseInLocation("2006-01-02T15:04:05.999999999", t2Str, loc)
c.Assert(err, IsNil)
c.Assert(t1.Unix(), Equals, t2.Unix())
t1Format := t1.In(loc).Format(logutil.SlowLogTimeFormat)
c.Assert(t1Format, Equals, t1Str)
}
24 changes: 21 additions & 3 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,18 +458,36 @@ func (s *testTableSuite) TestSlowQuery(c *C) {
# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 Cop_proc_addr: 127.0.0.1:20160
# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160
# Mem_max: 70724
# Succ: true
select * from t_slim;`))
c.Assert(f.Close(), IsNil)
c.Assert(f.Sync(), IsNil)
c.Assert(err, IsNil)

tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", slowLogFileName))
tk.MustExec("set time_zone = '+08:00';")
re := tk.MustQuery("select * from information_schema.slow_query")
re.Check(testutil.RowsWithSep("|",
"2019-02-12 19:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|select * from t_slim;"))
"2019-02-12 19:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|1|select * from t_slim;"))
tk.MustExec("set time_zone = '+00:00';")
re = tk.MustQuery("select * from information_schema.slow_query")
re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|select * from t_slim;"))
re.Check(testutil.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root|127.0.0.1|6|4.895492|0.161|0.101|0.092|1|100001|100000|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|1|select * from t_slim;"))

// Test for long query.
_, err = f.Write([]byte(`
# Time: 2019-02-13T19:33:56.571953+08:00
`))
c.Assert(err, IsNil)
sql := "select * from "
for len(sql) < 5000 {
sql += "abcdefghijklmnopqrstuvwxyz_1234567890_qwertyuiopasdfghjklzxcvbnm"
}
sql += ";"
_, err = f.Write([]byte(sql))
c.Assert(err, IsNil)
c.Assert(f.Close(), IsNil)
re = tk.MustQuery("select query from information_schema.slow_query order by time desc limit 1")
rows := re.Rows()
c.Assert(rows[0][0], Equals, sql)
}

func (s *testTableSuite) TestForAnalyzeStatus(c *C) {
Expand Down
6 changes: 5 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,8 @@ const (
SlowLogCopWaitAddr = "Cop_wait_addr"
// SlowLogMemMax is the max number bytes of memory used in this statement.
SlowLogMemMax = "Mem_max"
// SlowLogSucc is used to indicate whether this sql execute successfully.
SlowLogSucc = "Succ"
)

// SlowLogFormat uses for formatting slow log.
Expand All @@ -1038,9 +1040,10 @@ const (
// # Cop_process: Avg_time: 1s P90_time: 2s Max_time: 3s Max_addr: 10.6.131.78
// # Cop_wait: Avg_time: 10ms P90_time: 20ms Max_time: 30ms Max_Addr: 10.6.131.79
// # Memory_max: 4096
// # Succ: true
// select * from t_slim;
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string,
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, memMax int64, sql string) string {
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, memMax int64, succ bool, sql string) string {
var buf bytes.Buffer
execDetailStr := execDetail.String()
buf.WriteString(SlowLogRowPrefixStr + SlowLogTxnStartTSStr + SlowLogSpaceMarkStr + strconv.FormatUint(txnTS, 10) + "\n")
Expand Down Expand Up @@ -1100,6 +1103,7 @@ func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDe
if memMax > 0 {
buf.WriteString(SlowLogRowPrefixStr + SlowLogMemMax + SlowLogSpaceMarkStr + strconv.FormatInt(memMax, 10) + "\n")
}
buf.WriteString(SlowLogRowPrefixStr + SlowLogSucc + SlowLogSpaceMarkStr + strconv.FormatBool(succ) + "\n")
if len(sql) == 0 {
sql = ";"
}
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3 Cop_proc_addr: 10.6.131.78
# Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03 Cop_wait_addr: 10.6.131.79
# Mem_max: 2333
# Succ: true
select * from t;`
sql := "select * from t"
digest := parser.DigestHash(sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, memMax, true, sql)
c.Assert(logString, Equals, resultString)
}