Skip to content

Commit

Permalink
MF-38 - Hardcoded parameters for Heartbeat and Terminal are made conf…
Browse files Browse the repository at this point in the history
…igurable (#43)

* make hardcoded vals configurable

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* change var name

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add merge configs to use default values if not exisiting in bootstrap

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add logging if interval is bad

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* update readme with vars

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove some vars, use sec and parsetime

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* update config.toml

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* var name, log text changes

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix values for time

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix offline time logic

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>
  • Loading branch information
mteodor authored Jun 22, 2020
1 parent 11a9764 commit d50ceff
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 83 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ Environment:
| MF_AGENT_MQTT_RETAIN | MQTT retain | false |
| MF_AGENT_MQTT_CLIENT_CERT | Location of client certificate for MTLS | thing.cert |
| MF_AGENT_MQTT_CLIENT_PK | Location of client certificate key for MTLS | thing.key |

| MF_AGENT_HEARTBEAT_INTERVAL | Interval in which heartbeat from service is expected | 30s |
| MF_AGENT_TERMINAL_SESSION_TIMEOUT | Timeout for terminal session | 30s |

Here `thing` is a Mainflux thing, and control channel from `channels` is used with `req` and `res` subtopic
(i.e. app needs to PUB/SUB on `/channels/<control_channel_id>/messages/req` and `/channels/<control_channel_id>/messages/res`).
Expand Down
69 changes: 49 additions & 20 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
Expand All @@ -20,9 +21,9 @@ import (
"github.com/mainflux/agent/pkg/conn"
"github.com/mainflux/agent/pkg/edgex"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/logger"
nats "github.com/nats-io/nats.go"
"github.com/pkg/errors"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -51,6 +52,8 @@ const (
defMqttPrivKey = "thing.key"
defConfigFile = "config.toml"
defNatsURL = nats.DefaultURL
defHeartbeatInterval = "10s"
defTermSessionTimeout = "60s"

envConfigFile = "MF_AGENT_CONFIG_FILE"
envLogLevel = "MF_AGENT_LOG_LEVEL"
Expand All @@ -67,37 +70,40 @@ const (
envEncryption = "MF_AGENT_ENCRYPTION"
envNatsURL = "MF_AGENT_NATS_URL"

envMqttUsername = "MF_AGENT_MQTT_USERNAME"
envMqttPassword = "MF_AGENT_MQTT_PASSWORD"
envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS"
envMqttMTLS = "MF_AGENT_MQTT_MTLS"
envMqttCA = "MF_AGENT_MQTT_CA"
envMqttQoS = "MF_AGENT_MQTT_QOS"
envMqttRetain = "MF_AGENT_MQTT_RETAIN"
envMqttCert = "MF_AGENT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK"
envMqttUsername = "MF_AGENT_MQTT_USERNAME"
envMqttPassword = "MF_AGENT_MQTT_PASSWORD"
envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS"
envMqttMTLS = "MF_AGENT_MQTT_MTLS"
envMqttCA = "MF_AGENT_MQTT_CA"
envMqttQoS = "MF_AGENT_MQTT_QOS"
envMqttRetain = "MF_AGENT_MQTT_RETAIN"
envMqttCert = "MF_AGENT_MQTT_CLIENT_CERT"
envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK"
envHeartbeatInterval = "MF_AGENT_HEARTBEAT_INTERVAL"
envTermSessionTimeout = "MF_AGENT_TERMINAL_SESSION_TIMEOUT"
)

var (
errFailedToSetupMTLS = errors.New("Failed to set up mtls certs")
errFetchingBootstrapFailed = errors.New("Fetching bootstrap failed with error")
errFailedToReadConfig = errors.New("Failed to read config")
errFailedToConfigHeartbeat = errors.New("Failed to configure heartbeat")
)

func main() {
cfg, err := loadEnvConfig()
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error()))
log.Fatalf(fmt.Sprintf("Failed to load config: %s", err))
}

logger, err := logger.New(os.Stdout, cfg.Agent.Log.Level)
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to create logger: %s", err.Error()))
log.Fatalf(fmt.Sprintf("Failed to create logger: %s", err))
}

cfg, err = loadBootConfig(cfg, logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to load config: %s", err.Error()))
logger.Error(fmt.Sprintf("Failed to load config: %s", err))
}

nc, err := nats.Connect(cfg.Agent.Server.NatsURL)
Expand All @@ -116,7 +122,7 @@ func main() {

svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger)
if err != nil {
logger.Error(fmt.Sprintf("Error in agent service: %s", err.Error()))
logger.Error(fmt.Sprintf("Error in agent service: %s", err))
os.Exit(1)
}

