Skip to content

Commit

Permalink
Exporter collects Proxy information.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Dec 12, 2023
1 parent 7ecf162 commit 086ab75
Show file tree
Hide file tree
Showing 14 changed files with 464 additions and 54 deletions.
4 changes: 3 additions & 1 deletion codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ type Stats struct {
PrimaryOnly bool `json:"primary_only"`
} `json:"backend"`

Runtime *RuntimeStats `json:"runtime,omitempty"`
Runtime *RuntimeStats `json:"runtime,omitempty"`
TimeoutCmdNumber int64 `json:"timeout_cmd_number"`
}

type RuntimeStats struct {
Expand Down Expand Up @@ -667,5 +668,6 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
stats.Runtime.NumCgoCall = runtime.NumCgoCall()
stats.Runtime.MemOffheap = unsafe2.OffheapBytes()
}
stats.TimeoutCmdNumber = TimeoutCmdNumberInSecond.Int64()
return stats
}
1 change: 1 addition & 0 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
if duration >= s.config.SlowlogLogSlowerThan {
TimeoutCmdNumber.Incr()
//client -> proxy -> server -> porxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
Expand Down
10 changes: 10 additions & 0 deletions codis/pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"pika/codis/v2/pkg/utils/sync2/atomic2"
)

var (
TimeoutCmdNumber atomic2.Int64
TimeoutCmdNumberInSecond atomic2.Int64
)

