Skip to content

Commit

Permalink
agent: notify systemd after JoinLAN (#2121)
Browse files Browse the repository at this point in the history
This patch adds support for notifying systemd via the
NOTIFY_SOCKET by sending 'READY=1' to the socket after
a successful JoinLAN.

Fixes #2121
  • Loading branch information
magiconair authored Jun 21, 2017
1 parent 37d389f commit 31a310f
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 16 deletions.
47 changes: 31 additions & 16 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/agent/systemd"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -71,6 +72,11 @@ type delegate interface {
Stats() map[string]map[string]string
}

// notifier is called after a successful JoinLAN.
type notifier interface {
Notify(string) error
}

// The agent is the long running process that is run on every machine.
// It exposes an RPC interface that is used by the CLI to control the
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
Expand Down Expand Up @@ -141,6 +147,9 @@ type Agent struct {
shutdownCh chan struct{}
shutdownLock sync.Mutex

// joinLANNotifier is called after a successful JoinLAN.
joinLANNotifier notifier

// retryJoinCh transports errors from the retry join
// attempts.
retryJoinCh chan error
Expand Down Expand Up @@ -188,22 +197,23 @@ func New(c *Config) (*Agent, error) {
}

a := &Agent{
config: c,
acls: acls,
checkReapAfter: make(map[types.CheckID]time.Duration),
checkMonitors: make(map[types.CheckID]*CheckMonitor),
checkTTLs: make(map[types.CheckID]*CheckTTL),
checkHTTPs: make(map[types.CheckID]*CheckHTTP),
checkTCPs: make(map[types.CheckID]*CheckTCP),
checkDockers: make(map[types.CheckID]*CheckDocker),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
reloadCh: make(chan chan error),
retryJoinCh: make(chan error),
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
dnsAddrs: dnsAddrs,
httpAddrs: httpAddrs,
config: c,
acls: acls,
checkReapAfter: make(map[types.CheckID]time.Duration),
checkMonitors: make(map[types.CheckID]*CheckMonitor),
checkTTLs: make(map[types.CheckID]*CheckTTL),
checkHTTPs: make(map[types.CheckID]*CheckHTTP),
checkTCPs: make(map[types.CheckID]*CheckTCP),
checkDockers: make(map[types.CheckID]*CheckDocker),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
joinLANNotifier: &systemd.Notifier{},
reloadCh: make(chan chan error),
retryJoinCh: make(chan error),
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
dnsAddrs: dnsAddrs,
httpAddrs: httpAddrs,
}
if err := a.resolveTmplAddrs(); err != nil {
return nil, err
Expand Down Expand Up @@ -1216,6 +1226,11 @@ func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
a.logger.Printf("[INFO] agent: (LAN) joining: %v", addrs)
n, err = a.delegate.JoinLAN(addrs)
a.logger.Printf("[INFO] agent: (LAN) joined: %d Err: %v", n, err)
if err == nil && a.joinLANNotifier != nil {
if notifErr := a.joinLANNotifier.Notify(systemd.Ready); notifErr != nil {
a.logger.Printf("[DEBUG] agent: systemd notify failed: ", notifErr)
}
}
return
}

Expand Down
32 changes: 32 additions & 0 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,38 @@ func TestAgent_Join_ACLDeny(t *testing.T) {
})
}

type mockNotifier struct{ s string }

func (n *mockNotifier) Notify(state string) error {
n.s = state
return nil
}

func TestAgent_JoinLANNotify(t *testing.T) {
t.Parallel()
a1 := NewTestAgent(t.Name(), nil)
defer a1.Shutdown()

cfg2 := TestConfig()
cfg2.Server = false
cfg2.Bootstrap = false
a2 := NewTestAgent(t.Name(), cfg2)
defer a2.Shutdown()

notif := &mockNotifier{}
a1.joinLANNotifier = notif

addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.Ports.SerfLan)
_, err := a1.JoinLAN([]string{addr})
if err != nil {
t.Fatalf("err: %v", err)
}

if got, want := notif.s, "READY=1"; got != want {
t.Fatalf("got joinLAN notification %q want %q", got, want)
}
}

func TestAgent_Leave(t *testing.T) {
t.Parallel()
a1 := NewTestAgent(t.Name(), nil)
Expand Down
42 changes: 42 additions & 0 deletions agent/systemd/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package systemd

import (
"errors"
"net"
"os"
)

const (
// magic values for systemd
// from https://www.freedesktop.org/software/systemd/man/sd_notify.html#Description

Ready = "READY=1"
Reloading = "RELOADING=1"
Stopping = "STOPPING=1"
)

var NotifyNoSocket = errors.New("No socket")

// Notifier provides a method to send a message to systemd.
type Notifier struct{}

// Notify sends a message to the init daemon. It is common to ignore the error.
func (n *Notifier) Notify(state string) error {
addr := &net.UnixAddr{
Name: os.Getenv("NOTIFY_SOCKET"),
Net: "unixgram",
}

if addr.Name == "" {
return NotifyNoSocket
}

conn, err := net.DialUnix(addr.Net, nil, addr)
if err != nil {
return err
}
defer conn.Close()

_, err = conn.Write([]byte(state))
return err
}
5 changes: 5 additions & 0 deletions website/source/docs/agent/basics.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ There are several important messages that [`consul agent`](/docs/commands/agent.
Consul agents in a cluster. Not all Consul agents in a cluster have to
use the same port, but this address **MUST** be reachable by all other nodes.

When running under `systemd` on Linux, Consul notifies systemd by sending
`READY=1` to the `$NOTIFY_SOCKET` when a LAN join has completed. For
this either the `join` or `retry_join` option has to be set and the
service definition file has to have `Type=notify` set.

## Stopping an Agent

An agent can be stopped in two ways: gracefully or forcefully. To gracefully
Expand Down

0 comments on commit 31a310f

Please sign in to comment.