Expand Down Expand Up @@ -166,6 +172,21 @@ func loadEnvConfig() (config.Config, error) {
Control: mainflux.Env(envCtrlChan, defCtrlChan),
Data: mainflux.Env(envDataChan, defDataChan),
}
interval, err := time.ParseDuration(mainflux.Env(envHeartbeatInterval, defHeartbeatInterval))
if err != nil {
return config.Config{}, errors.Wrap(errFailedToConfigHeartbeat, err)
}

ch := config.Heartbeat{
Interval: interval,
}
termSessionTimeout, err := time.ParseDuration(mainflux.Env(envTermSessionTimeout, defTermSessionTimeout))
if err != nil {
return config.Config{}, err
}
ct := config.Terminal{
SessionTimeout: termSessionTimeout,
}
ec := config.EdgexConf{URL: mainflux.Env(envEdgexURL, defEdgexURL)}
lc := config.LogConf{Level: mainflux.Env(envLogLevel, defLogLevel)}

Expand All @@ -175,10 +196,10 @@ func loadEnvConfig() (config.Config, error) {
Password: mainflux.Env(envMqttPassword, defMqttPassword),
}
file := mainflux.Env(envConfigFile, defConfigFile)
c := config.New(sc, cc, ec, lc, mc, file)
mc, err := loadCertificate(c.Agent.MQTT)
c := config.New(sc, cc, ec, lc, mc, ch, ct, file)
mc, err = loadCertificate(c.Agent.MQTT)
if err != nil {
return c, errors.Wrap(errFailedToSetupMTLS, err.Error())
return c, errors.Wrap(errFailedToSetupMTLS, err)
}
c.Agent.MQTT = mc
return c, nil
Expand All @@ -196,16 +217,24 @@ func loadBootConfig(c config.Config, logger logger.Logger) (bsc config.Config, e
}

if err := bootstrap.Bootstrap(bsConfig, logger, file); err != nil {
return c, errors.Wrap(errFetchingBootstrapFailed, err.Error())
return c, errors.Wrap(errFetchingBootstrapFailed, err)
}

if bsc, err = config.Read(file); err != nil {
return c, errors.Wrap(errFailedToReadConfig, err.Error())
return c, errors.Wrap(errFailedToReadConfig, err)
}

mc, err := loadCertificate(bsc.Agent.MQTT)
if err != nil {
return bsc, errors.Wrap(errFailedToSetupMTLS, err.Error())
return bsc, errors.Wrap(errFailedToSetupMTLS, err)
}

if bsc.Agent.Heartbeat.Interval <= 0 {
bsc.Agent.Heartbeat.Interval = c.Agent.Heartbeat.Interval
}

if bsc.Agent.Terminal.SessionTimeout <= 0 {
bsc.Agent.Terminal.SessionTimeout = c.Agent.Terminal.SessionTimeout
}

bsc.Agent.MQTT = mc
Expand Down
8 changes: 8 additions & 0 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@
[Agent.server]
nats_url = "localhost:4222"
port = "9000"

# interval - interval in seconds in which heartbeat is expected
[Agent.heartbeat]
interval = "30s"

# session_timeout in sec, when expired terminal session ends
[Agent.terminal]
session_timeout = "30s"
28 changes: 12 additions & 16 deletions pkg/agent/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
)

