Skip to content

Commit

Permalink
refactor: make CRI Stream Server share port(http server) with pouchd
Browse files Browse the repository at this point in the history
Signed-off-by: YaoZengzeng <yaozengzeng@zju.edu.cn>
  • Loading branch information
YaoZengzeng committed Sep 11, 2018
1 parent 7d8d6fc commit fb10eab
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 75 deletions.
18 changes: 17 additions & 1 deletion apis/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func initRoute(s *Server) http.Handler {
s.addRoute(r, http.MethodDelete, "/volumes/{name:.*}", s.removeVolume)

// network

s.addRoute(r, http.MethodGet, "/networks", s.listNetwork)
s.addRoute(r, http.MethodPost, "/networks/create", s.createNetwork)
s.addRoute(r, http.MethodGet, "/networks/{id:.*}", s.getNetwork)
Expand All @@ -91,6 +90,23 @@ func initRoute(s *Server) http.Handler {
r.Path(versionMatcher + "/metrics").Methods(http.MethodGet).Handler(prometheus.Handler())
r.Path("/metrics").Methods(http.MethodGet).Handler(prometheus.Handler())

// CRI stream server related handlers
if s.StreamRouter != nil {
endpoints := []struct {
path string
handler http.HandlerFunc
}{
{"/exec/{token}", s.StreamRouter.ServeExec},
{"/attach/{token}", s.StreamRouter.ServeAttach},
{"/portforward/{token}", s.StreamRouter.ServePortForward},
}
for _, e := range endpoints {
for _, method := range []string{http.MethodGet, http.MethodPost} {
r.Path(e.path).Methods(method).Handler(e.handler)
}
}
}

if s.Config.Debug || s.Config.EnableProfiler {
profilerSetup(r)
}
Expand Down
2 changes: 2 additions & 0 deletions apis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/alibaba/pouch/apis/plugins"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/httputils"
Expand All @@ -24,6 +25,7 @@ type Server struct {
ImageMgr mgr.ImageMgr
VolumeMgr mgr.VolumeMgr
NetworkMgr mgr.NetworkMgr
StreamRouter stream.Router
listeners []net.Listener
ContainerPlugin plugins.ContainerPlugin
ManagerWhiteList map[string]struct{}
Expand Down
4 changes: 4 additions & 0 deletions cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ type Config struct {
SandboxImage string `json:"sandbox-image,omitempty"`
// CriVersion is the cri version
CriVersion string `json:"cri-version,omitempty"`
// StreamServerPort is the port which cri stream server is listening on.
StreamServerPort string `json:"stream-server-port,omitempty"`
// StreamServerReusePort specify whether cri stream server share port with pouchd.
StreamServerReusePort bool
}
51 changes: 35 additions & 16 deletions cri/criservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cri
import (
"fmt"

"github.com/alibaba/pouch/cri/stream"
criv1alpha1 "github.com/alibaba/pouch/cri/v1alpha1"
servicev1alpha1 "github.com/alibaba/pouch/cri/v1alpha1/service"
criv1alpha2 "github.com/alibaba/pouch/cri/v1alpha2"
Expand All @@ -14,32 +15,34 @@ import (
)

// RunCriService start cri service if pouchd is specified with --enable-cri.
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, stopCh chan error, readyCh chan bool) {
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, streamRouterCh chan stream.Router, stopCh chan error, readyCh chan bool) {
var err error

defer func() {
stopCh <- err
close(stopCh)
}()
if !daemonconfig.IsCriEnabled {
// the CriService has been disabled, so send Ready
// the CriService has been disabled, so send Ready and empty Stream Router
streamRouterCh <- nil
readyCh <- true
return
}
switch daemonconfig.CriConfig.CriVersion {
case "v1alpha1":
err = runv1alpha1(daemonconfig, containerMgr, imageMgr, readyCh)
err = runv1alpha1(daemonconfig, containerMgr, imageMgr, streamRouterCh, readyCh)
case "v1alpha2":
err = runv1alpha2(daemonconfig, containerMgr, imageMgr, volumeMgr, readyCh)
err = runv1alpha2(daemonconfig, containerMgr, imageMgr, volumeMgr, streamRouterCh, readyCh)
default:
streamRouterCh <- nil
readyCh <- false
err = fmt.Errorf("failed to start CRI service: invalid CRI version %s, expected to be v1alpha1 or v1alpha2", daemonconfig.CriConfig.CriVersion)
}
return
}

// Start CRI service with CRI version: v1alpha1
func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, readyCh chan bool) error {
func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, streamRouterCh chan stream.Router, readyCh chan bool) error {
logrus.Infof("Start CRI service with CRI version: v1alpha1")
criMgr, err := criv1alpha1.NewCriManager(daemonconfig, containerMgr, imageMgr)
if err != nil {
Expand All @@ -54,16 +57,24 @@ func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, ima
}

errChan := make(chan error, 2)
// If the cri stream server share the port with pouchd,
// export the its router. Otherwise launch it.
if daemonconfig.CriConfig.StreamServerReusePort {
errChan = make(chan error, 1)
streamRouterCh <- criMgr.StreamRouter()
} else {
go func() {
errChan <- criMgr.StreamServerStart()
logrus.Infof("CRI Stream server stopped")
}()
streamRouterCh <- nil
}

go func() {
errChan <- service.Serve()
logrus.Infof("CRI GRPC server stopped")
}()

go func() {
errChan <- criMgr.StreamServerStart()
logrus.Infof("CRI Stream server stopped")
}()

// the criservice has set up, send Ready
readyCh <- true

Expand All @@ -79,7 +90,7 @@ func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, ima
}

// Start CRI service with CRI version: v1alpha2
func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, readyCh chan bool) error {
func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, streamRouterCh chan stream.Router, readyCh chan bool) error {
logrus.Infof("Start CRI service with CRI version: v1alpha2")
criMgr, err := criv1alpha2.NewCriManager(daemonconfig, containerMgr, imageMgr, volumeMgr)
if err != nil {
Expand All @@ -94,16 +105,24 @@ func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, ima
}

errChan := make(chan error, 2)
// If the cri stream server share the port with pouchd,
// export the its router. Otherwise launch it.
if daemonconfig.CriConfig.StreamServerReusePort {
errChan = make(chan error, 1)
streamRouterCh <- criMgr.StreamRouter()
} else {
go func() {
errChan <- criMgr.StreamServerStart()
logrus.Infof("CRI Stream server stopped")
}()
streamRouterCh <- nil
}

go func() {
errChan <- service.Serve()
logrus.Infof("CRI GRPC server stopped")
}()

go func() {
errChan <- criMgr.StreamServerStart()
logrus.Infof("CRI Stream server stopped")
}()

// the criservice has set up, send Ready
readyCh <- true

Expand Down
15 changes: 15 additions & 0 deletions cri/stream/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package stream

import (
"net/http"
)

// Router exports a set of CRI Stream Server's handlers.
// We could reuse the pouchd's http server to handle
// the Stream Server's requests, so pouchd only has to
// export one port.
type Router interface {
ServeExec(w http.ResponseWriter, r *http.Request)
ServeAttach(w http.ResponseWriter, r *http.Request)
ServePortForward(w http.ResponseWriter, r *http.Request)
}
26 changes: 22 additions & 4 deletions cri/v1alpha1/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
apitypes "github.com/alibaba/pouch/apis/types"
anno "github.com/alibaba/pouch/cri/annotations"
cni "github.com/alibaba/pouch/cri/ocicni"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/errtypes"
Expand Down Expand Up @@ -54,10 +55,8 @@ const (
// nameDelimiter is used to construct pouch container names.
nameDelimiter = "_"

// Address and port of stream server.
// TODO: specify them in the parameters of pouchd.
// Address of stream server.
streamServerAddress = ""
streamServerPort = "10010"

namespaceModeHost = "host"
namespaceModeNone = "none"
Expand Down Expand Up @@ -93,6 +92,9 @@ type CriMgr interface {

// StreamServerStart starts the stream server of CRI.
StreamServerStart() error

// StreamRouter returns the router of Stream Server.
StreamRouter() stream.Router
}

// CriManager is an implementation of interface CriMgr.
Expand Down Expand Up @@ -121,7 +123,18 @@ type CriManager struct {

// NewCriManager creates a brand new cri manager.
func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.ImageMgr) (CriMgr, error) {
streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort)
streamServerPort := config.CriConfig.StreamServerPort
// If stream server reuse the pouchd's port, extract the port from pouchd's listening addresses.
if config.CriConfig.StreamServerReusePort {
streamServerPort = extractPortFromAddresses(config.Listen)
if streamServerPort == "" {
return nil, fmt.Errorf("failed to extract stream server's port from pouchd's listening addresses")
}
}

// If the reused pouchd's port is https, the url that stream server return should be with https scheme.
reuseHTTPSPort := config.CriConfig.StreamServerReusePort && config.TLS.Key != "" && config.TLS.Cert != ""
streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort, reuseHTTPSPort)
if err != nil {
return nil, fmt.Errorf("failed to create stream server for cri manager: %v", err)
}
Expand Down Expand Up @@ -173,6 +186,11 @@ func (c *CriManager) StreamServerStart() error {
return c.StreamServer.Start()
}

// StreamRouter returns the router of Stream Server.
func (c *CriManager) StreamRouter() stream.Router {
return c.StreamServer
}

// TODO: Move the underlying functions to their respective files in the future.

// Version returns the runtime name, runtime version and runtime API version.
Expand Down
39 changes: 38 additions & 1 deletion cri/v1alpha1/cri_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package v1alpha1
import (
"fmt"
"net"
"net/url"
"strings"

"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/netutils"

"github.com/sirupsen/logrus"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string, reuseHTTPSPort bool) (Server, error) {
if address == "" {
a, err := netutils.ChooseBindAddress(nil)
if err != nil {
Expand All @@ -19,6 +23,39 @@ func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Serv
}
config := stream.DefaultConfig
config.Address = net.JoinHostPort(address, port)
config.BaseURL = &url.URL{
Scheme: "http",
Host: config.Address,
}
if reuseHTTPSPort {
config.BaseURL.Scheme = "https"
}
runtime := stream.NewStreamRuntime(ctrMgr)
return NewServer(config, runtime)
}

// extractPortFromAddresses extract first valid port from addresses.
func extractPortFromAddresses(addresses []string) string {
for _, addr := range addresses {
addrParts := strings.SplitN(addr, "://", 2)
if len(addrParts) != 2 {
logrus.Errorf("invalid listening address %s: must be in format [protocol]://[address]", addr)
continue
}

switch addrParts[0] {
case "tcp":
_, port, err := net.SplitHostPort(addrParts[1])
if err != nil {
logrus.Errorf("failed to split host and port from address: %v", err)
continue
}
return port
case "unix":
continue
default:
logrus.Errorf("only unix socket or tcp address is support")
}
}
return ""
}
37 changes: 37 additions & 0 deletions cri/v1alpha1/cri_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package v1alpha1

import (
"testing"
)

func Test_extractPortFromAddresses(t *testing.T) {
tests := []struct {
name string
args []string
want string
}{
{
name: "listening addresses are nil",
args: nil,
want: "",
},
{
name: "listening addresses have no tcp address",
args: []string{"unix:///var/run/pouchd.sock"},
want: "",
},
{
name: "listening addresses have valid address",
args: []string{"unix:///var/run/pouchd.sock", "tcp://0.0.0.0:4345"},
want: "4345",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := extractPortFromAddresses(tt.args)
if got != tt.want {
t.Errorf("extractPortFromAddresses() = %v, want %v", got, tt.want)
}
})
}
}
7 changes: 7 additions & 0 deletions cri/v1alpha1/cri_wrapper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package v1alpha1

import (
"github.com/alibaba/pouch/cri/stream"

"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down Expand Up @@ -29,6 +31,11 @@ func (c *CriWrapper) StreamServerStart() (err error) {
return c.CriManager.StreamServerStart()
}

// StreamRouter returns the router of Stream Server.
func (c *CriWrapper) StreamRouter() stream.Router {
return c.CriManager.StreamRouter()
}

// Version returns the runtime name, runtime version and runtime API version.
func (c *CriWrapper) Version(ctx context.Context, r *runtime.VersionRequest) (res *runtime.VersionResponse, err error) {
logrus.Debugf("Version shows the basic information of cri Manager")
Expand Down
Loading

0 comments on commit fb10eab

Please sign in to comment.