Skip to content

Commit

Permalink
Don't use named pipes on Windows.
Browse files Browse the repository at this point in the history
Microsoft's Go implementation of named pipes sometimes causes the
Telepresence daemons to hang indefinitely. This is the culprit:

microsoft/go-winio#281

This commit removes the use of named pipes (originally introduced to work
around a buggy Go implementation of Unix sockets) in favor of using (almost)
the same socket implementation as we do on other platforms.

Signed-off-by: Thomas Hallgren <thomas@datawire.io>
  • Loading branch information
thallgren committed Mar 30, 2023
1 parent a588b6d commit e596761
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 146 deletions.
2 changes: 1 addition & 1 deletion integration_test/itest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *cluster) ensureQuit(ctx context.Context) {
_, _, _ = Telepresence(ctx, "quit", "-s") //nolint:dogsled // don't care about any of the returns

// Ensure that the daemon-socket is non-existent.
_ = rmAsRoot(socket.DaemonName)
_ = rmAsRoot(socket.RootDaemonPath(ctx))
}

func (s *cluster) ensureExecutable(ctx context.Context, errs chan<- error, wg *sync.WaitGroup) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/cli/cmd/quit.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func quit() *cobra.Command {
if quitDaemons && daemon.GetUserClient(ctx) == nil {
// User daemon isn't running. If the root daemon is running, we must
// kill it from here.
if conn, err := socket.Dial(ctx, socket.DaemonName); err == nil {
if conn, err := socket.Dial(ctx, socket.RootDaemonPath(ctx)); err == nil {
_, _ = daemon2.NewDaemonClient(conn).Quit(ctx, &emptypb.Empty{})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/cli/cmd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func printVersion(cmd *cobra.Command, _ []string) error {
}

func daemonVersion(ctx context.Context) (*common.VersionInfo, error) {
if conn, err := socket.Dial(ctx, socket.DaemonName); err == nil {
if conn, err := socket.Dial(ctx, socket.RootDaemonPath(ctx)); err == nil {
defer conn.Close()
return daemonRpc.NewDaemonClient(conn).Version(ctx, &empty.Empty{})
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/client/cli/connect/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func UserDaemonDisconnect(ctx context.Context, quitDaemons bool) (err error) {
// Disconnect is not implemented so daemon predates 2.4.9. Force a quit
}
if _, err = ud.Quit(ctx, &emptypb.Empty{}); err == nil || status.Code(err) == codes.Unavailable {
err = socket.WaitUntilVanishes("user daemon", socket.ConnectorName, 5*time.Second)
err = socket.WaitUntilVanishes("user daemon", socket.UserDaemonPath(ctx), 5*time.Second)
}
if err != nil && status.Code(err) == codes.Unavailable {
if quitDaemons {
Expand Down Expand Up @@ -88,7 +88,7 @@ func RunConnect(cmd *cobra.Command, args []string) error {

func launchConnectorDaemon(ctx context.Context, connectorDaemon string, required bool) (*daemon.UserClient, error) {
cr := daemon.GetRequest(ctx)
conn, err := socket.Dial(ctx, socket.ConnectorName)
conn, err := socket.Dial(ctx, socket.UserDaemonPath(ctx))
if err == nil {
if cr.Docker {
return nil, errcat.User.New("option --docker cannot be used as long as a daemon is running on the host. Try telepresence quit -s")
Expand Down Expand Up @@ -137,10 +137,10 @@ func launchConnectorDaemon(ctx context.Context, connectorDaemon string, required
if err = proc.StartInBackground(false, args...); err != nil {
return nil, errcat.NoDaemonLogs.Newf("failed to launch the connector service: %w", err)
}
if err = socket.WaitUntilAppears("connector", socket.ConnectorName, 10*time.Second); err != nil {
if err = socket.WaitUntilAppears("connector", socket.UserDaemonPath(ctx), 10*time.Second); err != nil {
return nil, errcat.NoDaemonLogs.Newf("connector service did not start: %w", err)
}
conn, err = socket.Dial(ctx, socket.ConnectorName)
conn, err = socket.Dial(ctx, socket.UserDaemonPath(ctx))
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/client/cli/connect/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ func ensureRootDaemonRunning(ctx context.Context) error {
// Always assume that root daemon is running when a user daemon address is provided
return nil
}
running, err := socket.IsRunning(ctx, socket.DaemonName)
running, err := socket.IsRunning(ctx, socket.RootDaemonPath(ctx))
if err != nil || running {
return err
}
if err = launchDaemon(ctx, cr); err != nil {
return fmt.Errorf("failed to launch the daemon service: %w", err)
}
if err = socket.WaitUntilRunning(ctx, "daemon", socket.DaemonName, 10*time.Second); err != nil {
if err = socket.WaitUntilRunning(ctx, "daemon", socket.RootDaemonPath(ctx), 10*time.Second); err != nil {
return fmt.Errorf("daemon service did not start: %w", err)
}
return nil
Expand All @@ -95,9 +95,9 @@ func Disconnect(ctx context.Context, quitDaemons bool) error {
if quitDaemons {
// User daemon is responsible for killing the root daemon, but we kill it here too to cater for
// the fact that the user daemon might have been killed ungracefully.
if err = socket.WaitUntilVanishes("root daemon", socket.DaemonName, 5*time.Second); err != nil {
if err = socket.WaitUntilVanishes("root daemon", socket.RootDaemonPath(ctx), 5*time.Second); err != nil {
var conn *grpc.ClientConn
if conn, err = socket.Dial(ctx, socket.DaemonName); err == nil {
if conn, err = socket.Dial(ctx, socket.RootDaemonPath(ctx)); err == nil {
if _, err = rpc.NewDaemonClient(conn).Quit(ctx, &empty.Empty{}); err != nil {
err = fmt.Errorf("error when quitting root daemon: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/rootd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func run(cmd *cobra.Command, args []string) error {
// Listen on a unix domain socket or windows named pipe. The listener must be opened
// before other tasks because the CLI client will only wait for a short period of time for
// the socket/pipe to appear before it gives up.
grpcListener, err := socket.Listen(c, ProcessName, socket.DaemonName)
grpcListener, err := socket.Listen(c, ProcessName, socket.RootDaemonPath(c))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/rootd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func connectToUserDaemon(c context.Context) (*grpc.ClientConn, connector.Manager
defer cancel()

var conn *grpc.ClientConn
conn, err := socket.Dial(tc, socket.ConnectorName,
conn, err := socket.Dial(tc, socket.UserDaemonPath(c),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
)
Expand Down
28 changes: 24 additions & 4 deletions pkg/client/socket/sockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,42 @@ import (
"google.golang.org/grpc"
)

// UserDaemonPath returns the path used when communicating to the user daemon process.
// It delegates to the unexported userDaemonPath, passing ctx through unchanged.
func UserDaemonPath(ctx context.Context) string {
return userDaemonPath(ctx)
}

// RootDaemonPath returns the path used when communicating to the root daemon process.
// It delegates to the unexported rootDaemonPath, passing ctx through unchanged.
func RootDaemonPath(ctx context.Context) string {
return rootDaemonPath(ctx)
}

// Dial dials the given socket and returns the resulting connection.
// The context, socket name and any additional gRPC dial options are forwarded
// as-is to the unexported dial, whose result is returned unchanged.
func Dial(ctx context.Context, socketName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return dial(ctx, socketName, opts...)
}

// Listen opens a listener on the given socket and returns it.
func Listen(ctx context.Context, processName, socketName string) (net.Listener, error) {
return listen(ctx, processName, socketName)
listener, err := net.Listen("unix", socketName)
if err != nil {
if err != nil {
err = fmt.Errorf("socket %q exists so the %s is either already running or terminated ungracefully: %T, %w", socketName, processName, err, err)
}
return nil, err
}
// Don't have dhttp.ServerConfig.Serve unlink the socket; defer unlinking the socket
// until the process exits.
listener.(*net.UnixListener).SetUnlinkOnClose(false)
return listener, nil
}

// RemoveSocket removes any representation of the socket from the filesystem.
// Remove removes any representation of the socket from the filesystem.
func Remove(listener net.Listener) error {
return remove(listener)
return os.Remove(listener.Addr().String())
}

// SocketExists returns true if a socket is found with the given name.
// Exists returns true if a socket is found with the given name.
// The check itself is performed by the unexported exists; its result and
// error are returned unchanged.
func Exists(name string) (bool, error) {
return exists(name)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build !windows
// +build !windows

package socket_test

import (
Expand All @@ -11,6 +8,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/datawire/dlib/dgroup"
Expand All @@ -20,9 +18,8 @@ import (
)

func TestDialSocket(t *testing.T) {
tmpdir := t.TempDir()
t.Run("OK", func(t *testing.T) {
sockname := filepath.Join(tmpdir, "ok.sock")
sockname := filepath.Join(t.TempDir(), "ok.sock")
listener, err := net.Listen("unix", sockname)
if !assert.NoError(t, err) {
return
Expand Down Expand Up @@ -55,7 +52,7 @@ func TestDialSocket(t *testing.T) {
assert.NoError(t, grp.Wait())
})
t.Run("Hang", func(t *testing.T) {
sockname := filepath.Join(tmpdir, "hang.sock")
sockname := filepath.Join(t.TempDir(), "hang.sock")
listener, err := net.Listen("unix", sockname)
if !assert.NoError(t, err) {
return
Expand All @@ -72,7 +69,7 @@ func TestDialSocket(t *testing.T) {
assert.Contains(t, err.Error(), "this usually means that the process has locked up")
})
t.Run("Orphan", func(t *testing.T) {
sockname := filepath.Join(tmpdir, "orphan.sock")
sockname := filepath.Join(t.TempDir(), "orphan.sock")
listener, err := net.Listen("unix", sockname)
if !assert.NoError(t, err) {
return
Expand All @@ -83,15 +80,15 @@ func TestDialSocket(t *testing.T) {
ctx := dlog.NewTestContext(t, false)
conn, err := socket.Dial(ctx, sockname)
assert.Nil(t, conn)
assert.Error(t, err)
require.Error(t, err)
t.Log(err)
assert.ErrorIs(t, err, os.ErrNotExist)
assert.Contains(t, err.Error(), "dial unix "+sockname)
assert.Contains(t, err.Error(), "this usually means that the process is not running")
})
t.Run("NotExist", func(t *testing.T) {
ctx := dlog.NewTestContext(t, false)
sockname := filepath.Join(tmpdir, "not-exist.sock")
sockname := filepath.Join(t.TempDir(), "not-exist.sock")
conn, err := socket.Dial(ctx, sockname)
assert.Nil(t, conn)
assert.Error(t, err)
Expand Down
38 changes: 8 additions & 30 deletions pkg/client/socket/sockets_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ import (
"golang.org/x/sys/unix"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/telepresenceio/telepresence/v2/pkg/proc"
)

const (
// ConnectorName is the path used when communicating to the connector process.
ConnectorName = "/tmp/telepresence-connector.socket"
// userDaemonPath returns the path used when communicating to the user daemon process.
// The name matches the call made by the exported UserDaemonPath wrapper. The
// context is accepted for signature parity but is not consulted here.
func userDaemonPath(_ context.Context) string {
	return "/tmp/telepresence-connector.socket"
}

// DaemonName is the path used when communicating to the daemon process.
DaemonName = "/var/run/telepresence-daemon.socket"
)
// rootDaemonPath returns the path used when communicating to the root daemon process.
// The name matches the call made by the exported RootDaemonPath wrapper. The
// context is accepted for signature parity but is not consulted here.
func rootDaemonPath(_ context.Context) string {
	return "/var/run/telepresence-daemon.socket"
}

func dial(ctx context.Context, socketName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // FIXME(lukeshu): Make this configurable
Expand Down Expand Up @@ -77,28 +77,6 @@ func dial(ctx context.Context, socketName string, opts ...grpc.DialOption) (*grp
}
}

func listen(_ context.Context, processName, socketName string) (net.Listener, error) {
if proc.IsAdmin() {
origUmask := unix.Umask(0)
defer unix.Umask(origUmask)
}
listener, err := net.Listen("unix", socketName)
if err != nil {
if errors.Is(err, unix.EADDRINUSE) {
err = fmt.Errorf("socket %q exists so the %s is either already running or terminated ungracefully", socketName, processName)
}
return nil, err
}
// Don't have dhttp.ServerConfig.Serve unlink the socket; defer unlinking the socket
// until the process exits.
listener.(*net.UnixListener).SetUnlinkOnClose(false)
return listener, nil
}

func remove(listener net.Listener) error {
return os.Remove(listener.Addr().String())
}

// exists returns true if a socket is found at the given path.
func exists(path string) (bool, error) {
s, err := os.Stat(path)
Expand Down
Loading

0 comments on commit e596761

Please sign in to comment.