Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sd-notify: do not hang when NOTIFY_SOCKET is used with create #1807

Merged
merged 1 commit into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 93 additions & 39 deletions notify_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"fmt"
"net"
"os"
"path"
"path/filepath"
"strconv"
"time"

"github.com/opencontainers/runc/libcontainer"
"github.com/opencontainers/runtime-spec/specs-go"

"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

Expand All @@ -27,12 +29,12 @@ func newNotifySocket(context *cli.Context, notifySocketHost string, id string) *
}

root := filepath.Join(context.GlobalString("root"), id)
path := filepath.Join(root, "notify.sock")
socketPath := filepath.Join(root, "notify", "notify.sock")

notifySocket := &notifySocket{
socket: nil,
host: notifySocketHost,
socketPath: path,
socketPath: socketPath,
}

return notifySocket
Expand All @@ -44,13 +46,19 @@ func (s *notifySocket) Close() error {

// If systemd is supporting sd_notify protocol, this function will add support
// for sd_notify protocol from within the container.
func (s *notifySocket) setupSpec(context *cli.Context, spec *specs.Spec) {
mount := specs.Mount{Destination: s.host, Source: s.socketPath, Options: []string{"bind"}}
func (s *notifySocket) setupSpec(context *cli.Context, spec *specs.Spec) error {
pathInContainer := filepath.Join("/run/notify", path.Base(s.socketPath))
mount := specs.Mount{
Destination: path.Dir(pathInContainer),
Source: path.Dir(s.socketPath),
Options: []string{"bind", "nosuid", "noexec", "nodev", "ro"},
}
spec.Mounts = append(spec.Mounts, mount)
spec.Process.Env = append(spec.Process.Env, fmt.Sprintf("NOTIFY_SOCKET=%s", s.host))
spec.Process.Env = append(spec.Process.Env, fmt.Sprintf("NOTIFY_SOCKET=%s", pathInContainer))
return nil
}

func (s *notifySocket) setupSocket() error {
func (s *notifySocket) bindSocket() error {
addr := net.UnixAddr{
Name: s.socketPath,
Net: "unixgram",
Expand All @@ -71,46 +79,92 @@ func (s *notifySocket) setupSocket() error {
return nil
}

// pid1 must be set only with -d, as it is used to set the new process as the main process
// for the service in systemd
func (s *notifySocket) run(pid1 int) {
buf := make([]byte, 512)
notifySocketHostAddr := net.UnixAddr{Name: s.host, Net: "unixgram"}
func (s *notifySocket) setupSocketDirectory() error {
return os.Mkdir(path.Dir(s.socketPath), 0755)
}

func notifySocketStart(context *cli.Context, notifySocketHost, id string) (*notifySocket, error) {
notifySocket := newNotifySocket(context, notifySocketHost, id)
if notifySocket == nil {
return nil, nil
}

if err := notifySocket.bindSocket(); err != nil {
return nil, err
}
return notifySocket, nil
}

func (n *notifySocket) waitForContainer(container libcontainer.Container) error {
s, err := container.State()
if err != nil {
return err
}
return n.run(s.InitProcessPid)
}

func (n *notifySocket) run(pid1 int) error {
if n.socket == nil {
return nil
}
notifySocketHostAddr := net.UnixAddr{Name: n.host, Net: "unixgram"}
client, err := net.DialUnix("unixgram", nil, &notifySocketHostAddr)
if err != nil {
logrus.Error(err)
return
return err
}
for {
r, err := s.socket.Read(buf)
if err != nil {
break
}
var out bytes.Buffer
for _, line := range bytes.Split(buf[0:r], []byte{'\n'}) {
if bytes.HasPrefix(line, []byte("READY=")) {
_, err = out.Write(line)
if err != nil {
return
}

_, err = out.Write([]byte{'\n'})
if err != nil {
return
}
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()

_, err = client.Write(out.Bytes())
if err != nil {
fileChan := make(chan []byte)
go func() {
for {
buf := make([]byte, 4096)
r, err := n.socket.Read(buf)
if err != nil {
return
}
got := buf[0:r]
// systemd-ready sends a single datagram with the state string as payload,
// so we don't need to worry about partial messages.
for _, line := range bytes.Split(got, []byte{'\n'}) {
giuseppe marked this conversation as resolved.
Show resolved Hide resolved
if bytes.HasPrefix(got, []byte("READY=")) {
fileChan <- line
return
}
}

// now we can inform systemd to use pid1 as the pid to monitor
if pid1 > 0 {
newPid := fmt.Sprintf("MAINPID=%d\n", pid1)
client.Write([]byte(newPid))
}
return
}
}()

for {
select {
case <-ticker.C:
_, err := os.Stat(filepath.Join("/proc", strconv.Itoa(pid1)))
giuseppe marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil
}
case b := <-fileChan:
var out bytes.Buffer
_, err = out.Write(b)
if err != nil {
return err
}

_, err = out.Write([]byte{'\n'})
if err != nil {
return err
}

_, err = client.Write(out.Bytes())
if err != nil {
return err
}

// now we can inform systemd to use pid1 as the pid to monitor
newPid := fmt.Sprintf("MAINPID=%d\n", pid1)
client.Write([]byte(newPid))
return nil
}
}
}
4 changes: 1 addition & 3 deletions signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (h *signalHandler) forward(process *libcontainer.Process, tty *tty, detach
h.notifySocket.run(pid1)
return 0, nil
}
h.notifySocket.run(os.Getpid())
go h.notifySocket.run(0)
}

Expand Down Expand Up @@ -97,9 +98,6 @@ func (h *signalHandler) forward(process *libcontainer.Process, tty *tty, detach
// status because we must ensure that any of the go specific process
// fun such as flushing pipes are complete before we return.
process.Wait()
if h.notifySocket != nil {
h.notifySocket.Close()
}
return e.status, nil
}
}
Expand Down
13 changes: 12 additions & 1 deletion start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"os"

"github.com/opencontainers/runc/libcontainer"
"github.com/urfave/cli"
Expand Down Expand Up @@ -31,7 +32,17 @@ your host.`,
}
switch status {
case libcontainer.Created:
return container.Exec()
notifySocket, err := notifySocketStart(context, os.Getenv("NOTIFY_SOCKET"), container.ID())
if err != nil {
return err
}
if err := container.Exec(); err != nil {
return err
}
if notifySocket != nil {
return notifySocket.waitForContainer(container)
}
return nil
case libcontainer.Stopped:
return errors.New("cannot start a container that has stopped")
case libcontainer.Running:
Expand Down
12 changes: 10 additions & 2 deletions utils_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@ func startContainer(context *cli.Context, spec *specs.Spec, action CtAct, criuOp

notifySocket := newNotifySocket(context, os.Getenv("NOTIFY_SOCKET"), id)
if notifySocket != nil {
notifySocket.setupSpec(context, spec)
if err := notifySocket.setupSpec(context, spec); err != nil {
return -1, err
}
}

container, err := createContainer(context, id, spec)
Expand All @@ -417,10 +419,16 @@ func startContainer(context *cli.Context, spec *specs.Spec, action CtAct, criuOp
}

if notifySocket != nil {
err := notifySocket.setupSocket()
err := notifySocket.setupSocketDirectory()
if err != nil {
return -1, err
}
if action == CT_ACT_RUN {
err := notifySocket.bindSocket()
if err != nil {
return -1, err
}
}
}

// Support on-demand socket activation by passing file descriptors into the container init process.
Expand Down