Skip to content

Commit

Permalink
sd-notify: do not hang when NOTIFY_SOCKET is used with create
Browse files Browse the repository at this point in the history
if NOTIFY_SOCKET is used, do not block the main runc process waiting
for events on the notify socket.  Bind mount the parent directory of
the notify socket, so that "start" can create the socket and it is
still accessible from the container.

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
  • Loading branch information
giuseppe committed Mar 15, 2019
1 parent 7341c22 commit e3b3789
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 36 deletions.
113 changes: 83 additions & 30 deletions notify_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"bytes"
"fmt"
"net"
"os"
"path"
"path/filepath"
"strconv"
"time"

"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runtime-spec/specs-go"

"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

Expand All @@ -26,12 +29,12 @@ func newNotifySocket(context *cli.Context, notifySocketHost string, id string) *
}

root := filepath.Join(context.GlobalString("root"), id)
path := filepath.Join(root, "notify.sock")
socketPath := filepath.Join(root, "notify", "notify.sock")

notifySocket := &notifySocket{
socket: nil,
host: notifySocketHost,
socketPath: path,
socketPath: socketPath,
}

return notifySocket
Expand All @@ -43,13 +46,19 @@ func (s *notifySocket) Close() error {

// If systemd is supporting sd_notify protocol, this function will add support
// for sd_notify protocol from within the container.
func (s *notifySocket) setupSpec(context *cli.Context, spec *specs.Spec) {
mount := specs.Mount{Destination: s.host, Source: s.socketPath, Options: []string{"bind"}}
func (s *notifySocket) setupSpec(context *cli.Context, spec *specs.Spec) error {
pathInContainer := filepath.Join("/run/notify", path.Base(s.socketPath))
mount := specs.Mount{
Destination: path.Dir(pathInContainer),
Source: path.Dir(s.socketPath),
Options: []string{"bind", "nosuid", "noexec", "nodev", "ro"},
}
spec.Mounts = append(spec.Mounts, mount)
spec.Process.Env = append(spec.Process.Env, fmt.Sprintf("NOTIFY_SOCKET=%s", s.host))
spec.Process.Env = append(spec.Process.Env, fmt.Sprintf("NOTIFY_SOCKET=%s", pathInContainer))
return nil
}

func (s *notifySocket) setupSocket() error {
func (s *notifySocket) bindSocket() error {
addr := net.UnixAddr{
Name: s.socketPath,
Net: "unixgram",
Expand All @@ -64,45 +73,89 @@ func (s *notifySocket) setupSocket() error {
return nil
}

// pid1 must be set only with -d, as it is used to set the new process as the main process
// for the service in systemd
func (s *notifySocket) run(pid1 int) {
buf := make([]byte, 512)
notifySocketHostAddr := net.UnixAddr{Name: s.host, Net: "unixgram"}
func (s *notifySocket) setupSocketDirectory() error {
return os.Mkdir(path.Dir(s.socketPath), 0755)
}

func notifySocketStart(context *cli.Context, notifySocketHost, id string) (*notifySocket, error) {
notifySocket := newNotifySocket(context, notifySocketHost, id)
if notifySocket == nil {
return nil, nil
}

if err := notifySocket.bindSocket(); err != nil {
return nil, err
}
return notifySocket, nil
}

func (n *notifySocket) waitForContainer(container libcontainer.Container) error {
s, err := container.State()
if err != nil {
return err
}
return n.run(s.InitProcessPid)
}

func (n *notifySocket) run(pid1 int) error {
if n.socket == nil {
return nil
}
notifySocketHostAddr := net.UnixAddr{Name: n.host, Net: "unixgram"}
client, err := net.DialUnix("unixgram", nil, &notifySocketHostAddr)
if err != nil {
logrus.Error(err)
return
return err
}
for {
r, err := s.socket.Read(buf)
if err != nil {
break

ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()

fileChan := make(chan []byte)
go func() {
for {
buf := make([]byte, 512)
r, err := n.socket.Read(buf)
if err != nil {
return
}
got := buf[0:r]
if !bytes.HasPrefix(got, []byte("READY=")) {
continue
}
fileChan <- got
return
}
var out bytes.Buffer
for _, line := range bytes.Split(buf[0:r], []byte{'\n'}) {
if bytes.HasPrefix(line, []byte("READY=")) {
}()

for {
select {
case <-ticker.C:
_, err := os.Stat(filepath.Join("/proc", strconv.Itoa(pid1)))
if err != nil {
return nil
}
case b := <-fileChan:
for _, line := range bytes.Split(b, []byte{'\n'}) {
var out bytes.Buffer
_, err = out.Write(line)
if err != nil {
return
return err
}

_, err = out.Write([]byte{'\n'})
if err != nil {
return
return err
}

_, err = client.Write(out.Bytes())
if err != nil {
return
return err
}

// now we can inform systemd to use pid1 as the pid to monitor
if pid1 > 0 {
newPid := fmt.Sprintf("MAINPID=%d\n", pid1)
client.Write([]byte(newPid))
}
return
newPid := fmt.Sprintf("MAINPID=%d\n", pid1)
client.Write([]byte(newPid))
return nil
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (h *signalHandler) forward(process *libcontainer.Process, tty *tty, detach
h.notifySocket.run(pid1)
return 0, nil
}
h.notifySocket.run(os.Getpid())
go h.notifySocket.run(0)
}

Expand Down Expand Up @@ -97,9 +98,6 @@ func (h *signalHandler) forward(process *libcontainer.Process, tty *tty, detach
// status because we must ensure that any of the go specific process
// fun such as flushing pipes are complete before we return.
process.Wait()
if h.notifySocket != nil {
h.notifySocket.Close()
}
return e.status, nil
}
}
Expand Down
13 changes: 12 additions & 1 deletion start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"os"

"github.com/opencontainers/runc/libcontainer"
"github.com/urfave/cli"
Expand Down Expand Up @@ -31,7 +32,17 @@ your host.`,
}
switch status {
case libcontainer.Created:
return container.Exec()
notifySocket, err := notifySocketStart(context, os.Getenv("NOTIFY_SOCKET"), container.ID())
if err != nil {
return err
}
if err := container.Exec(); err != nil {
return err
}
if notifySocket != nil {
return notifySocket.waitForContainer(container)
}
return nil
case libcontainer.Stopped:
return errors.New("cannot start a container that has stopped")
case libcontainer.Running:
Expand Down
12 changes: 10 additions & 2 deletions utils_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ func startContainer(context *cli.Context, spec *specs.Spec, action CtAct, criuOp

notifySocket := newNotifySocket(context, os.Getenv("NOTIFY_SOCKET"), id)
if notifySocket != nil {
notifySocket.setupSpec(context, spec)
if err := notifySocket.setupSpec(context, spec); err != nil {
return -1, err
}
}

container, err := createContainer(context, id, spec)
Expand All @@ -415,10 +417,16 @@ func startContainer(context *cli.Context, spec *specs.Spec, action CtAct, criuOp
}

if notifySocket != nil {
err := notifySocket.setupSocket()
err := notifySocket.setupSocketDirectory()
if err != nil {
return -1, err
}
if action == CT_ACT_RUN {
err := notifySocket.bindSocket()
if err != nil {
return -1, err
}
}
}

// Support on-demand socket activation by passing file descriptors into the container init process.
Expand Down

0 comments on commit e3b3789

Please sign in to comment.