Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Exporter for Proxy #2199

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ type Stats struct {
PrimaryOnly bool `json:"primary_only"`
} `json:"backend"`

Runtime *RuntimeStats `json:"runtime,omitempty"`
Runtime *RuntimeStats `json:"runtime,omitempty"`
SlowCmdCount int64 `json:"slow_cmd_count"` // Cumulative count of slow log
}

type RuntimeStats struct {
Expand Down Expand Up @@ -667,5 +668,6 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
stats.Runtime.NumCgoCall = runtime.NumCgoCall()
stats.Runtime.MemOffheap = unsafe2.OffheapBytes()
}
stats.SlowCmdCount = SlowCmdCount.Int64()
return stats
}
13 changes: 11 additions & 2 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,14 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
} else {
s.incrOpStats(r, resp.Type)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
s.updateMaxDelay(duration, r)
if fflush {
s.flushOpStats(false)
}
nowTime := time.Now().UnixNano()
duration := int64((nowTime - r.ReceiveTime) / 1e3)
if duration >= s.config.SlowlogLogSlowerThan {
SlowCmdCount.Incr() // Atomic global variable, increment by 1 when slow log occurs.
//client -> proxy -> server -> porxy -> client
//Record the waiting time from receiving the request from the client to sending it to the backend server
//the waiting time from sending the request to the backend server to receiving the response from the server
Expand Down Expand Up @@ -758,3 +760,10 @@ func (s *Session) handlePConfig(r *Request) error {
}
return nil
}

func (s *Session) updateMaxDelay(duration int64, r *Request) {
e := s.getOpStats(r.OpStr) // There is no race condition in the session
if duration > e.maxDelay.Int64() {
e.maxDelay.Set(duration)
}
}
40 changes: 36 additions & 4 deletions codis/pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"pika/codis/v2/pkg/utils/sync2/atomic2"
)

var SlowCmdCount atomic2.Int64 // Cumulative count of slow log