const (
timeout = 3
interval = 10000

online = "online"
offline = "offline"

Expand All @@ -20,10 +17,10 @@ const (
// Services send heartbeat to nats thus updating last seen.
// When service doesnt send heartbeat for some time gets marked offline.
type svc struct {
info Info
counter int
ticker *time.Ticker
mu sync.Mutex
info Info
interval time.Duration
ticker *time.Ticker
mu sync.Mutex
}

type Info struct {
Expand All @@ -41,17 +38,19 @@ type Heartbeat interface {
Info() Info
}

func NewHeartbeat(name, svctype string) Heartbeat {
ticker := time.NewTicker(interval * time.Millisecond)
// interval - duration of interval
// if service doesnt send heartbeat during interval it is marked offline
func NewHeartbeat(name, svcType string, interval time.Duration) Heartbeat {
ticker := time.NewTicker(interval)
s := svc{
info: Info{
Name: name,
Status: online,
Type: svctype,
Type: svcType,
LastSeen: time.Now(),
},
counter: timeout,
ticker: ticker,
ticker: ticker,
interval: interval,
}
s.listen()
return &s
Expand All @@ -65,10 +64,8 @@ func (s *svc) listen() {
// TODO - we can disable ticker when the status gets OFFLINE
// and on the next heartbeat enable it again
s.mu.Lock()
s.counter = s.counter - 1
if s.counter == 0 {
if time.Now().After(s.info.LastSeen.Add(s.interval)) {
s.info.Status = offline
s.counter = timeout
}
s.mu.Unlock()
}
Expand All @@ -80,7 +77,6 @@ func (s *svc) Update() {
s.mu.Lock()
defer s.mu.Unlock()
s.info.LastSeen = time.Now()
s.counter = timeout
s.info.Status = online
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/exec"
"sort"
"strings"
"time"

paho "github.com/eclipse/paho.mqtt.golang"
"github.com/mainflux/agent/pkg/config"
Expand Down Expand Up @@ -132,6 +133,10 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log
terminals: make(map[string]terminal.Session),
}

if cfg.Agent.Heartbeat.Interval <= 0 {
ag.logger.Error(fmt.Sprintf("invalid heartbeat interval %d", cfg.Agent.Heartbeat.Interval))
}

_, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) {
sub := msg.Subject
tok := strings.Split(sub, ".")
Expand All @@ -145,7 +150,7 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log
// if there is multiple instances of the same service
// we will have to add another distinction
if _, ok := ag.svcs[svcname]; !ok {
svc := NewHeartbeat(svcname, svctype)
svc := NewHeartbeat(svcname, svctype, cfg.Agent.Heartbeat.Interval)
ag.svcs[svcname] = svc
ag.logger.Info(fmt.Sprintf("Services '%s-%s' registered", svcname, svctype))
}
Expand Down Expand Up @@ -270,7 +275,7 @@ func (a *agent) Terminal(uuid, cmdStr string) error {
return err
}
case open:
if err := a.terminalOpen(uuid); err != nil {
if err := a.terminalOpen(uuid, a.config.Agent.Terminal.SessionTimeout); err != nil {
return err
}
case close:
Expand All @@ -281,15 +286,15 @@ func (a *agent) Terminal(uuid, cmdStr string) error {
return nil
}

func (a *agent) terminalOpen(uuid string) error {
func (a *agent) terminalOpen(uuid string, timeout time.Duration) error {
if _, ok := a.terminals[uuid]; !ok {
term, err := terminal.NewSession(uuid, a.Publish, a.logger)
term, err := terminal.NewSession(uuid, timeout, a.Publish, a.logger)
if err != nil {
return errors.Wrap(errors.Wrap(errFailedToCreateTerminalSession, fmt.Errorf(" for %s", uuid)), err)
}
a.terminals[uuid] = term
go func() {
for _ = range term.IsDone() {
for range term.IsDone() {
// Terminal is inactive, should be closed
a.logger.Debug((fmt.Sprintf("Closing terminal session %s", uuid)))
a.terminalClose(uuid)
Expand All @@ -312,7 +317,7 @@ func (a *agent) terminalClose(uuid string) error {
}

func (a *agent) terminalWrite(uuid, cmd string) error {
if err := a.terminalOpen(uuid); err != nil {
if err := a.terminalOpen(uuid, a.config.Agent.Terminal.SessionTimeout); err != nil {
return err
}
term := a.terminals[uuid]
Expand Down
15 changes: 8 additions & 7 deletions pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error {
cfg.ID, cfg.URL))
ic := infraConfig{}
if err := json.Unmarshal([]byte(dc.Content), &ic); err != nil {
return errors.New(err.Error())
return err
}

saveExportConfig(ic.ExportConfig, logger)
Expand Down Expand Up @@ -117,8 +117,9 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error {
Password: dc.MainfluxKey,
Username: dc.MainfluxID,
}

c := config.New(sc, cc, ec, lc, mc, file)
hc := config.Heartbeat{}
tc := config.Terminal{}
c := config.New(sc, cc, ec, lc, mc, hc, tc, file)

return config.Save(c)
}
Expand Down Expand Up @@ -160,26 +161,26 @@ func getConfig(bsID, bsKey, bsSvrURL string, logger log.Logger) (deviceConfig, e

req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return deviceConfig{}, errors.New(err.Error())
return deviceConfig{}, err
}

req.Header.Add("Authorization", bsKey)
resp, err := client.Do(req)
if err != nil {
return deviceConfig{}, errors.New(err.Error())
return deviceConfig{}, err
}
if resp.StatusCode >= http.StatusBadRequest {
return deviceConfig{}, errors.New(http.StatusText(resp.StatusCode))
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return deviceConfig{}, errors.New(err.Error())
return deviceConfig{}, err
}
defer resp.Body.Close()
dc := deviceConfig{}
if err := json.Unmarshal(body, &dc); err != nil {
return deviceConfig{}, errors.New(err.Error())
return deviceConfig{}, err
}

return dc, nil
Expand Down
Loading

0 comments on commit d50ceff

Please sign in to comment.