diff --git a/README.md b/README.md index 813351fd..1b59e236 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,8 @@ Environment: | MF_AGENT_MQTT_RETAIN | MQTT retain | false | | MF_AGENT_MQTT_CLIENT_CERT | Location of client certificate for MTLS | thing.cert | | MF_AGENT_MQTT_CLIENT_PK | Location of client certificate key for MTLS | thing.key | - +| MF_AGENT_HEARTBEAT_INTERVAL | Interval within which a heartbeat from a service is expected | 10s | +| MF_AGENT_TERMINAL_SESSION_TIMEOUT | Inactivity timeout after which a terminal session is closed | 60s | Here `thing` is a Mainflux thing, and control channel from `channels` is used with `req` and `res` subtopic (i.e. app needs to PUB/SUB on `/channels//messages/req` and `/channels//messages/res`). diff --git a/cmd/main.go b/cmd/main.go index 96f3ebf6..1f0734f1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,6 +10,7 @@ import ( "os" "os/signal" "syscall" + "time" mqtt "github.com/eclipse/paho.mqtt.golang" kitprometheus "github.com/go-kit/kit/metrics/prometheus" @@ -20,9 +21,9 @@ import ( "github.com/mainflux/agent/pkg/conn" "github.com/mainflux/agent/pkg/edgex" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/logger" nats "github.com/nats-io/nats.go" - "github.com/pkg/errors" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -51,6 +52,8 @@ const ( defMqttPrivKey = "thing.key" defConfigFile = "config.toml" defNatsURL = nats.DefaultURL + defHeartbeatInterval = "10s" + defTermSessionTimeout = "60s" envConfigFile = "MF_AGENT_CONFIG_FILE" envLogLevel = "MF_AGENT_LOG_LEVEL" @@ -67,37 +70,40 @@ const ( envEncryption = "MF_AGENT_ENCRYPTION" envNatsURL = "MF_AGENT_NATS_URL" - envMqttUsername = "MF_AGENT_MQTT_USERNAME" - envMqttPassword = "MF_AGENT_MQTT_PASSWORD" - envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS" - envMqttMTLS = "MF_AGENT_MQTT_MTLS" - envMqttCA = "MF_AGENT_MQTT_CA" - envMqttQoS = "MF_AGENT_MQTT_QOS" - envMqttRetain = "MF_AGENT_MQTT_RETAIN" - envMqttCert = 
"MF_AGENT_MQTT_CLIENT_CERT" - envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK" + envMqttUsername = "MF_AGENT_MQTT_USERNAME" + envMqttPassword = "MF_AGENT_MQTT_PASSWORD" + envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS" + envMqttMTLS = "MF_AGENT_MQTT_MTLS" + envMqttCA = "MF_AGENT_MQTT_CA" + envMqttQoS = "MF_AGENT_MQTT_QOS" + envMqttRetain = "MF_AGENT_MQTT_RETAIN" + envMqttCert = "MF_AGENT_MQTT_CLIENT_CERT" + envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK" + envHeartbeatInterval = "MF_AGENT_HEARTBEAT_INTERVAL" + envTermSessionTimeout = "MF_AGENT_TERMINAL_SESSION_TIMEOUT" ) var ( errFailedToSetupMTLS = errors.New("Failed to set up mtls certs") errFetchingBootstrapFailed = errors.New("Fetching bootstrap failed with error") errFailedToReadConfig = errors.New("Failed to read config") + errFailedToConfigHeartbeat = errors.New("Failed to configure heartbeat") ) func main() { cfg, err := loadEnvConfig() if err != nil { - log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error())) + log.Fatalf(fmt.Sprintf("Failed to load config: %s", err)) } logger, err := logger.New(os.Stdout, cfg.Agent.Log.Level) if err != nil { - log.Fatalf(fmt.Sprintf("Failed to create logger: %s", err.Error())) + log.Fatalf(fmt.Sprintf("Failed to create logger: %s", err)) } cfg, err = loadBootConfig(cfg, logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to load config: %s", err.Error())) + logger.Error(fmt.Sprintf("Failed to load config: %s", err)) } nc, err := nats.Connect(cfg.Agent.Server.NatsURL) @@ -116,7 +122,7 @@ func main() { svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger) if err != nil { - logger.Error(fmt.Sprintf("Error in agent service: %s", err.Error())) + logger.Error(fmt.Sprintf("Error in agent service: %s", err)) os.Exit(1) } @@ -166,6 +172,21 @@ func loadEnvConfig() (config.Config, error) { Control: mainflux.Env(envCtrlChan, defCtrlChan), Data: mainflux.Env(envDataChan, defDataChan), } + interval, err := time.ParseDuration(mainflux.Env(envHeartbeatInterval, 
defHeartbeatInterval)) + if err != nil { + return config.Config{}, errors.Wrap(errFailedToConfigHeartbeat, err) + } + + ch := config.Heartbeat{ + Interval: interval, + } + termSessionTimeout, err := time.ParseDuration(mainflux.Env(envTermSessionTimeout, defTermSessionTimeout)) + if err != nil { + return config.Config{}, err + } + ct := config.Terminal{ + SessionTimeout: termSessionTimeout, + } ec := config.EdgexConf{URL: mainflux.Env(envEdgexURL, defEdgexURL)} lc := config.LogConf{Level: mainflux.Env(envLogLevel, defLogLevel)} @@ -175,10 +196,10 @@ func loadEnvConfig() (config.Config, error) { Password: mainflux.Env(envMqttPassword, defMqttPassword), } file := mainflux.Env(envConfigFile, defConfigFile) - c := config.New(sc, cc, ec, lc, mc, file) - mc, err := loadCertificate(c.Agent.MQTT) + c := config.New(sc, cc, ec, lc, mc, ch, ct, file) + mc, err = loadCertificate(c.Agent.MQTT) if err != nil { - return c, errors.Wrap(errFailedToSetupMTLS, err.Error()) + return c, errors.Wrap(errFailedToSetupMTLS, err) } c.Agent.MQTT = mc return c, nil @@ -196,16 +217,24 @@ func loadBootConfig(c config.Config, logger logger.Logger) (bsc config.Config, e } if err := bootstrap.Bootstrap(bsConfig, logger, file); err != nil { - return c, errors.Wrap(errFetchingBootstrapFailed, err.Error()) + return c, errors.Wrap(errFetchingBootstrapFailed, err) } if bsc, err = config.Read(file); err != nil { - return c, errors.Wrap(errFailedToReadConfig, err.Error()) + return c, errors.Wrap(errFailedToReadConfig, err) } mc, err := loadCertificate(bsc.Agent.MQTT) if err != nil { - return bsc, errors.Wrap(errFailedToSetupMTLS, err.Error()) + return bsc, errors.Wrap(errFailedToSetupMTLS, err) + } + + if bsc.Agent.Heartbeat.Interval <= 0 { + bsc.Agent.Heartbeat.Interval = c.Agent.Heartbeat.Interval + } + + if bsc.Agent.Terminal.SessionTimeout <= 0 { + bsc.Agent.Terminal.SessionTimeout = c.Agent.Terminal.SessionTimeout } bsc.Agent.MQTT = mc diff --git a/configs/config.toml b/configs/config.toml index 
2aaf983b..3f0563dc 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -25,3 +25,11 @@ [Agent.server] nats_url = "localhost:4222" port = "9000" + + # interval - duration string (e.g. "30s") within which a heartbeat is expected + [Agent.heartbeat] + interval = "30s" + + # session_timeout - duration string (e.g. "30s"); when it expires the terminal session ends + [Agent.terminal] + session_timeout = "30s" diff --git a/pkg/agent/heartbeat.go b/pkg/agent/heartbeat.go index 671af61a..23bd99bb 100644 --- a/pkg/agent/heartbeat.go +++ b/pkg/agent/heartbeat.go @@ -6,9 +6,6 @@ import ( ) const ( - timeout = 3 - interval = 10000 - online = "online" offline = "offline" @@ -20,10 +17,10 @@ const ( // Services send heartbeat to nats thus updating last seen. // When service doesnt send heartbeat for some time gets marked offline. type svc struct { - info Info - counter int - ticker *time.Ticker - mu sync.Mutex + info Info + interval time.Duration + ticker *time.Ticker + mu sync.Mutex } type Info struct { @@ -41,17 +38,19 @@ type Heartbeat interface { Info() Info } -func NewHeartbeat(name, svctype string) Heartbeat { - ticker := time.NewTicker(interval * time.Millisecond) +// NewHeartbeat starts tracking heartbeats of the named service. +// If the service doesn't send a heartbeat within interval, it is marked offline. +func NewHeartbeat(name, svcType string, interval time.Duration) Heartbeat { + ticker := time.NewTicker(interval) s := svc{ info: Info{ Name: name, Status: online, - Type: svctype, + Type: svcType, LastSeen: time.Now(), }, - counter: timeout, - ticker: ticker, + ticker: ticker, + interval: interval, } s.listen() return &s @@ -65,10 +64,8 @@ func (s *svc) listen() { // TODO - we can disable ticker when the status gets OFFLINE // and on the next heartbeat enable it again s.mu.Lock() - s.counter = s.counter - 1 - if s.counter == 0 { + if time.Now().After(s.info.LastSeen.Add(s.interval)) { s.info.Status = offline - s.counter = timeout } s.mu.Unlock() } @@ -80,7 +77,6 @@ func (s *svc) Update() { s.mu.Lock() defer s.mu.Unlock() s.info.LastSeen = 
time.Now() - s.counter = timeout s.info.Status = online } diff --git a/pkg/agent/service.go b/pkg/agent/service.go index 2563e4f6..25200c86 100644 --- a/pkg/agent/service.go +++ b/pkg/agent/service.go @@ -10,6 +10,7 @@ import ( "os/exec" "sort" "strings" + "time" paho "github.com/eclipse/paho.mqtt.golang" "github.com/mainflux/agent/pkg/config" @@ -132,6 +133,10 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log terminals: make(map[string]terminal.Session), } + if cfg.Agent.Heartbeat.Interval <= 0 { + ag.logger.Error(fmt.Sprintf("invalid heartbeat interval %d", cfg.Agent.Heartbeat.Interval)) + } + _, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) { sub := msg.Subject tok := strings.Split(sub, ".") @@ -145,7 +150,7 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log // if there is multiple instances of the same service // we will have to add another distinction if _, ok := ag.svcs[svcname]; !ok { - svc := NewHeartbeat(svcname, svctype) + svc := NewHeartbeat(svcname, svctype, cfg.Agent.Heartbeat.Interval) ag.svcs[svcname] = svc ag.logger.Info(fmt.Sprintf("Services '%s-%s' registered", svcname, svctype)) } @@ -270,7 +275,7 @@ func (a *agent) Terminal(uuid, cmdStr string) error { return err } case open: - if err := a.terminalOpen(uuid); err != nil { + if err := a.terminalOpen(uuid, a.config.Agent.Terminal.SessionTimeout); err != nil { return err } case close: @@ -281,15 +286,15 @@ func (a *agent) Terminal(uuid, cmdStr string) error { return nil } -func (a *agent) terminalOpen(uuid string) error { +func (a *agent) terminalOpen(uuid string, timeout time.Duration) error { if _, ok := a.terminals[uuid]; !ok { - term, err := terminal.NewSession(uuid, a.Publish, a.logger) + term, err := terminal.NewSession(uuid, timeout, a.Publish, a.logger) if err != nil { return errors.Wrap(errors.Wrap(errFailedToCreateTerminalSession, fmt.Errorf(" for %s", uuid)), err) } a.terminals[uuid] = term go func() { - for _ = 
range term.IsDone() { + for range term.IsDone() { // Terminal is inactive, should be closed a.logger.Debug((fmt.Sprintf("Closing terminal session %s", uuid))) a.terminalClose(uuid) @@ -312,7 +317,7 @@ func (a *agent) terminalClose(uuid string) error { } func (a *agent) terminalWrite(uuid, cmd string) error { - if err := a.terminalOpen(uuid); err != nil { + if err := a.terminalOpen(uuid, a.config.Agent.Terminal.SessionTimeout); err != nil { return err } term := a.terminals[uuid] diff --git a/pkg/bootstrap/bootstrap.go b/pkg/bootstrap/bootstrap.go index a030c117..95aae1cf 100644 --- a/pkg/bootstrap/bootstrap.go +++ b/pkg/bootstrap/bootstrap.go @@ -85,7 +85,7 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { cfg.ID, cfg.URL)) ic := infraConfig{} if err := json.Unmarshal([]byte(dc.Content), &ic); err != nil { - return errors.New(err.Error()) + return err } saveExportConfig(ic.ExportConfig, logger) @@ -117,8 +117,9 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { Password: dc.MainfluxKey, Username: dc.MainfluxID, } - - c := config.New(sc, cc, ec, lc, mc, file) + hc := config.Heartbeat{} + tc := config.Terminal{} + c := config.New(sc, cc, ec, lc, mc, hc, tc, file) return config.Save(c) } @@ -160,13 +161,13 @@ func getConfig(bsID, bsKey, bsSvrURL string, logger log.Logger) (deviceConfig, e req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { - return deviceConfig{}, errors.New(err.Error()) + return deviceConfig{}, err } req.Header.Add("Authorization", bsKey) resp, err := client.Do(req) if err != nil { - return deviceConfig{}, errors.New(err.Error()) + return deviceConfig{}, err } if resp.StatusCode >= http.StatusBadRequest { return deviceConfig{}, errors.New(http.StatusText(resp.StatusCode)) @@ -174,12 +175,12 @@ func getConfig(bsID, bsKey, bsSvrURL string, logger log.Logger) (deviceConfig, e body, err := ioutil.ReadAll(resp.Body) if err != nil { - return deviceConfig{}, errors.New(err.Error()) + return 
deviceConfig{}, err } defer resp.Body.Close() dc := deviceConfig{} if err := json.Unmarshal(body, &dc); err != nil { - return deviceConfig{}, errors.New(err.Error()) + return deviceConfig{}, err } return dc, nil diff --git a/pkg/config/config.go b/pkg/config/config.go index 8d09c01f..59f0010b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -7,6 +7,7 @@ import ( "crypto/tls" "fmt" "io/ioutil" + "time" "github.com/mainflux/mainflux/errors" "github.com/pelletier/go-toml" @@ -45,13 +46,23 @@ type MQTTConf struct { Cert tls.Certificate `json:"-" toml:"-"` } +type Heartbeat struct { + Interval time.Duration `toml:"interval"` +} + +type Terminal struct { + SessionTimeout time.Duration `toml:"session_timeout"` +} + // Config struct of Mainflux Agent type AgentConf struct { - Server ServerConf `toml:"server"` - Channels ChanConf `toml:"channels"` - Edgex EdgexConf `toml:"edgex"` - Log LogConf `toml:"log"` - MQTT MQTTConf `toml:"mqtt"` + Server ServerConf `toml:"server"` + Terminal Terminal `toml:"terminal"` + Heartbeat Heartbeat `toml:"heartbeat"` + Channels ChanConf `toml:"channels"` + Edgex EdgexConf `toml:"edgex"` + Log LogConf `toml:"log"` + MQTT MQTTConf `toml:"mqtt"` } type Config struct { @@ -59,13 +70,15 @@ type Config struct { File string } -func New(sc ServerConf, cc ChanConf, ec EdgexConf, lc LogConf, mc MQTTConf, file string) Config { +func New(sc ServerConf, cc ChanConf, ec EdgexConf, lc LogConf, mc MQTTConf, hc Heartbeat, tc Terminal, file string) Config { ac := AgentConf{ - Server: sc, - Channels: cc, - Edgex: ec, - Log: lc, - MQTT: mc, + Server: sc, + Channels: cc, + Edgex: ec, + Log: lc, + MQTT: mc, + Heartbeat: hc, + Terminal: tc, } return Config{ diff --git a/pkg/terminal/terminal.go b/pkg/terminal/terminal.go index cb535e12..e137ad76 100644 --- a/pkg/terminal/terminal.go +++ b/pkg/terminal/terminal.go @@ -17,8 +17,8 @@ import ( ) const ( - timeoutInterval = 30 - terminal = "term" + terminal = "term" + second = time.Duration(1 * 
time.Second) ) var ( @@ -26,16 +26,17 @@ var ( ) type term struct { - uuid string - ptmx *os.File - writer io.Writer - done chan bool - topic string - timeout int - timer *time.Ticker - publish func(channel, payload string) error - logger logger.Logger - mu sync.Mutex + uuid string + ptmx *os.File + writer io.Writer + done chan bool + topic string + timeout time.Duration + resetTimeout time.Duration + timer *time.Ticker + publish func(channel, payload string) error + logger logger.Logger + mu sync.Mutex } type Session interface { @@ -44,14 +45,15 @@ type Session interface { io.Writer } -func NewSession(uuid string, publish func(channel, payload string) error, logger logger.Logger) (Session, error) { +func NewSession(uuid string, timeout time.Duration, publish func(channel, payload string) error, logger logger.Logger) (Session, error) { t := &term{ - logger: logger, - uuid: uuid, - publish: publish, - timeout: timeoutInterval, - topic: fmt.Sprintf("term/%s", uuid), - done: make(chan bool), + logger: logger, + uuid: uuid, + publish: publish, + timeout: timeout, + resetTimeout: timeout, + topic: fmt.Sprintf("term/%s", uuid), + done: make(chan bool), } c := exec.Command("bash") @@ -82,7 +84,7 @@ func NewSession(uuid string, publish func(channel, payload string) error, logger return t, nil } -func (t *term) resetCounter(timeout int) { +func (t *term) resetCounter(timeout time.Duration) { t.mu.Lock() defer t.mu.Unlock() if timeout > 0 { @@ -93,7 +95,7 @@ func (t *term) resetCounter(timeout int) { func (t *term) decrementCounter() { t.mu.Lock() defer t.mu.Unlock() - t.timeout = t.timeout - 1 + t.timeout = t.timeout - second if t.timeout == 0 { t.done <- true t.timer.Stop() @@ -105,7 +107,7 @@ func (t *term) IsDone() chan bool { } func (t *term) Write(p []byte) (int, error) { - t.resetCounter(timeoutInterval) + t.resetCounter(t.resetTimeout) n := len(p) payload, err := encoder.EncodeSenML(t.uuid, terminal, string(p)) if err != nil {