diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..e42d0a4 --- /dev/null +++ b/client/client.go @@ -0,0 +1,39 @@ +package client + +import ( + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" +) + +type Client struct { + Socket *websocket.Conn + Send chan *SendMessage + Received chan *ReceiveMessage +} + +// Write to websocket +func (client *Client) Write() { + defer client.Socket.Close() + var err error + for msg := range client.Send { + err = client.Socket.WriteJSON(msg) + if err != nil { + log.Error("Error inside client write ", err) + } + } +} + +// Read from websocket +func (client *Client) Read() { + defer client.Socket.Close() + for { + var message *ReceiveMessage + err := client.Socket.ReadJSON(&message) + if err != nil { + log.Errorf("While reading from client: %s", err) + } else { + client.Received <- message + } + + } +} diff --git a/client/const.go b/client/const.go new file mode 100644 index 0000000..1b54577 --- /dev/null +++ b/client/const.go @@ -0,0 +1,6 @@ +package client + +const ( + socketBufferSize = 1042 + messageBufferSize = 256 +) diff --git a/client/controller.go b/client/controller.go new file mode 100644 index 0000000..3f8ba6d --- /dev/null +++ b/client/controller.go @@ -0,0 +1,152 @@ +package client + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" + + "github.com/bisohns/saido/config" + "github.com/bisohns/saido/driver" + "github.com/bisohns/saido/inspector" +) + +var upgrader = &websocket.Upgrader{ + ReadBufferSize: socketBufferSize, + WriteBufferSize: socketBufferSize, + CheckOrigin: func(r *http.Request) bool { + return true + }} + +type HostsController struct { + Info *config.DashboardInfo + // Connections : hostname mapped to connection instances to reuse + // across metrics + mu sync.Mutex + Drivers map[string]*driver.Driver + // ReadOnlyHosts : restrict pinging every other server except these + ReadOnlyHosts []string + Client chan *Client + Received chan *ReceiveMessage +} + +func (hosts *HostsController) getDriver(address string) *driver.Driver { + hosts.mu.Lock() + defer hosts.mu.Unlock() + return hosts.Drivers[address] +} + +func (hosts *HostsController) resetDriver(host config.Host) { + hosts.mu.Lock() + defer hosts.mu.Unlock() + hostDriver := host.Connection.ToDriver() + hosts.Drivers[host.Address] = &hostDriver +} + +func (hosts *HostsController) setReadOnlyHost(hostlist config.HostList) { + hosts.mu.Lock() + defer hosts.mu.Unlock() + hosts.ReadOnlyHosts = hostlist +} + +func (hosts *HostsController) sendMetric(host config.Host, client *Client) { + if hosts.getDriver(host.Address) == nil { + hosts.resetDriver(host) + } + for _, metric := range hosts.Info.Metrics { + driver := hosts.getDriver(host.Address) + initializedMetric, err := inspector.Init(metric, driver) + data, err := initializedMetric.Execute() + if err == nil { + var unmarsh interface{} + json.Unmarshal(data, &unmarsh) + message := &SendMessage{ + Message: Message{ + Host: host.Address, + Platform: (*driver).GetDetails().Name, + Name: metric, + Data: unmarsh, + }, + Error: false, + } + client.Send <- message + } else { + // check for error 127 which means command was not found + var errorContent string + if !strings.Contains(fmt.Sprintf("%s", err), "127") { + errorContent = fmt.Sprintf("Could not retrieve metric %s from driver %s with error %s, resetting connection...", metric, host.Address, err) + } else { + errorContent = fmt.Sprintf("Command %s not found on driver %s", metric, host.Address) + } + log.Error(errorContent) + hosts.resetDriver(host) + message := &SendMessage{ + Message: errorContent, + Error: true, + } + client.Send <- message + } + } +} + +func (hosts *HostsController) Poll(client *Client) { + for { + for _, host := range hosts.Info.Hosts { + if config.Contains(hosts.ReadOnlyHosts, host) { + go hosts.sendMetric(host, client) + } + } + log.Infof("Delaying for %d seconds", hosts.Info.PollInterval) + time.Sleep(time.Duration(hosts.Info.PollInterval) * time.Second) + } +} + +func (hosts *HostsController) Run() { + for { + select { + case client := <-hosts.Client: + go hosts.Poll(client) + case received := <-hosts.Received: + if received.FilterBy == "" { + hosts.setReadOnlyHost(hosts.Info.GetAllHostAddresses()) + } else { + hosts.setReadOnlyHost([]string{received.FilterBy}) + } + } + } + +} + +func (hosts *HostsController) ServeHTTP(w http.ResponseWriter, req *http.Request) { + socket, err := upgrader.Upgrade(w, req, nil) + if err != nil { + log.Fatal(err) + return + } + client := &Client{ + Socket: socket, + Send: make(chan *SendMessage, messageBufferSize), + Received: hosts.Received, + } + hosts.Client <- client + go client.Write() + client.Read() +} + +// NewHostsController : initialze host controller with config file +func NewHostsController(cfg *config.Config) *HostsController { + dashboardInfo := config.GetDashboardInfoConfig(cfg) + hosts := &HostsController{ + Info: dashboardInfo, + Drivers: make(map[string]*driver.Driver), + ReadOnlyHosts: dashboardInfo.GetAllHostAddresses(), + Client: make(chan *Client), + Received: make(chan *ReceiveMessage), + } + return hosts +} diff --git a/client/messages.go b/client/messages.go new file mode 100644 index 0000000..a5d423c --- /dev/null +++ b/client/messages.go @@ -0,0 +1,18 @@ +package client + +type SendMessage struct { + Error bool + Message interface{} +} + +type Message struct { + Host string + Name string + Platform string + Data interface{} +} + +// ReceiveMessage : specify the host to filter by +type ReceiveMessage struct { + FilterBy string +} diff --git a/cmd/api.go b/cmd/api.go index cd21898..59da2e2 100644 --- a/cmd/api.go +++ b/cmd/api.go @@ -16,181 +16,27 @@ limitations under the License. package cmd import ( - "encoding/json" - "fmt" "net/http" "os" "strconv" - "strings" - "sync" - "time" - "github.com/bisohns/saido/config" - "github.com/bisohns/saido/driver" - "github.com/bisohns/saido/inspector" + "github.com/bisohns/saido/client" "github.com/gorilla/handlers" - "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) -const ( - socketBufferSize = 1042 - messageBufferSize = 256 -) - var ( - port string - server = http.NewServeMux() - upgrader = &websocket.Upgrader{ - ReadBufferSize: socketBufferSize, - WriteBufferSize: socketBufferSize, - CheckOrigin: func(r *http.Request) bool { - return true - }} + port string + server = http.NewServeMux() ) -type FullMessage struct { - Error bool - Message interface{} -} - -type Message struct { - Host string - Name string - Platform string - Data interface{} -} - -type Client struct { - Socket *websocket.Conn - Send chan *FullMessage -} - -// Write to websocket -func (client *Client) Write() { - defer client.Socket.Close() - var err error - for msg := range client.Send { - err = client.Socket.WriteJSON(msg) - if err != nil { - log.Error("Error inside client write ", err) - } - } -} - -type Hosts struct { - Info *config.DashboardInfo - // Connections : hostname mapped to connection instances to reuse - // across metrics - mu sync.Mutex - Drivers map[string]*driver.Driver - // ReadOnlyHosts : restrict pinging every other server except these - ReadOnlyHosts []string - Client chan *Client - Start chan bool -} - -func (hosts *Hosts) getDriver(address string) *driver.Driver { - hosts.mu.Lock() - defer hosts.mu.Unlock() - return hosts.Drivers[address] -} - -func (hosts *Hosts) resetDriver(host config.Host) { - hosts.mu.Lock() - defer hosts.mu.Unlock() - hostDriver := host.Connection.ToDriver() - hosts.Drivers[host.Address] = &hostDriver -} - -func (hosts *Hosts) sendMetric(host config.Host, client *Client) { - if hosts.getDriver(host.Address) == nil { - hosts.resetDriver(host) - } - for _, metric := range hosts.Info.Metrics { - driver := hosts.getDriver(host.Address) - initializedMetric, err := inspector.Init(metric, driver) - data, err := initializedMetric.Execute() - if err == nil { - var unmarsh interface{} - json.Unmarshal(data, &unmarsh) - message := &FullMessage{ - Message: Message{ - Host: host.Address, - Platform: (*driver).GetDetails().Name, - Name: metric, - Data: unmarsh, - }, - Error: false, - } - client.Send <- message - } else { - // check for error 127 which means command was not found - var errorContent string - if !strings.Contains(fmt.Sprintf("%s", err), "127") { - errorContent = fmt.Sprintf("Could not retrieve metric %s from driver %s with error %s, resetting connection...", metric, host.Address, err) - } else { - errorContent = fmt.Sprintf("Command %s not found on driver %s", metric, host.Address) - } - log.Error(errorContent) - hosts.resetDriver(host) - message := &FullMessage{ - Message: errorContent, - Error: true, - } - client.Send <- message - } - } -} - -func (hosts *Hosts) Run() { - log.Debug("In Running") - for { - select { - case client := <-hosts.Client: - for { - for _, host := range hosts.Info.Hosts { - go hosts.sendMetric(host, client) - } - log.Infof("Delaying for %d seconds", hosts.Info.PollInterval) - time.Sleep(time.Duration(hosts.Info.PollInterval) * time.Second) - } - } - } - -} - -func (hosts *Hosts) ServeHTTP(w http.ResponseWriter, req *http.Request) { - socket, err := upgrader.Upgrade(w, req, nil) - if err != nil { - log.Fatal(err) - return - } - client := &Client{ - Socket: socket, - Send: make(chan *FullMessage, messageBufferSize), - } - hosts.Client <- client - client.Write() -} - -func newHosts(cfg *config.Config) *Hosts { - dashboardInfo := config.GetDashboardInfoConfig(cfg) - hosts := &Hosts{ - Info: dashboardInfo, - Drivers: make(map[string]*driver.Driver), - Client: make(chan *Client), - } - return hosts -} - var apiCmd = &cobra.Command{ Use: "dashboard", Short: "Run saido dashboard on a PORT env variable, fallback to set argument", Long: ``, Run: func(cmd *cobra.Command, args []string) { - hosts := newHosts(cfg) + hosts := client.NewHostsController(cfg) server.Handle("/metrics", hosts) log.Info("listening on :", port) _, err := strconv.Atoi(port) diff --git a/config/config.go b/config/config.go index 993d950..e37d097 100644 --- a/config/config.go +++ b/config/config.go @@ -18,6 +18,26 @@ type DashboardInfo struct { PollInterval int } +type HostList = []string + +func Contains(hostList HostList, host Host) bool { + for _, compare := range hostList { + if host.Address == compare { + return true + } + } + return false +} + +// GetAllHostAddresses : returns list of all hosts in the dashboard +func (dashboardInfo *DashboardInfo) GetAllHostAddresses() (addresses HostList) { + addresses = []string{} + for _, host := range dashboardInfo.Hosts { + addresses = append(addresses, host.Address) + } + return +} + type Connection struct { Type string `mapstructure:"type"` Username string `mapstructure:"username"`