type opStats struct {
opstr string
calls atomic2.Int64
Expand Down Expand Up @@ -62,6 +67,8 @@ var cmdstats struct {

func init() {
cmdstats.opmap = make(map[string]*opStats, 128)
TimeoutCmdNumber.Set(0)
TimeoutCmdNumberInSecond.Set(0)
go func() {
for {
start := time.Now()
Expand All @@ -70,6 +77,9 @@ func init() {
delta := cmdstats.total.Int64() - total
normalized := math.Max(0, float64(delta)) * float64(time.Second) / float64(time.Since(start))
cmdstats.qps.Set(int64(normalized + 0.5))

TimeoutCmdNumberInSecond.Swap(TimeoutCmdNumber.Int64())
TimeoutCmdNumber.Set(0)
}
}()
}
Expand Down
9 changes: 9 additions & 0 deletions tools/pika_exporter/discovery/codis_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ type RedisInfo struct {
Errors int `json:"errors"`
}

type CmdInfo struct {
Opstr string `json:"opstr"`
Calls int64 `json:"calls"`
Usecs_percall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
}

type ProxyOpsInfo struct {
Total int `json:"total"`
Fails int `json:"fails"`
Redis RedisInfo `json:"redis"`
Qps int `json:"qps"`
Cmd []CmdInfo `json:"cmd"`
}

type RowInfo struct {
Expand Down Expand Up @@ -89,6 +97,7 @@ type RunTimeInfo struct {
}

type ProxyStats struct {
Online bool `json:"online"`
Ops ProxyOpsInfo `json:"ops"`
Rusage RusageInfo `json:"rusage"`
RunTime RunTimeInfo `json:"runtime"`
Expand Down
1 change: 1 addition & 0 deletions tools/pika_exporter/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func NewCodisDiscovery(url, password, alias string) (*codisDiscovery, error) {
func (d *codisDiscovery) GetInstances() []Instance {
return d.instances
}

func (d *codisDiscovery) GetInstancesProxy() []InstanceProxy {
return d.instanceProxy
}
Expand Down
14 changes: 14 additions & 0 deletions tools/pika_exporter/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ func TestNewCodisDiscovery(t *testing.T) {
"password1",
"password2",
}
expectedAddrsForProxy := []string{
"1.2.3.4:1234",
"1.2.3.4:4321",
}

if len(discovery.instances) != len(expectedAddrs) {
t.Errorf("expected %d instances but got %d", len(expectedAddrs), len(discovery.instances))
}

if len(discovery.instanceProxy) != len(expectedAddrsForProxy) {
t.Errorf("expected %d instances but got %d", len(expectedAddrs), len(discovery.instances))
}

for i := range expectedAddrs {
if discovery.instances[i].Addr != expectedAddrs[i] {
t.Errorf("instance %d address: expected %s but got %s", i, expectedAddrs[i], discovery.instances[i].Addr)
Expand All @@ -57,4 +65,10 @@ func TestNewCodisDiscovery(t *testing.T) {
t.Errorf("instance %d password: expected %s but got %s", i, expectedPasswords[i], discovery.instances[i].Password)
}
}

for i := range expectedAddrsForProxy {
if expectedAddrsForProxy[i] != discovery.instanceProxy[i].Addr {
t.Errorf("instance %d address: expected %s but got %s", i, expectedAddrs[i], discovery.instances[i].Addr)
}
}
}
2 changes: 2 additions & 0 deletions tools/pika_exporter/discovery/mockCodisTopom.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@
{
"id": 2,
"token": "4337153123141c4df6e363d281",
"admin_addr": "1.2.3.4:1234",
"start_time": "2021-07-06 10:30:33.588330243 +0800 CST",
"datacenter": ""
},
{
"id": 3,
"token": "ef7e1ad5422ab8e28241410e856",
"admin_addr": "1.2.3.4:4321",
"start_time": "2021-07-06 10:34:21.305110128 +0800 CST",
"datacenter": ""
}
Expand Down
2 changes: 1 addition & 1 deletion tools/pika_exporter/exporter/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (f *future) Wait() map[futureKey]error {
}

type futureKeyForProxy struct {
addr, instance, ID, productName string
addr, ID, productName string
}

type futureForProxy struct {
Expand Down
4 changes: 2 additions & 2 deletions tools/pika_exporter/exporter/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ const (
LabelNameAlias = "alias"
LabelInstanceMode = "instance-mode"
LabelConsensusLevel = "consensus-level"
LabelInstance = "instance"
LabelID = "id"
LabelID = "proxy_id"
LabelProductName = "product_name"
LabelOpstr = "opstr"
)

type Describer interface {
Expand Down
99 changes: 84 additions & 15 deletions tools/pika_exporter/exporter/metrics/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ const (
)

type ParseOption struct {
Version *semver.Version
Extracts map[string]string
Info string
Version *semver.Version
Extracts map[string]string
ExtractsProxy map[string][]int64
Info string
}

type Parser interface {
Expand Down Expand Up @@ -242,9 +243,9 @@ func convertToFloat64(s string) float64 {
s = strings.ToLower(s)

switch s {
case "yes", "up", "online":
case "yes", "up", "online", "true":
return 1
case "no", "down", "offline", "null":
case "no", "down", "offline", "null", "false":
return 0
}

Expand Down Expand Up @@ -274,25 +275,93 @@ func convertTimeToUnix(ts string) (int64, error) {
return t.Unix(), nil
}

func StructToMap(obj interface{}) (map[string]string, error) {
type proxyParser struct{}

func (p *proxyParser) Parse(m MetricMeta, c Collector, opt ParseOption) {
m.Lookup(func(m MetaData) {
for opstr, v := range opt.ExtractsProxy {
metric := Metric{
MetaData: m,
LabelValues: make([]string, len(m.Labels)),
Value: defaultValue,
}

for i := 0; i < len(m.Labels)-1; i++ {
labelValue, ok := findInMap(m.Labels[i], opt.Extracts)
if !ok {
log.Debugf("normalParser::Parse not found label value. metricName:%s labelName:%s",
m.Name, m.Labels[i])
}

metric.LabelValues[i] = labelValue
}
metric.LabelValues[len(m.Labels)-1] = opstr

switch m.ValueName {
case "calls":
metric.Value = convertToFloat64(strconv.FormatInt(v[0], 10))
case "usecs_percall":
metric.Value = convertToFloat64(strconv.FormatInt(v[1], 10))
case "fails":
metric.Value = convertToFloat64(strconv.FormatInt(v[2], 10))
}

if err := c.Collect(metric); err != nil {
log.Errorf("proxyParser::Parse metric collect failed. metric:%#v err:%s",
m, m.ValueName)
}
}
})

}

func StructToMap(obj interface{}) (map[string]string, map[string][]int64, error) {
result := make(map[string]string)
cmdResult := make(map[string][]int64)
objValue := reflect.ValueOf(obj)
objType := objValue.Type()

data := make(map[string]string)

for i := 0; i < objValue.NumField(); i++ {
field := objValue.Field(i)
fieldName := objType.Field(i).Name
fieldType := objType.Field(i)
jsonName := fieldType.Tag.Get("json")
if jsonName == "" {
jsonName = fieldType.Name
}
value := field.Interface()

if field.Kind() == reflect.Struct {
innerData, _ := StructToMap(field.Interface())
for k, v := range innerData {
data[k] = v
subMap, subCmdMap, _ := StructToMap(value)
for k, v := range subMap {
result[strings.ToLower(jsonName+"_"+k)] = v
}
for k, v := range subCmdMap {
if v != nil {
for index := range v {
cmdResult[k] = append(cmdResult[k], v[index])
}
}
}
} else if field.Kind() == reflect.Slice && field.Len() > 0 {
for j := 0; j < field.Len(); j++ {
elemType := field.Index(j).Type()
elemValue := field.Index(j)
if elemType.Kind() == reflect.Struct {
var key string
for p := 0; p < elemValue.NumField(); p++ {
if p == 0 {
key = elemValue.Field(p).String()
} else {
cmdResult[key] = append(cmdResult[key], elemValue.Field(p).Int())
}
}
} else {
result[strings.ToLower(jsonName)] = fmt.Sprintf("%v", value)
}
}
} else {
data[fieldName] = fmt.Sprintf("%v", field.Interface())
result[strings.ToLower(jsonName)] = fmt.Sprintf("%v", value)
}
}

return data, nil
return result, cmdResult, nil
}
Loading

0 comments on commit 086ab75

Please sign in to comment.