Skip to content

Commit

Permalink
heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
yeabow committed Mar 15, 2021
1 parent ebf7782 commit d97db86
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 26 deletions.
14 changes: 7 additions & 7 deletions cat/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
)

type serverAddress struct {
host string
port int
httpPort int
Host string `json:"host"`
Port int `json:"port"`
HttpPort int `json:"http_port"`
}

func resolveServerAddresses(router string) (addresses []serverAddress) {
Expand All @@ -26,8 +26,8 @@ func resolveServerAddresses(router string) (addresses []serverAddress) {
logger.Warning("%s isn't a valid server address.", segment)
} else {
addresses = append(addresses, serverAddress{
host: fragments[0],
port: port,
Host: fragments[0],
Port: port,
})
}
}
Expand All @@ -38,8 +38,8 @@ func compareServerAddress(a, b *serverAddress) bool {
if a == nil || b == nil {
return false
}
if strings.Compare(a.host, b.host) == 0 {
return a.port == b.port
if strings.Compare(a.Host, b.Host) == 0 {
return a.Port == b.Port
} else {
return false
}
Expand Down
11 changes: 7 additions & 4 deletions cat/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cat

import (
"encoding/json"
"encoding/xml"
"io/ioutil"
"net"
Expand Down Expand Up @@ -114,13 +115,15 @@ func loadXmlConfig(c XMLConfig) (err error) {

for _, x := range c.Servers.Servers {
config.serverAddress = append(config.serverAddress, serverAddress{
host: x.Host,
port: x.Port,
httpPort: x.HttpPort,
Host: x.Host,
Port: x.Port,
HttpPort: x.HttpPort,
})
}

logger.Info("Server addresses: %s", config.serverAddress)
json, _ := json.Marshal(config.serverAddress)

logger.Info("Server addresses: %s", string(json))

return err
}
Expand Down
2 changes: 2 additions & 0 deletions cat/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func (p *catMessageManager) flush(m message.Messager) {
} else {
aggregator.transaction.Put(m)
}
case *message.Heartbeat:
sender.handleHeartbeat(m)
case *message.Event:
if m.Status != SUCCESS {
sender.handleEvent(m)
Expand Down
4 changes: 2 additions & 2 deletions cat/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *catMonitor) buildXml() *bytes.Buffer {

buf := bytes.NewBuffer([]byte{})
encoder := xml.NewEncoder(buf)
encoder.Indent("", "\t")
//encoder.Indent("", "\t")

if err := encoder.Encode(status); err != nil {
buf.Reset()
Expand All @@ -106,7 +106,7 @@ func (m *catMonitor) collectAndSend() {
var trans = message.NewTransaction(typeSystem, "Status", manager.flush)
defer trans.Complete()

trans.LogEvent("Cat_golang_Client_Version", GoCatVersion)
//trans.LogEvent("Cat_golang_Client_Version", GoCatVersion)

// NOTE type & name is useless while sending a heartbeat
heartbeat := message.NewHeartbeat("Heartbeat", config.ip, nil)
Expand Down
42 changes: 29 additions & 13 deletions cat/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cat
import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (c *catRouterConfig) updateRouterConfig() {
}

for _, server := range config.serverAddress {
u.Host = fmt.Sprintf("%s:%d", server.host, server.httpPort)
u.Host = fmt.Sprintf("%s:%d", server.Host, server.HttpPort)
logger.Info("Getting router config from %s", u.String())

resp, err := client.Get(u.String())
Expand All @@ -76,8 +77,12 @@ func (c *catRouterConfig) updateRouterConfig() {
continue
}

c.parse(resp.Body)
return
err = c.parse(resp.Body)
if err == nil {
return
} else {
continue
}
}

logger.Error("Can't get router config from remote server.")
Expand Down Expand Up @@ -113,14 +118,16 @@ func (c *catRouterConfig) process() {
}
}

func (c *catRouterConfig) updateSample(v string) {
func (c *catRouterConfig) updateSample(v string) error {
sample, err := strconv.ParseFloat(v, 32)
if err != nil {
logger.Warning("Sample should be a valid float, %s given", v)
return err
} else if math.Abs(sample-c.sample) > 1e-9 {
c.sample = sample
logger.Info("Sample rate has been set to %f%%", c.sample*100)
}
return nil
}

func (c *catRouterConfig) updateBlock(v string) {
Expand All @@ -131,10 +138,10 @@ func (c *catRouterConfig) updateBlock(v string) {
}
}

func (c *catRouterConfig) parse(reader io.ReadCloser) {
func (c *catRouterConfig) parse(reader io.ReadCloser) error {
bytes, err := ioutil.ReadAll(reader)
if err != nil {
return
return err
}

t := new(routerConfigJson)
Expand All @@ -145,22 +152,29 @@ func (c *catRouterConfig) parse(reader io.ReadCloser) {
for k, v := range t.Kvs {
switch k {
case propertySample:
c.updateSample(v)
err = c.updateSample(v)
if err != nil {
return err
}
case propertyRouters:
c.updateRouters(v)
err = c.updateRouters(v)
if err != nil {
return err
}
case propertyBlock:
c.updateBlock(v)
}
}
return nil
}

func (c *catRouterConfig) updateRouters(router string) {
func (c *catRouterConfig) updateRouters(router string) error {
newRouters := resolveServerAddresses(router)

oldLen, newLen := len(c.routers), len(newRouters)

if newLen == 0 {
return
return errors.New("Routers not found")
} else if oldLen == 0 {
logger.Info("Routers has been initialized to: %s", newRouters)
c.routers = newRouters
Expand All @@ -179,19 +193,21 @@ func (c *catRouterConfig) updateRouters(router string) {

for _, server := range newRouters {
if compareServerAddress(c.current, &server) {
return
return nil
}

addr := fmt.Sprintf("%s:%d", server.host, server.port)
addr := fmt.Sprintf("%s:%d", server.Host, server.Port)
if conn, err := net.DialTimeout("tcp", addr, time.Second); err != nil {
logger.Info("Failed to connect to %s, retrying...", addr)
return errors.New("Failed to connect to " + addr)
} else {
c.current = &server
logger.Info("Connected to %s.", addr)
sender.chConn <- conn
return
return nil
}
}

logger.Info("Cannot established a connection to cat server.")
return nil
}
8 changes: 8 additions & 0 deletions cat/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ func (s *catMessageSender) handleTransaction(trans *message.Transaction) {
}
}

func (s *catMessageSender) handleHeartbeat(heartbeat *message.Heartbeat) {
select {
case s.normal <- heartbeat:
default:
// logger.Warning("Normal priority channel is full, event has been discarded.")
}
}

func (s *catMessageSender) handleEvent(event *message.Event) {
select {
case s.normal <- event:
Expand Down

0 comments on commit d97db86

Please sign in to comment.