type opStats struct {
opstr string
calls atomic2.Int64
Expand All @@ -22,14 +24,16 @@ type opStats struct {
redis struct {
errors atomic2.Int64
}
maxDelay atomic2.Int64
}

func (s *opStats) OpStats() *OpStats {
o := &OpStats{
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
OpStr: s.opstr,
Calls: s.calls.Int64(),
Usecs: s.nsecs.Int64() / 1e3,
Fails: s.fails.Int64(),
MaxDelay: s.maxDelay.Int64(),
}
if o.Calls != 0 {
o.UsecsPercall = o.Usecs / o.Calls
Expand All @@ -45,6 +49,7 @@ type OpStats struct {
UsecsPercall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
RedisErrType int64 `json:"redis_errtype"`
MaxDelay int64 `json:"max_delay"`
}

var cmdstats struct {
Expand All @@ -62,6 +67,7 @@ var cmdstats struct {

func init() {
cmdstats.opmap = make(map[string]*opStats, 128)
SlowCmdCount.Set(0)
go func() {
for {
start := time.Now()
Expand All @@ -72,6 +78,16 @@ func init() {
cmdstats.qps.Set(int64(normalized + 0.5))
}
}()

// Clear the accumulated maximum delay to 0 every 15 seconds.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这边辛苦小帅调整下,改成动态可配置重置时间,会更好些。避免信息缺失

go func() {
for {
time.Sleep(15 * time.Second)
for _, s := range cmdstats.opmap {
s.maxDelay.Set(0)
}
}
}()
}

func OpTotal() int64 {
Expand Down Expand Up @@ -165,6 +181,22 @@ func incrOpStats(e *opStats) {
s.redis.errors.Add(n)
cmdstats.redis.errors.Add(n)
}

/**
Each session refreshes its own saved metrics, and there is a race condition at this time.
Use the CAS method to update.
*/
for {
oldValue := s.maxDelay
if e.maxDelay > oldValue {
if s.maxDelay.CompareAndSwap(oldValue.Int64(), e.maxDelay.Int64()) {
e.maxDelay.Set(0)
break
}
} else {
break
}
}
}

var sessions struct {
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class PikaServer : public pstd::noncopyable {
uint32_t SlowlogLen();
void SlowlogObtain(int64_t number, std::vector<SlowlogEntry>* slowlogs);
void SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int64_t duration);
uint64_t SlowlogCount();

/*
* Statistic used
Expand Down Expand Up @@ -680,6 +681,7 @@ class PikaServer : public pstd::noncopyable {
* Slowlog used
*/
uint64_t slowlog_entry_id_ = 0;
uint64_t slowlog_counter_ = 0;
std::shared_mutex slowlog_protector_;
std::list<SlowlogEntry> slowlog_list_;

Expand Down
2 changes: 1 addition & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ void InfoCmd::InfoStats(std::string& info) {
tmp_stream << "is_slots_migrating:" << (is_migrating ? "Yes, " : "No, ") << start_migration_time_str << ", "
<< (is_migrating ? (current_time_s - start_migration_time) : (end_migration_time - start_migration_time))
<< "\r\n";

tmp_stream << "slow_logs_count:" << g_pika_server->SlowlogCount() << "\r\n";
info.append(tmp_stream.str());
}

Expand Down
6 changes: 6 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1198,11 +1198,17 @@ void PikaServer::SlowlogPushEntry(const PikaCmdArgsType& argv, int64_t time, int
entry.start_time = time;
entry.duration = duration;
slowlog_list_.push_front(entry);
slowlog_counter_++;
}

SlowlogTrim();
}

uint64_t PikaServer::SlowlogCount() {
std::shared_lock l(slowlog_protector_);
return slowlog_counter_;
}

void PikaServer::ResetStat() {
statistic_.server_stat.accumulative_connections.store(0);
statistic_.server_stat.qps.querynum.store(0);
Expand Down
77 changes: 77 additions & 0 deletions tools/pika_exporter/discovery/codis_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,91 @@ type CodisModelInfo struct {
Servers []CodisServerInfo `json:"servers"`
}

type CodisProxyModelInfo struct {
Id int `json:"id"`
AdminAddr string `json:"admin_addr"`
ProductName string `json:"product_name"`
DataCenter string `json:"data_center"`
}

type CodisGroupInfo struct {
Models []CodisModelInfo `json:"models"`
}

type CodisProxyInfo struct {
Models []CodisProxyModelInfo `json:"models"`
}

type CodisStatsInfo struct {
Group CodisGroupInfo `json:"group"`
Proxy CodisProxyInfo `json:"proxy"`
}

type CodisTopomInfo struct {
Stats CodisStatsInfo `json:"stats"`
}

type RedisInfo struct {
Errors int `json:"errors"`
}

type CmdInfo struct {
Opstr string `json:"opstr"`
Calls int64 `json:"calls"`
Usecs_percall int64 `json:"usecs_percall"`
Fails int64 `json:"fails"`
MaxDelay int64 `json:"max_delay"`
}

type ProxyOpsInfo struct {
Total int `json:"total"`
Fails int `json:"fails"`
Redis RedisInfo `json:"redis"`
Qps int `json:"qps"`
Cmd []CmdInfo `json:"cmd"`
}

type RowInfo struct {
Utime int64 `json:"utime"`
Stime int64 `json:"stime"`
MaxRss int64 `json:"max_rss"`
IxRss int64 `json:"ix_rss"`
IdRss int64 `json:"id_rss"`
IsRss int64 `json:"is_rss"`
}

type RusageInfo struct {
Now string `json:"now"`
Cpu float64 `json:"cpu"`
Mem float64 `json:"mem"`
Raw RowInfo `json:"raw"`
}

type GeneralInfo struct {
Alloc int64 `json:"alloc"`
Sys int64 `json:"sys"`
Lookups int64 `json:"lookups"`
Mallocs int64 `json:"mallocs"`
Frees int64 `json:"frees"`
}

type HeapInfo struct {
Alloc int64 `json:"alloc"`
Sys int64 `json:"sys"`
Idle int64 `json:"idle"`
Inuse int64 `json:"inuse"`
Objects int64 `json:"objects"`
}

type RunTimeInfo struct {
General GeneralInfo `json:"general"`
Heap HeapInfo `json:"heap"`
}

type ProxyStats struct {
Online bool `json:"online"`
Ops ProxyOpsInfo `json:"ops"`
Rusage RusageInfo `json:"rusage"`
RunTime RunTimeInfo `json:"runtime"`
SlowCmdCount int64 `json:"slow_cmd_count"`
}
49 changes: 45 additions & 4 deletions tools/pika_exporter/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ type Instance struct {
Alias string
}

type InstanceProxy struct {
ID int
Addr string
ProductName string
}

type Discovery interface {
GetInstances() []Instance
GetInstancesProxy() []InstanceProxy
CheckUpdate(chan int, string)
}

Expand Down Expand Up @@ -58,6 +65,10 @@ func (d *cmdArgsDiscovery) GetInstances() []Instance {
return d.instances
}

func (d *cmdArgsDiscovery) GetInstancesProxy() []InstanceProxy {
return nil
}

func (d *cmdArgsDiscovery) CheckUpdate(chan int, string) {}

type fileDiscovery struct {
Expand Down Expand Up @@ -107,10 +118,15 @@ func (d *fileDiscovery) GetInstances() []Instance {
return d.instances
}

func (d *fileDiscovery) GetInstancesProxy() []InstanceProxy {
return nil
}

func (d *fileDiscovery) CheckUpdate(chan int, string) {}

type codisDiscovery struct {
instances []Instance
instances []Instance
instanceProxy []InstanceProxy
}

func NewCodisDiscovery(url, password, alias string) (*codisDiscovery, error) {
Expand Down Expand Up @@ -155,13 +171,29 @@ func NewCodisDiscovery(url, password, alias string) (*codisDiscovery, error) {
Alias: aliases[i],
}
}
return &codisDiscovery{instances: instances}, nil

instancesproxy := make([]InstanceProxy, len(result.Stats.Proxy.Models))
for i := range result.Stats.Proxy.Models {
instancesproxy[i] = InstanceProxy{
ID: result.Stats.Proxy.Models[i].Id,
Addr: result.Stats.Proxy.Models[i].AdminAddr,
ProductName: result.Stats.Proxy.Models[i].ProductName,
}
}
return &codisDiscovery{
instances: instances,
instanceProxy: instancesproxy,
}, nil
}

func (d *codisDiscovery) GetInstances() []Instance {
return d.instances
}

func (d *codisDiscovery) GetInstancesProxy() []InstanceProxy {
dingxiaoshuai123 marked this conversation as resolved.
Show resolved Hide resolved
return d.instanceProxy
}

func (d *codisDiscovery) CheckUpdate(updatechan chan int, codisaddr string) {
newdis, err := NewCodisDiscovery(codisaddr, "", "")
if err != nil {
Expand All @@ -174,7 +206,7 @@ func (d *codisDiscovery) CheckUpdate(updatechan chan int, codisaddr string) {
}

func (d *codisDiscovery) comparedis(new_instance *codisDiscovery) bool {
var addrs []string
var addrs, addrsProxy []string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var (
addrs []string
addrsProxy []string
diff bool
)

var diff bool = false
for _, instance := range new_instance.instances {
addrs = append(addrs, instance.Addr)
Expand All @@ -185,7 +217,16 @@ func (d *codisDiscovery) comparedis(new_instance *codisDiscovery) bool {
return false
}
}
if !diff && len(new_instance.instances) == len(d.instances) {
for _, instance := range new_instance.instanceProxy {
addrsProxy = append(addrsProxy, instance.Addr)
}
for _, instance := range d.instanceProxy {
if !contains(instance.Addr, addrsProxy) {
diff = true
return false
}
}
if !diff && len(new_instance.instances) == len(d.instances) && len(new_instance.instanceProxy) == len(d.instanceProxy) {
return true
}
return false
Expand Down
14 changes: 14 additions & 0 deletions tools/pika_exporter/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,19 @@ func TestNewCodisDiscovery(t *testing.T) {
"password1",
"password2",
}
expectedAddrsForProxy := []string{
"1.2.3.4:1234",
"1.2.3.4:4321",
}

if len(discovery.instances) != len(expectedAddrs) {
t.Errorf("expected %d instances but got %d", len(expectedAddrs), len(discovery.instances))
}

if len(discovery.instanceProxy) != len(expectedAddrsForProxy) {
t.Errorf("expected %d instances but got %d", len(expectedAddrs), len(discovery.instances))
}

for i := range expectedAddrs {
if discovery.instances[i].Addr != expectedAddrs[i] {
t.Errorf("instance %d address: expected %s but got %s", i, expectedAddrs[i], discovery.instances[i].Addr)
Expand All @@ -57,4 +65,10 @@ func TestNewCodisDiscovery(t *testing.T) {
t.Errorf("instance %d password: expected %s but got %s", i, expectedPasswords[i], discovery.instances[i].Password)
}
}

for i := range expectedAddrsForProxy {
if expectedAddrsForProxy[i] != discovery.instanceProxy[i].Addr {
t.Errorf("instance %d address: expected %s but got %s", i, expectedAddrs[i], discovery.instances[i].Addr)
}
}
}
Loading
Loading