Skip to content

Commit

Permalink
proxy: stats will record error response (including rewritten internel…
Browse files Browse the repository at this point in the history
… errors)
  • Loading branch information
spinlock committed Dec 8, 2016
1 parent 365c911 commit 42d4014
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 31 deletions.
6 changes: 5 additions & 1 deletion cmd/fe/assets/dashboard-fe.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ function processProxyStats(codis_stats) {
} else {
if (s.stats.online) {
p.sessions = "total=" + s.stats.sessions.total + ",alive=" + s.stats.sessions.alive;
p.commands = "total=" + s.stats.ops.total + ",fails=" + s.stats.ops.fails + ",qps=" + s.stats.ops.qps;
p.commands = "total=" + s.stats.ops.total + ",fails=" + s.stats.ops.fails;
if (s.stats.ops.redis != undefined) {
p.commands += ",errors=" + s.stats.ops.redis.errors;
}
p.commands += ",qps=" + s.stats.ops.qps;
p.status = "HEALTHY";
} else {
p.status = "PENDING";
Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (p *Proxy) startMetricsInfluxdb() {
fields := map[string]interface{}{
"ops_total": stats.Ops.Total,
"ops_fails": stats.Ops.Fails,
"ops_redis_errors": stats.Ops.Redis.Errors,
"ops_qps": stats.Ops.QPS,
"sessions_total": stats.Sessions.Total,
"sessions_alive": stats.Sessions.Alive,
Expand Down
12 changes: 8 additions & 4 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,13 @@ type Stats struct {
} `json:"sentinels"`

Ops struct {
Total int64 `json:"total"`
Fails int64 `json:"fails"`
QPS int64 `json:"qps"`
Cmd []*OpStats `json:"cmd,omitempty"`
Total int64 `json:"total"`
Fails int64 `json:"fails"`
Redis struct {
Errors int64 `json:"errors"`
} `json:"redis"`
QPS int64 `json:"qps"`
Cmd []*OpStats `json:"cmd,omitempty"`
} `json:"ops"`

Sessions struct {
Expand Down Expand Up @@ -553,6 +556,7 @@ func (s *Proxy) Stats(flags StatsFlags) *Stats {

stats.Ops.Total = OpTotal()
stats.Ops.Fails = OpFails()
stats.Ops.Redis.Errors = OpRedisErrors()
stats.Ops.QPS = OpQPS()

if flags.HasBit(StatsCmds) {
Expand Down
44 changes: 27 additions & 17 deletions pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ func (s *Session) loopReader(tasks chan<- *Request, d *Router) (err error) {
func (s *Session) loopWriter(tasks <-chan *Request) (err error) {
defer func() {
s.CloseWithError(err)
for _ = range tasks {
s.incrOpFails(nil)
for r := range tasks {
s.incrOpFails(r, nil)
}
s.flushOpStats(true)
}()
Expand All @@ -197,16 +197,16 @@ func (s *Session) loopWriter(tasks <-chan *Request) (err error) {
resp = redis.NewErrorf("ERR handle response, %s", err)
if sensitive {
s.Conn.Encode(resp, true)
return s.incrOpFails(err)
return s.incrOpFails(r, err)
}
}
if err := p.Encode(resp); err != nil {
return s.incrOpFails(err)
return s.incrOpFails(r, err)
}
if err := p.Flush(len(tasks) == 0); err != nil {
return s.incrOpFails(err)
return s.incrOpFails(r, err)
} else {
s.incrOpStats(r)
s.incrOpStats(r, resp.Type)
}
if len(tasks) == 0 {
s.flushOpStats(false)
Expand Down Expand Up @@ -570,19 +570,29 @@ func (s *Session) incrOpTotal() {
s.stats.total.Incr()
}

func (s *Session) incrOpFails(err error) error {
incrOpFails()
return err
}

func (s *Session) incrOpStats(r *Request) {
e := s.stats.opmap[r.OpStr]
func (s *Session) getOpStats(opstr string) *opStats {
e := s.stats.opmap[opstr]
if e == nil {
e = &opStats{opstr: r.OpStr}
s.stats.opmap[r.OpStr] = e
e = &opStats{opstr: opstr}
s.stats.opmap[opstr] = e
}
return e
}

func (s *Session) incrOpStats(r *Request, t redis.RespType) {
e := s.getOpStats(r.OpStr)
e.calls.Incr()
e.nsecs.Add(time.Now().UnixNano() - r.Start)
switch t {
case redis.TypeError:
e.redis.errors.Incr()
}
}

func (s *Session) incrOpFails(r *Request, err error) error {
e := s.getOpStats(r.OpStr)
e.fails.Incr()
return err
}

func (s *Session) flushOpStats(force bool) {
Expand All @@ -597,8 +607,8 @@ func (s *Session) flushOpStats(force bool) {

incrOpTotal(s.stats.total.Swap(0))
for _, e := range s.stats.opmap {
if n := e.calls.Swap(0); n != 0 {
incrOpStats(e.opstr, n, e.nsecs.Swap(0))
if e.calls.Get() != 0 || e.fails.Get() != 0 {
incrOpStats(e)
}
}
s.stats.flush.n++
Expand Down
38 changes: 29 additions & 9 deletions pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ type opStats struct {
opstr string
calls atomic2.Int64
nsecs atomic2.Int64
fails atomic2.Int64
redis struct {
errors atomic2.Int64
}
}

func (s *opStats) OpStats() *OpStats {
o := &OpStats{
OpStr: s.opstr,
Calls: s.calls.Get(),
Usecs: s.nsecs.Get() / 1e3,
Fails: s.fails.Get(),
}
if o.Calls != 0 {
o.UsecsPercall = o.Usecs / o.Calls
}
o.RedisErrType = s.redis.errors.Get()
return o
}

Expand All @@ -36,14 +42,19 @@ type OpStats struct {
Calls int64 `json:"calls"`
Usecs int64 `json:"usecs"`
UsecsPercall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
RedisErrType int64 `json:"redis_errtype"`
}

var cmdstats struct {
sync.RWMutex

opmap map[string]*opStats
total atomic2.Int64
fails atomic2.Int64
opmap map[string]*opStats
redis struct {
errors atomic2.Int64
}

qps atomic2.Int64
}
Expand All @@ -70,6 +81,10 @@ func OpFails() int64 {
return cmdstats.fails.Get()
}

func OpRedisErrors() int64 {
return cmdstats.redis.errors.Get()
}

func OpQPS() int64 {
return cmdstats.qps.Get()
}
Expand Down Expand Up @@ -125,21 +140,26 @@ func ResetStats() {

cmdstats.total.Set(0)
cmdstats.fails.Set(0)
cmdstats.redis.errors.Set(0)
sessions.total.Set(sessions.alive.Get())
}

func incrOpTotal(n int64) {
cmdstats.total.Add(n)
}

func incrOpFails() {
cmdstats.fails.Incr()
}

func incrOpStats(opstr string, calls int64, nsecs int64) {
s := getOpStats(opstr, true)
s.calls.Add(calls)
s.nsecs.Add(nsecs)
func incrOpStats(e *opStats) {
s := getOpStats(e.opstr, true)
s.calls.Add(e.calls.Swap(0))
s.nsecs.Add(e.nsecs.Swap(0))
if n := e.fails.Swap(0); n != 0 {
s.fails.Add(n)
cmdstats.fails.Add(n)
}
if n := e.redis.errors.Swap(0); n != 0 {
s.redis.errors.Add(n)
cmdstats.redis.errors.Add(n)
}
}

var sessions struct {
Expand Down

0 comments on commit 42d4014

Please sign in to comment.