Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: move function getListener() to pkg/net/listener.go #2040

Merged
merged 1 commit into from
Aug 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 2 additions & 73 deletions apis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@ package server

import (
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"strings"
"sync"
"syscall"

"github.com/alibaba/pouch/apis/plugins"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/httputils"
"github.com/alibaba/pouch/pkg/user"
"github.com/alibaba/pouch/pkg/netutils"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -60,7 +57,7 @@ func (s *Server) Start(readyCh chan bool) (err error) {
}

for _, one := range s.Config.Listen {
l, err := getListener(one, tlsConfig)
l, err := netutils.GetListener(one, tlsConfig)
if err != nil {
readyCh <- false
return err
Expand Down Expand Up @@ -100,71 +97,3 @@ func (s *Server) Stop() error {
}
return nil
}

func getListener(addr string, tlsConfig *tls.Config) (net.Listener, error) {
addrParts := strings.SplitN(addr, "://", 2)
if len(addrParts) != 2 {
return nil, fmt.Errorf("invalid listening address: %s", addr)
}

switch addrParts[0] {
case "tcp":
l, err := net.Listen("tcp", addrParts[1])
if err != nil {
return l, err
}
if tlsConfig != nil {
l = tls.NewListener(l, tlsConfig)
}
return l, err
case "unix":
return newUnixSocket(addrParts[1])

default:
return nil, fmt.Errorf("only unix socket or tcp address is support")
}
}

func newUnixSocket(path string) (net.Listener, error) {
if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
return nil, err
}
oldmask := syscall.Umask(0777)
defer syscall.Umask(oldmask)
l, err := net.Listen("unix", path)
if err != nil {
return nil, err
}

// chmod unix socket, make other group writable
if err := os.Chmod(path, 0660); err != nil {
l.Close()
return nil, fmt.Errorf("failed to chmod %s: %s", path, err)
}

gid, err := user.ParseID(user.GroupFile, "pouch", func(line, str string, idInt int, idErr error) (uint32, bool) {
var (
name, placeholder string
id int
)

user.ParseString(line, &name, &placeholder, &id)
if str == name {
return uint32(id), true
}
return 0, false
})
if err != nil {
// ignore error when group pouch not exist, group pouch should to be
// created before pouchd started, it means code not create pouch group
logrus.Warnf("failed to find group pouch, cannot change unix socket %s to pouch group", path)
return l, nil
}

// chown unix socket with group pouch
if err := os.Chown(path, 0, int(gid)); err != nil {
l.Close()
return nil, fmt.Errorf("failed to chown %s: %s", path, err)
}
return l, nil
}
4 changes: 2 additions & 2 deletions cri/v1alpha1/cri_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (

"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/mgr"
pouchnet "github.com/alibaba/pouch/pkg/net"
"github.com/alibaba/pouch/pkg/netutils"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
if address == "" {
a, err := pouchnet.ChooseBindAddress(nil)
a, err := netutils.ChooseBindAddress(nil)
if err != nil {
return nil, fmt.Errorf("failed to get stream server address: %v", err)
}
Expand Down
14 changes: 2 additions & 12 deletions cri/v1alpha1/service/cri.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package service

import (
"net"
"os"
"syscall"

cri "github.com/alibaba/pouch/cri/v1alpha1"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/pkg/netutils"

"google.golang.org/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand All @@ -27,8 +24,6 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) {
criMgr: criMgr,
}

// TODO: Prepare streaming server.

runtime.RegisterRuntimeServiceServer(s.server, s.criMgr)
runtime.RegisterImageServiceServer(s.server, s.criMgr)

Expand All @@ -37,12 +32,7 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) {

// Serve starts grpc server.
func (s *Service) Serve() error {
// Unlink to cleanup the previous socket file.
if err := syscall.Unlink(s.config.CriConfig.Listen); err != nil && !os.IsNotExist(err) {
return err
}

l, err := net.Listen("unix", s.config.CriConfig.Listen)
l, err := netutils.GetListener(s.config.CriConfig.Listen, nil)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cri/v1alpha2/cri_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (

"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/mgr"
pouchnet "github.com/alibaba/pouch/pkg/net"
"github.com/alibaba/pouch/pkg/netutils"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
if address == "" {
a, err := pouchnet.ChooseBindAddress(nil)
a, err := netutils.ChooseBindAddress(nil)
if err != nil {
return nil, fmt.Errorf("failed to get stream server address: %v", err)
}
Expand Down
14 changes: 2 additions & 12 deletions cri/v1alpha2/service/cri.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package service

import (
"net"
"os"
"syscall"

runtime "github.com/alibaba/pouch/cri/apis/v1alpha2"
cri "github.com/alibaba/pouch/cri/v1alpha2"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/pkg/netutils"

"google.golang.org/grpc"
)
Expand All @@ -27,8 +24,6 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) {
criMgr: criMgr,
}

// TODO: Prepare streaming server.

runtime.RegisterRuntimeServiceServer(s.server, s.criMgr)
runtime.RegisterImageServiceServer(s.server, s.criMgr)

Expand All @@ -37,12 +32,7 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) {

// Serve starts grpc server.
func (s *Service) Serve() error {
// Unlink to cleanup the previous socket file.
if err := syscall.Unlink(s.config.CriConfig.Listen); err != nil && !os.IsNotExist(err) {
return err
}

l, err := net.Listen("unix", s.config.CriConfig.Listen)
l, err := netutils.GetListener(s.config.CriConfig.Listen, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func setupFlags(cmd *cobra.Command) {
flagSet.StringArrayVarP(&cfg.Listen, "listen", "l", []string{"unix:///var/run/pouchd.sock"}, "Specify listening addresses of Pouchd")
flagSet.BoolVar(&cfg.IsCriEnabled, "enable-cri", false, "Specify whether enable the cri part of pouchd which is used to support Kubernetes")
flagSet.StringVar(&cfg.CriConfig.CriVersion, "cri-version", "v1alpha2", "Specify the version of cri which is used to support Kubernetes")
flagSet.StringVar(&cfg.CriConfig.Listen, "listen-cri", "/var/run/pouchcri.sock", "Specify listening address of CRI")
flagSet.StringVar(&cfg.CriConfig.Listen, "listen-cri", "unix:///var/run/pouchcri.sock", "Specify listening address of CRI")
flagSet.StringVar(&cfg.CriConfig.NetworkPluginBinDir, "cni-bin-dir", "/opt/cni/bin", "The directory for putting cni plugin binaries.")
flagSet.StringVar(&cfg.CriConfig.NetworkPluginConfDir, "cni-conf-dir", "/etc/cni/net.d", "The directory for putting cni plugin configuration files.")
flagSet.StringVar(&cfg.CriConfig.SandboxImage, "sandbox-image", "registry.cn-hangzhou.aliyuncs.com/google-containers/pause-amd64:3.0", "The image used by sandbox container.")
Expand Down
2 changes: 1 addition & 1 deletion pkg/net/doc.go → pkg/netutils/doc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package net
package netutils

// NOTE: The code in this package is directly copy from package "k8s.io/apimachinery/pkg/util/net".
// We do this because we don't want to vendor too many irrelevant packages and try to make
Expand Down
2 changes: 1 addition & 1 deletion pkg/net/interface.go → pkg/netutils/interface.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package net
package netutils

import (
"bufio"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package net
package netutils

import (
"fmt"
Expand Down
83 changes: 83 additions & 0 deletions pkg/netutils/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package netutils

import (
"crypto/tls"
"fmt"
"net"
"os"
"strings"
"syscall"

"github.com/alibaba/pouch/pkg/user"

"github.com/sirupsen/logrus"
)

// GetListener get a listener for an address.
func GetListener(addr string, tlsConfig *tls.Config) (net.Listener, error) {
addrParts := strings.SplitN(addr, "://", 2)
if len(addrParts) != 2 {
return nil, fmt.Errorf("invalid listening address %s: must be in format [protocol]://[address]", addr)
}

switch addrParts[0] {
case "tcp":
l, err := net.Listen("tcp", addrParts[1])
if err != nil {
return l, err
}
if tlsConfig != nil {
l = tls.NewListener(l, tlsConfig)
}
return l, err
case "unix":
return newUnixSocket(addrParts[1])

default:
return nil, fmt.Errorf("only unix socket or tcp address is support")
}
}

func newUnixSocket(path string) (net.Listener, error) {
if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
return nil, err
}
oldmask := syscall.Umask(0777)
defer syscall.Umask(oldmask)
l, err := net.Listen("unix", path)
if err != nil {
return nil, err
}

// chmod unix socket, make other group writable
if err := os.Chmod(path, 0660); err != nil {
l.Close()
return nil, fmt.Errorf("failed to chmod %s: %s", path, err)
}

gid, err := user.ParseID(user.GroupFile, "pouch", func(line, str string, idInt int, idErr error) (uint32, bool) {
var (
name, placeholder string
id int
)

user.ParseString(line, &name, &placeholder, &id)
if str == name {
return uint32(id), true
}
return 0, false
})
if err != nil {
// ignore error when group pouch not exist, group pouch should to be
// created before pouchd started, it means code not create pouch group
logrus.Warnf("failed to find group pouch, cannot change unix socket %s to pouch group", path)
return l, nil
}

// chown unix socket with group pouch
if err := os.Chown(path, 0, int(gid)); err != nil {
l.Close()
return nil, fmt.Errorf("failed to chown %s: %s", path, err)
}
return l, nil
}
51 changes: 51 additions & 0 deletions pkg/netutils/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package netutils

import (
"fmt"
"reflect"
"testing"
)

func TestGetListenerBasic(t *testing.T) {
type args struct {
addr string
}
tests := []struct {
name string
args args
wantErr error
}{
{
"tcpAddressTest",
args{"tcp://127.0.0.1:12345"},
nil,
},
{
"unixAddressTest",
args{"unix:///tmp/pouchtest.sock"},
nil,
},
{
"otherProtocolTest",
args{"udp://127.0.0.1:12345"},
fmt.Errorf("only unix socket or tcp address is support"),
},
{
"invalidAddressTest",
args{"invalid address"},
fmt.Errorf("invalid listening address invalid address: must be in format [protocol]://[address]"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l, err := GetListener(tt.args.addr, nil)
if !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("GetListener() return error %v, want %v", err, tt.wantErr)
}
if err == nil {
l.Close()
}
})
}
}
6 changes: 3 additions & 3 deletions test/z_cli_daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,14 @@ func (suite *PouchDaemonSuite) TestDaemonDefaultRegistry(c *check.C) {
defer dcfg.KillDaemon()
}

// TestDaemonCriEnbaled tests enabling cri part in pouchd.
func (suite *PouchDaemonSuite) TestDaemonCriEnbaled(c *check.C) {
// TestDaemonCriEnabled tests enabling cri part in pouchd.
func (suite *PouchDaemonSuite) TestDaemonCriEnabled(c *check.C) {
dcfg, err := StartDefaultDaemonDebug(
"--enable-cri")
c.Assert(err, check.IsNil)

result := RunWithSpecifiedDaemon(dcfg, "info")
err = util.PartialEqual(result.Combined(), "CriEnabled: true")
err = util.PartialEqual(result.Combined(), "CriEnabled: true")
c.Assert(err, check.IsNil)

defer dcfg.KillDaemon()
Expand Down