Skip to content

Commit

Permalink
Stop polling metrics on websocket disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
deven96 committed Nov 2, 2022
1 parent 6f8fcda commit 16817d2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 14 deletions.
12 changes: 9 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
)

type Client struct {
Socket *websocket.Conn
Send chan *SendMessage
Received chan *ReceiveMessage
Socket *websocket.Conn
Send chan *SendMessage
Received chan *ReceiveMessage
StopHostPolling chan bool
}

// Write to websocket
Expand All @@ -19,6 +20,9 @@ func (client *Client) Write() {
err = client.Socket.WriteJSON(msg)
if err != nil {
log.Error("Error inside client write ", err)
// most likely socket connection has been closed so
// just return
return
}
}
}
Expand All @@ -31,6 +35,8 @@ func (client *Client) Read() {
err := client.Socket.ReadJSON(&message)
if err != nil {
log.Errorf("While reading from client: %s", err)
client.StopHostPolling <- true
return
} else {
client.Received <- message
}
Expand Down
43 changes: 32 additions & 11 deletions client/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ type HostsController struct {
Drivers map[string]*driver.Driver
// ReadOnlyHosts : restrict pinging every other server except these
ReadOnlyHosts []string
Client chan *Client
Received chan *ReceiveMessage
// ClientConnected : shows that a client is connected
ClientConnected bool
Client chan *Client
Received chan *ReceiveMessage
StopPolling chan bool
}

func (hosts *HostsController) getDriver(address string) *driver.Driver {
Expand All @@ -48,6 +51,18 @@ func (hosts *HostsController) resetDriver(host config.Host) {
hosts.Drivers[host.Address] = &hostDriver
}

func (hosts *HostsController) clientConnected() bool {
hosts.mu.Lock()
defer hosts.mu.Unlock()
return hosts.ClientConnected
}

func (hosts *HostsController) setClientConnected(connected bool) {
hosts.mu.Lock()
defer hosts.mu.Unlock()
hosts.ClientConnected = connected
}

func (hosts *HostsController) setReadOnlyHost(hostlist config.HostList) {
hosts.mu.Lock()
defer hosts.mu.Unlock()
Expand Down Expand Up @@ -97,7 +112,7 @@ func (hosts *HostsController) sendMetric(host config.Host, client *Client) {
func (hosts *HostsController) Poll(client *Client) {
for {
for _, host := range hosts.Info.Hosts {
if config.Contains(hosts.ReadOnlyHosts, host) {
if config.Contains(hosts.ReadOnlyHosts, host) && hosts.clientConnected() {
go hosts.sendMetric(host, client)
}
}
Expand All @@ -117,6 +132,8 @@ func (hosts *HostsController) Run() {
} else {
hosts.setReadOnlyHost([]string{received.FilterBy})
}
case poll := <-hosts.StopPolling:
hosts.setClientConnected(!poll)
}
}

Expand All @@ -129,11 +146,13 @@ func (hosts *HostsController) ServeHTTP(w http.ResponseWriter, req *http.Request
return
}
client := &Client{
Socket: socket,
Send: make(chan *SendMessage, messageBufferSize),
Received: hosts.Received,
Socket: socket,
Send: make(chan *SendMessage, messageBufferSize),
Received: hosts.Received,
StopHostPolling: hosts.StopPolling,
}
hosts.Client <- client
hosts.StopPolling <- false
go client.Write()
client.Read()
}
Expand All @@ -142,11 +161,13 @@ func (hosts *HostsController) ServeHTTP(w http.ResponseWriter, req *http.Request
func NewHostsController(cfg *config.Config) *HostsController {
dashboardInfo := config.GetDashboardInfoConfig(cfg)
hosts := &HostsController{
Info: dashboardInfo,
Drivers: make(map[string]*driver.Driver),
ReadOnlyHosts: dashboardInfo.GetAllHostAddresses(),
Client: make(chan *Client),
Received: make(chan *ReceiveMessage),
Info: dashboardInfo,
Drivers: make(map[string]*driver.Driver),
ReadOnlyHosts: dashboardInfo.GetAllHostAddresses(),
Client: make(chan *Client),
Received: make(chan *ReceiveMessage),
StopPolling: make(chan bool),
ClientConnected: true,
}
return hosts
}

0 comments on commit 16817d2

Please sign in to comment.