diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 5ca1ae005..385c7dcba 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -43,6 +43,7 @@ Runrioter Wung Sam Xie skoo skoo87 +soarpenguin Starnop Tao Qingyun <845767657@qq.com> Tiramisu 1993 diff --git a/apis/swagger.yml b/apis/swagger.yml index ac3aa9147..5aa316cb4 100644 --- a/apis/swagger.yml +++ b/apis/swagger.yml @@ -2894,7 +2894,10 @@ definitions: AppArmorProfile: type: "string" ExecIDs: - type: "string" + description: "exec ids of container" + type: "array" + items: + type: "string" HostConfig: $ref: "#/definitions/HostConfig" SizeRw: diff --git a/apis/types/container_json.go b/apis/types/container_json.go index 20671e588..20d0372a9 100644 --- a/apis/types/container_json.go +++ b/apis/types/container_json.go @@ -34,8 +34,8 @@ type ContainerJSON struct { // driver Driver string `json:"Driver,omitempty"` - // exec ids - ExecIds string `json:"ExecIDs,omitempty"` + // exec ids of container + ExecIds []string `json:"ExecIDs"` // graph driver GraphDriver *GraphDriverData `json:"GraphDriver,omitempty"` @@ -159,6 +159,11 @@ func (m *ContainerJSON) Validate(formats strfmt.Registry) error { res = append(res, err) } + if err := m.validateExecIds(formats); err != nil { + // prop + res = append(res, err) + } + if err := m.validateGraphDriver(formats); err != nil { // prop res = append(res, err) @@ -218,6 +223,15 @@ func (m *ContainerJSON) validateConfig(formats strfmt.Registry) error { return nil } +func (m *ContainerJSON) validateExecIds(formats strfmt.Registry) error { + + if swag.IsZero(m.ExecIds) { // not required + return nil + } + + return nil +} + func (m *ContainerJSON) validateGraphDriver(formats strfmt.Registry) error { if swag.IsZero(m.GraphDriver) { // not required diff --git a/cli/exec.go b/cli/exec.go index 4ff2961c8..8dff3d7cb 100644 --- a/cli/exec.go +++ b/cli/exec.go @@ -138,7 +138,10 @@ func (e *ExecCommand) runExec(args []string) error { } // execExample shows examples in exec command, and is used in auto-generated cli docs. -// TODO: add example func execExample() string { - return "" + return `$ pouch exec -it 25bf50 ps +PID USER TIME COMMAND + 1 root 0:00 /bin/sh + 38 root 0:00 ps +` } diff --git a/cli/top.go b/cli/top.go index 1137407e3..1ac495e2f 100644 --- a/cli/top.go +++ b/cli/top.go @@ -62,7 +62,7 @@ func (top *TopCommand) runTop(args []string) error { // topExamples shows examples in top command, and is used in auto-generated cli docs. func topExamples() string { return `$ pouch top 44f675 - UID PID PPID C STIME TTY TIME CMD - root 28725 28714 0 3月14 ? 00:00:00 sh - ` +UID PID PPID C STIME TTY TIME CMD +root 28725 28714 0 3月14 ? 00:00:00 sh +` } diff --git a/cri/stream/request_cache.go b/cri/stream/request_cache.go index d4cdccc30..31a468e49 100644 --- a/cri/stream/request_cache.go +++ b/cri/stream/request_cache.go @@ -19,7 +19,7 @@ var ( TokenLen = 8 ) -// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use +// RequestCache caches streaming (exec/attach/port-forward) requests and generates a single-use // random token for their retrieval. The requestCache is used for building streaming URLs without // the need to encode every request parameter in the URL. type RequestCache struct { @@ -31,15 +31,16 @@ type RequestCache struct { lock sync.Mutex } -// Type representing an *ExecRequest, *AttachRequest, or *PortForwardRequest. -type request interface{} +// Request representing an *ExecRequest, *AttachRequest, or *PortForwardRequest Type. +type Request interface{} type cacheEntry struct { token string - req request + req Request expireTime time.Time } +// NewRequestCache return a RequestCache func NewRequestCache() *RequestCache { return &RequestCache{ ll: list.New(), @@ -48,7 +49,7 @@ func NewRequestCache() *RequestCache { } // Insert the given request into the cache and returns the token used for fetching it out. -func (c *RequestCache) Insert(req request) (token string, err error) { +func (c *RequestCache) Insert(req Request) (token string, err error) { c.lock.Lock() defer c.lock.Unlock() @@ -69,7 +70,7 @@ func (c *RequestCache) Insert(req request) (token string, err error) { } // Consume the token (remove it from the cache) and return the cached request, if found. -func (c *RequestCache) Consume(token string) (req request, found bool) { +func (c *RequestCache) Consume(token string) (req Request, found bool) { c.lock.Lock() defer c.lock.Unlock() ele, ok := c.tokens[token] diff --git a/cri/v1alpha1/cri.go b/cri/v1alpha1/cri.go index 57ee4527b..36e276796 100644 --- a/cri/v1alpha1/cri.go +++ b/cri/v1alpha1/cri.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path" "path/filepath" @@ -695,55 +696,74 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { - // TODO: handle timeout. id := r.GetContainerId() + timeout := time.Duration(r.GetTimeout()) * time.Second + var cancel context.CancelFunc + if timeout == 0 { + ctx, cancel = context.WithCancel(ctx) + } else { + ctx, cancel = context.WithTimeout(ctx, timeout) + } + defer cancel() + createConfig := &apitypes.ExecCreateConfig{ Cmd: r.GetCmd(), } - execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig) if err != nil { return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err) } - var output bytes.Buffer - startConfig := &apitypes.ExecStartConfig{} + reader, writer := io.Pipe() + defer writer.Close() + attachConfig := &mgr.AttachConfig{ Stdout: true, Stderr: true, - MemBuffer: &output, + Pipe: writer, MuxDisabled: true, } + startConfig := &apitypes.ExecStartConfig{} + err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig) if err != nil { return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err) } - var execConfig *mgr.ContainerExecConfig - for { - execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid) + readWaitCh := make(chan error, 1) + var recv bytes.Buffer + go func() { + defer reader.Close() + _, err = io.Copy(&recv, reader) + readWaitCh <- err + }() + + select { + case <-ctx.Done(): + //TODO maybe stop the execution? + return nil, fmt.Errorf("timeout %v exceeded", timeout) + case readWaitErr := <-readWaitCh: + if readWaitErr != nil { + return nil, fmt.Errorf("failed to read data from the pipe: %v", err) + } + execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) if err != nil { return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) } - // Loop until exec finished. - if !execConfig.Running { - break + + var stderr []byte + if execConfig.Error != nil { + stderr = []byte(execConfig.Error.Error()) } - time.Sleep(100 * time.Millisecond) - } - var stderr []byte - if execConfig.Error != nil { - stderr = []byte(execConfig.Error.Error()) + return &runtime.ExecSyncResponse{ + Stdout: recv.Bytes(), + Stderr: stderr, + ExitCode: int32(execConfig.ExitCode), + }, nil } - - return &runtime.ExecSyncResponse{ - Stdout: output.Bytes(), - Stderr: stderr, - ExitCode: int32(execConfig.ExitCode), - }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. diff --git a/cri/v1alpha1/cri_utils_test.go b/cri/v1alpha1/cri_utils_test.go index f26fa0dcc..3dd5a9268 100644 --- a/cri/v1alpha1/cri_utils_test.go +++ b/cri/v1alpha1/cri_utils_test.go @@ -282,7 +282,7 @@ func Test_makeSandboxPouchConfig(t *testing.T) { want *apitypes.ContainerCreateConfig wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -342,7 +342,7 @@ func Test_toCriSandbox(t *testing.T) { want *runtime.PodSandbox wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -558,7 +558,7 @@ func Test_makeContainerName(t *testing.T) { args args want string }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -579,7 +579,7 @@ func Test_modifyContainerNamespaceOptions(t *testing.T) { name string args args }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -600,7 +600,7 @@ func Test_applyContainerSecurityContext(t *testing.T) { args args wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -628,7 +628,7 @@ func TestCriManager_updateCreateConfig(t *testing.T) { args args wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -653,7 +653,7 @@ func Test_toCriContainer(t *testing.T) { want *runtime.Container wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -680,7 +680,7 @@ func Test_imageToCriImage(t *testing.T) { want *runtime.Image wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -711,7 +711,7 @@ func TestCriManager_ensureSandboxImageExists(t *testing.T) { args args wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -736,7 +736,7 @@ func Test_getUserFromImageUser(t *testing.T) { want *int64 want1 string }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -760,7 +760,7 @@ func Test_parseUserFromImageUser(t *testing.T) { args args want string }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/cri/v1alpha1/service/cri.go b/cri/v1alpha1/service/cri.go index eca59068a..f5c327bc6 100644 --- a/cri/v1alpha1/service/cri.go +++ b/cri/v1alpha1/service/cri.go @@ -10,7 +10,6 @@ import ( "google.golang.org/grpc" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" - ) // Service serves the kubelet runtime grpc api which will be consumed by kubelet. diff --git a/cri/v1alpha2/cri.go b/cri/v1alpha2/cri.go index 8ec4f59e2..4a6a5f4e9 100644 --- a/cri/v1alpha2/cri.go +++ b/cri/v1alpha2/cri.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "path" "path/filepath" @@ -709,55 +710,74 @@ func (c *CriManager) ReopenContainerLog(ctx context.Context, r *runtime.ReopenCo // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { - // TODO: handle timeout. id := r.GetContainerId() + timeout := time.Duration(r.GetTimeout()) * time.Second + var cancel context.CancelFunc + if timeout == 0 { + ctx, cancel = context.WithCancel(ctx) + } else { + ctx, cancel = context.WithTimeout(ctx, timeout) + } + defer cancel() + createConfig := &apitypes.ExecCreateConfig{ Cmd: r.GetCmd(), } - execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig) if err != nil { return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err) } - var output bytes.Buffer - startConfig := &apitypes.ExecStartConfig{} + reader, writer := io.Pipe() + defer writer.Close() + attachConfig := &mgr.AttachConfig{ Stdout: true, Stderr: true, - MemBuffer: &output, + Pipe: writer, MuxDisabled: true, } + startConfig := &apitypes.ExecStartConfig{} + err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig) if err != nil { return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err) } - var execConfig *mgr.ContainerExecConfig - for { - execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid) + readWaitCh := make(chan error, 1) + var recv bytes.Buffer + go func() { + defer reader.Close() + _, err = io.Copy(&recv, reader) + readWaitCh <- err + }() + + select { + case <-ctx.Done(): + //TODO maybe stop the execution? + return nil, fmt.Errorf("timeout %v exceeded", timeout) + case readWaitErr := <-readWaitCh: + if readWaitErr != nil { + return nil, fmt.Errorf("failed to read data from the pipe: %v", err) + } + execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid) if err != nil { return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err) } - // Loop until exec finished. - if !execConfig.Running { - break + + var stderr []byte + if execConfig.Error != nil { + stderr = []byte(execConfig.Error.Error()) } - time.Sleep(100 * time.Millisecond) - } - var stderr []byte - if execConfig.Error != nil { - stderr = []byte(execConfig.Error.Error()) + return &runtime.ExecSyncResponse{ + Stdout: recv.Bytes(), + Stderr: stderr, + ExitCode: int32(execConfig.ExitCode), + }, nil } - - return &runtime.ExecSyncResponse{ - Stdout: output.Bytes(), - Stderr: stderr, - ExitCode: int32(execConfig.ExitCode), - }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. diff --git a/cri/v1alpha2/cri_utils_test.go b/cri/v1alpha2/cri_utils_test.go index 18c60811d..6089eda7c 100644 --- a/cri/v1alpha2/cri_utils_test.go +++ b/cri/v1alpha2/cri_utils_test.go @@ -282,7 +282,7 @@ func Test_makeSandboxPouchConfig(t *testing.T) { want *apitypes.ContainerCreateConfig wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -342,7 +342,7 @@ func Test_toCriSandbox(t *testing.T) { want *runtime.PodSandbox wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -558,7 +558,7 @@ func Test_makeContainerName(t *testing.T) { args args want string }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -579,7 +579,7 @@ func Test_modifyContainerNamespaceOptions(t *testing.T) { name string args args }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -600,7 +600,7 @@ func Test_applyContainerSecurityContext(t *testing.T) { args args wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -628,7 +628,7 @@ func TestCriManager_updateCreateConfig(t *testing.T) { args args wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -653,7 +653,7 @@ func Test_toCriContainer(t *testing.T) { want *runtime.Container wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -680,7 +680,7 @@ func Test_imageToCriImage(t *testing.T) { want *runtime.Image wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -711,7 +711,7 @@ func TestCriManager_ensureSandboxImageExists(t *testing.T) { args args wantErr bool }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -736,7 +736,7 @@ func Test_getUserFromImageUser(t *testing.T) { want *int64 want1 string }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -760,7 +760,7 @@ func Test_parseUserFromImageUser(t *testing.T) { args args want string }{ - // TODO: Add test cases. + // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/daemon/containerio/cri_log_file.go b/daemon/containerio/cri_log_file.go index caf95108b..d1e242008 100644 --- a/daemon/containerio/cri_log_file.go +++ b/daemon/containerio/cri_log_file.go @@ -14,7 +14,7 @@ import ( const ( // delimiter used in cri logging format. delimiter = ' ' - // eof is end-of-line. + // eol is end-of-line. eol = '\n' // timestampFormat is the timestamp format used in cri logging format. timestampFormat = time.RFC3339Nano diff --git a/daemon/containerio/jsonfile.go b/daemon/containerio/jsonfile.go index 312f68df5..0c5c3b9f6 100644 --- a/daemon/containerio/jsonfile.go +++ b/daemon/containerio/jsonfile.go @@ -5,6 +5,7 @@ import ( "io" "os" "path/filepath" + "strings" "time" "github.com/alibaba/pouch/daemon/logger" @@ -130,6 +131,10 @@ func (jf *jsonFile) copy(source string, reader io.ReadCloser) { Line: bs, Timestamp: createdTime, }); err != nil { + if strings.Contains(err.Error(), os.ErrClosed.Error()) { + logrus.Warnf("failed to copy %v message into jsonfile: the container may be stopped: %v", source, err) + return + } logrus.Errorf("failed to copy %v message into jsonfile: %v", source, err) return } diff --git a/daemon/containerio/mem_buffer.go b/daemon/containerio/mem_buffer.go deleted file mode 100644 index aaa4bf99d..000000000 --- a/daemon/containerio/mem_buffer.go +++ /dev/null @@ -1,42 +0,0 @@ -package containerio - -import ( - "bytes" - "io" -) - -func init() { - Register(func() Backend { - return &memBuffer{} - }) -} - -type memBuffer struct { - buffer *bytes.Buffer -} - -func (b *memBuffer) Name() string { - return "memBuffer" -} - -func (b *memBuffer) Init(opt *Option) error { - b.buffer = opt.memBuffer - return nil -} - -func (b *memBuffer) Out() io.Writer { - return b.buffer -} - -func (b *memBuffer) In() io.Reader { - return b.buffer -} - -func (b *memBuffer) Err() io.Writer { - return b.buffer -} - -func (b *memBuffer) Close() error { - // Don't need to close bytes.Buffer. - return nil -} diff --git a/daemon/containerio/options.go b/daemon/containerio/options.go index ed3558f9b..58b085460 100644 --- a/daemon/containerio/options.go +++ b/daemon/containerio/options.go @@ -1,7 +1,7 @@ package containerio import ( - "bytes" + "io" "net/http" "os" @@ -18,7 +18,7 @@ type Option struct { hijack http.Hijacker hijackUpgrade bool stdinBackend string - memBuffer *bytes.Buffer + pipe *io.PipeWriter streams *remotecommand.Streams criLogFile *os.File } @@ -101,14 +101,14 @@ func WithStdinHijack() func(*Option) { } } -// WithMemBuffer specified the memory buffer backend. -func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) { +// WithPipe specified the pipe backend. +func WithPipe(pipe *io.PipeWriter) func(*Option) { return func(opt *Option) { if opt.backends == nil { opt.backends = make(map[string]struct{}) } - opt.backends["memBuffer"] = struct{}{} - opt.memBuffer = memBuffer + opt.backends["pipe"] = struct{}{} + opt.pipe = pipe } } diff --git a/daemon/containerio/pipe.go b/daemon/containerio/pipe.go new file mode 100644 index 000000000..25b5bf6a5 --- /dev/null +++ b/daemon/containerio/pipe.go @@ -0,0 +1,40 @@ +package containerio + +import ( + "io" +) + +func init() { + Register(func() Backend { + return &pipe{} + }) +} + +type pipe struct { + pipeWriter *io.PipeWriter +} + +func (p *pipe) Name() string { + return "pipe" +} + +func (p *pipe) Init(opt *Option) error { + p.pipeWriter = opt.pipe + return nil +} + +func (p *pipe) Out() io.Writer { + return p.pipeWriter +} + +func (p *pipe) In() io.Reader { + return nil +} + +func (p *pipe) Err() io.Writer { + return p.pipeWriter +} + +func (p *pipe) Close() error { + return p.pipeWriter.Close() +} diff --git a/daemon/daemon.go b/daemon/daemon.go index 5ed41aadb..ce53a6592 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -163,17 +163,18 @@ func (d *Daemon) Run() error { } d.volumeMgr = volumeMgr - networkMgr, err := internal.GenNetworkMgr(d.config, d) + containerMgr, err := internal.GenContainerMgr(ctx, d) if err != nil { return err } - d.networkMgr = networkMgr + d.containerMgr = containerMgr - containerMgr, err := internal.GenContainerMgr(ctx, d) + networkMgr, err := internal.GenNetworkMgr(d.config, d) if err != nil { return err } - d.containerMgr = containerMgr + d.networkMgr = networkMgr + containerMgr.(*mgr.ContainerManager).NetworkMgr = networkMgr if err := d.addSystemLabels(); err != nil { return err diff --git a/daemon/logger/jsonfile/jsonfile_read_test.go b/daemon/logger/jsonfile/jsonfile_read_test.go index 44160024c..926732707 100644 --- a/daemon/logger/jsonfile/jsonfile_read_test.go +++ b/daemon/logger/jsonfile/jsonfile_read_test.go @@ -10,7 +10,7 @@ import ( ) func TestReadLogMessagesWithRemoveFileInFollowMode(t *testing.T) { - f, err := ioutil.TempFile("/root", "tail-file") + f, err := ioutil.TempFile("", "tail-file") if err != nil { t.Fatalf("unexpected error during create tempfile: %v", err) } diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index 668b17c24..38c261b01 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -157,14 +157,13 @@ type ContainerManager struct { } // NewContainerManager creates a brand new container manager. -func NewContainerManager(ctx context.Context, store *meta.Store, cli ctrd.APIClient, imgMgr ImageMgr, volMgr VolumeMgr, netMgr NetworkMgr, cfg *config.Config, contPlugin plugins.ContainerPlugin) (*ContainerManager, error) { +func NewContainerManager(ctx context.Context, store *meta.Store, cli ctrd.APIClient, imgMgr ImageMgr, volMgr VolumeMgr, cfg *config.Config, contPlugin plugins.ContainerPlugin) (*ContainerManager, error) { mgr := &ContainerManager{ Store: store, NameToID: collect.NewSafeMap(), Client: cli, ImageMgr: imgMgr, VolumeMgr: volMgr, - NetworkMgr: netMgr, IOs: containerio.NewCache(), ExecProcesses: collect.NewSafeMap(), cache: collect.NewSafeMap(), @@ -1341,8 +1340,12 @@ func (mgr *ContainerManager) Disconnect(ctx context.Context, containerName, netw endpoint.Name = network.Name endpoint.EndpointConfig = epConfig if err := mgr.NetworkMgr.EndpointRemove(ctx, endpoint); err != nil { - logrus.Errorf("failed to remove endpoint: %v", err) - return err + // TODO(ziren): it is a trick, we should wrapper sanbox + // not found as an error type + if !strings.Contains(err.Error(), "not found") { + logrus.Errorf("failed to remove endpoint: %v", err) + return err + } } // disconnect an endpoint success, delete endpoint info from container json @@ -1544,9 +1547,9 @@ func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) { if attach.Stdin { options = append(options, containerio.WithStdinHijack()) } - } else if attach.MemBuffer != nil { - // Attaching using memory buffer. - options = append(options, containerio.WithMemBuffer(attach.MemBuffer)) + } else if attach.Pipe != nil { + // Attaching using pipe. + options = append(options, containerio.WithPipe(attach.Pipe)) } else if attach.Streams != nil { // Attaching using streams. options = append(options, containerio.WithStreams(attach.Streams)) @@ -1574,33 +1577,46 @@ func (mgr *ContainerManager) markStoppedAndRelease(c *Container, m *ctrd.Message } } + // unset Snapshot MergedDir. Stop a container will + // delete the containerd container, the merged dir + // will also be deleted, so we should unset the + // container's MergedDir. + if c.Snapshotter != nil && c.Snapshotter.Data != nil { + c.Snapshotter.Data["MergedDir"] = "" + } + + // Remove io and network config may occur error, so we should update + // container's status on disk as soon as possible. + if err := c.Write(mgr.Store); err != nil { + logrus.Errorf("failed to update meta: %v", err) + return err + } + // release resource if io := mgr.IOs.Get(c.ID); io != nil { io.Close() mgr.IOs.Remove(c.ID) } - // release network - if c.NetworkSettings != nil { - for name, epConfig := range c.NetworkSettings.Networks { - endpoint := mgr.buildContainerEndpoint(c) - endpoint.Name = name - endpoint.EndpointConfig = epConfig - if err := mgr.NetworkMgr.EndpointRemove(context.Background(), endpoint); err != nil { + // No network binded, just return + if c.NetworkSettings == nil { + return nil + } + + for name, epConfig := range c.NetworkSettings.Networks { + endpoint := mgr.buildContainerEndpoint(c) + endpoint.Name = name + endpoint.EndpointConfig = epConfig + if err := mgr.NetworkMgr.EndpointRemove(context.Background(), endpoint); err != nil { + // TODO(ziren): it is a trick, we should wrapper "sanbox + // not found"" as an error type + if !strings.Contains(err.Error(), "not found") { logrus.Errorf("failed to remove endpoint: %v", err) return err } } } - // unset Snapshot MergedDir. Stop a container will - // delete the containerd container, the merged dir - // will also be deleted, so we should unset the - // container's MergedDir. - if c.Snapshotter != nil && c.Snapshotter.Data != nil { - c.Snapshotter.Data["MergedDir"] = "" - } - // update meta if err := c.Write(mgr.Store); err != nil { logrus.Errorf("failed to update meta: %v", err) @@ -2133,25 +2149,7 @@ func (mgr *ContainerManager) detachVolumes(ctx context.Context, c *Container, re } func (mgr *ContainerManager) buildContainerEndpoint(c *Container) *networktypes.Endpoint { - ep := &networktypes.Endpoint{ - Owner: c.ID, - Hostname: c.Config.Hostname, - Domainname: c.Config.Domainname, - HostsPath: c.HostsPath, - ExtraHosts: c.HostConfig.ExtraHosts, - HostnamePath: c.HostnamePath, - ResolvConfPath: c.ResolvConfPath, - NetworkDisabled: c.Config.NetworkDisabled, - NetworkMode: c.HostConfig.NetworkMode, - DNS: c.HostConfig.DNS, - DNSOptions: c.HostConfig.DNSOptions, - DNSSearch: c.HostConfig.DNSSearch, - MacAddress: c.Config.MacAddress, - PublishAllPorts: c.HostConfig.PublishAllPorts, - ExposedPorts: c.Config.ExposedPorts, - PortBindings: c.HostConfig.PortBindings, - NetworkConfig: c.NetworkSettings, - } + ep := BuildContainerEndpoint(c) if mgr.containerPlugin != nil { ep.Priority, ep.DisableResolver, ep.GenericParams = mgr.containerPlugin.PreCreateEndpoint(c.ID, c.Config.Env) diff --git a/daemon/mgr/container_types.go b/daemon/mgr/container_types.go index 135f35f68..6976961ae 100644 --- a/daemon/mgr/container_types.go +++ b/daemon/mgr/container_types.go @@ -1,8 +1,8 @@ package mgr import ( - "bytes" "fmt" + "io" "net/http" "os" "sync" @@ -71,8 +71,8 @@ type AttachConfig struct { Hijack http.Hijacker Upgrade bool - // Attach using memory buffer. - MemBuffer *bytes.Buffer + // Attach using pipe. + Pipe *io.PipeWriter // Attach using streams. Streams *remotecommand.Streams diff --git a/daemon/mgr/container_utils.go b/daemon/mgr/container_utils.go index aaaaafede..e3ea7fb40 100644 --- a/daemon/mgr/container_utils.go +++ b/daemon/mgr/container_utils.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/alibaba/pouch/apis/types" + networktypes "github.com/alibaba/pouch/network/types" "github.com/alibaba/pouch/pkg/errtypes" "github.com/alibaba/pouch/pkg/meta" "github.com/alibaba/pouch/pkg/randomid" @@ -103,6 +104,29 @@ func (mgr *ContainerManager) generateName(id string) string { return name } +// BuildContainerEndpoint is used to build container's endpoint config. +func BuildContainerEndpoint(c *Container) *networktypes.Endpoint { + return &networktypes.Endpoint{ + Owner: c.ID, + Hostname: c.Config.Hostname, + Domainname: c.Config.Domainname, + HostsPath: c.HostsPath, + ExtraHosts: c.HostConfig.ExtraHosts, + HostnamePath: c.HostnamePath, + ResolvConfPath: c.ResolvConfPath, + NetworkDisabled: c.Config.NetworkDisabled, + NetworkMode: c.HostConfig.NetworkMode, + DNS: c.HostConfig.DNS, + DNSOptions: c.HostConfig.DNSOptions, + DNSSearch: c.HostConfig.DNSSearch, + MacAddress: c.Config.MacAddress, + PublishAllPorts: c.HostConfig.PublishAllPorts, + ExposedPorts: c.Config.ExposedPorts, + PortBindings: c.HostConfig.PortBindings, + NetworkConfig: c.NetworkSettings, + } +} + func parseSecurityOpts(c *Container, securityOpts []string) error { var ( labelOpts []string diff --git a/daemon/mgr/network.go b/daemon/mgr/network.go index 0feb6dcb4..17f91c9fd 100644 --- a/daemon/mgr/network.go +++ b/daemon/mgr/network.go @@ -65,13 +65,32 @@ type NetworkManager struct { } // NewNetworkManager creates a brand new network manager. -func NewNetworkManager(cfg *config.Config, store *meta.Store) (*NetworkManager, error) { +func NewNetworkManager(cfg *config.Config, store *meta.Store, ctrMgr ContainerMgr) (*NetworkManager, error) { // Create a new controller instance cfg.NetworkConfg.MetaPath = path.Dir(store.BaseDir) cfg.NetworkConfg.ExecRoot = network.DefaultExecRoot initNetworkLog(cfg) + // get active sandboxes + ctrs, err := ctrMgr.List(context.Background(), + func(c *Container) bool { + return (c.IsRunning() || c.IsPaused()) && !isContainer(c.HostConfig.NetworkMode) + }, &ContainerListOption{All: true}) + if err != nil { + logrus.Errorf("failed to new network manager, can not get container list") + return nil, errors.Wrap(err, "failed to get container list") + } + cfg.NetworkConfg.ActiveSandboxes = make(map[string]interface{}) + for _, c := range ctrs { + endpoint := BuildContainerEndpoint(c) + sbOptions, err := buildSandboxOptions(cfg.NetworkConfg, endpoint) + if err != nil { + return nil, errors.Wrap(err, "failed to build sandbox options") + } + cfg.NetworkConfg.ActiveSandboxes[c.NetworkSettings.SandboxID] = sbOptions + } + ctlOptions, err := controllerOptions(cfg.NetworkConfg) if err != nil { return nil, errors.Wrap(err, "failed to build network options") @@ -280,7 +299,7 @@ func (nm *NetworkManager) EndpointCreate(ctx context.Context, endpoint *types.En // create sandbox sb := nm.getNetworkSandbox(containerID) if sb == nil { - sandboxOptions, err := nm.sandboxOptions(endpoint) + sandboxOptions, err := buildSandboxOptions(nm.config, endpoint) if err != nil { return "", fmt.Errorf("failed to build sandbox options(%v)", err) } @@ -422,6 +441,10 @@ func controllerOptions(cfg network.Config) ([]nwconfig.Option, error) { options = append(options, nwconfig.OptionExecRoot(cfg.ExecRoot)) } + if len(cfg.ActiveSandboxes) != 0 { + options = append(options, nwconfig.OptionActiveSandboxes(cfg.ActiveSandboxes)) + } + options = append(options, nwconfig.OptionDefaultDriver("bridge")) options = append(options, nwconfig.OptionDefaultNetwork("bridge")) @@ -562,7 +585,7 @@ func endpointOptions(n libnetwork.Network, endpoint *types.Endpoint) ([]libnetwo return createOptions, nil } -func (nm *NetworkManager) sandboxOptions(endpoint *types.Endpoint) ([]libnetwork.SandboxOption, error) { +func buildSandboxOptions(config network.Config, endpoint *types.Endpoint) ([]libnetwork.SandboxOption, error) { var ( sandboxOptions []libnetwork.SandboxOption dns []string @@ -577,9 +600,9 @@ func (nm *NetworkManager) sandboxOptions(endpoint *types.Endpoint) ([]libnetwork if len(endpoint.ExtraHosts) == 0 { sandboxOptions = append(sandboxOptions, libnetwork.OptionOriginHostsPath("/etc/hosts")) } - if len(endpoint.DNS) == 0 && len(nm.config.DNS) == 0 && - len(endpoint.DNSSearch) == 0 && len(nm.config.DNSSearch) == 0 && - len(endpoint.DNSOptions) == 0 && len(nm.config.DNSOptions) == 0 { + if len(endpoint.DNS) == 0 && len(config.DNS) == 0 && + len(endpoint.DNSSearch) == 0 && len(config.DNSSearch) == 0 && + len(endpoint.DNSOptions) == 0 && len(config.DNSOptions) == 0 { sandboxOptions = append(sandboxOptions, libnetwork.OptionOriginResolvConfPath("/etc/resolv.conf")) } } else { @@ -592,8 +615,8 @@ func (nm *NetworkManager) sandboxOptions(endpoint *types.Endpoint) ([]libnetwork // parse DNS if len(endpoint.DNS) > 0 { dns = endpoint.DNS - } else if len(nm.config.DNS) > 0 { - dns = nm.config.DNS + } else if len(config.DNS) > 0 { + dns = config.DNS } for _, d := range dns { sandboxOptions = append(sandboxOptions, libnetwork.OptionDNS(d)) @@ -602,8 +625,8 @@ func (nm *NetworkManager) sandboxOptions(endpoint *types.Endpoint) ([]libnetwork // parse DNS Search if len(endpoint.DNSSearch) > 0 { dnsSearch = endpoint.DNSSearch - } else if len(nm.config.DNSSearch) > 0 { - dnsSearch = nm.config.DNSSearch + } else if len(config.DNSSearch) > 0 { + dnsSearch = config.DNSSearch } for _, ds := range dnsSearch { sandboxOptions = append(sandboxOptions, libnetwork.OptionDNSSearch(ds)) @@ -612,8 +635,8 @@ func (nm *NetworkManager) sandboxOptions(endpoint *types.Endpoint) ([]libnetwork // parse DNS Options if len(endpoint.DNSOptions) > 0 { dnsOptions = endpoint.DNSOptions - } else if len(nm.config.DNSOptions) > 0 { - dnsOptions = nm.config.DNSOptions + } else if len(config.DNSOptions) > 0 { + dnsOptions = config.DNSOptions } for _, ds := range dnsOptions { sandboxOptions = append(sandboxOptions, libnetwork.OptionDNSOptions(ds)) diff --git a/docs/commandline/pouch_exec.md b/docs/commandline/pouch_exec.md index 626cce1dd..b92aeeb6d 100644 --- a/docs/commandline/pouch_exec.md +++ b/docs/commandline/pouch_exec.md @@ -10,6 +10,16 @@ Exec a process in a running container pouch exec [OPTIONS] CONTAINER COMMAND [ARG...] ``` +### Examples + +``` +$ pouch exec -it 25bf50 ps +PID USER TIME COMMAND + 1 root 0:00 /bin/sh + 38 root 0:00 ps + +``` + ### Options ``` diff --git a/docs/commandline/pouch_top.md b/docs/commandline/pouch_top.md index ca7336728..9b8fb720f 100644 --- a/docs/commandline/pouch_top.md +++ b/docs/commandline/pouch_top.md @@ -14,9 +14,9 @@ pouch top CONTAINER [ps OPTIONS] ``` $ pouch top 44f675 - UID PID PPID C STIME TTY TIME CMD - root 28725 28714 0 3月14 ? 00:00:00 sh - +UID PID PPID C STIME TTY TIME CMD +root 28725 28714 0 3月14 ? 00:00:00 sh + ``` ### Options diff --git a/internal/generator.go b/internal/generator.go index b418e47a5..e2c72ae17 100644 --- a/internal/generator.go +++ b/internal/generator.go @@ -25,7 +25,7 @@ type DaemonProvider interface { // GenContainerMgr generates a ContainerMgr instance according to config cfg. func GenContainerMgr(ctx context.Context, d DaemonProvider) (mgr.ContainerMgr, error) { - return mgr.NewContainerManager(ctx, d.MetaStore(), d.Containerd(), d.ImgMgr(), d.VolMgr(), d.NetMgr(), d.Config(), d.ContainerPlugin()) + return mgr.NewContainerManager(ctx, d.MetaStore(), d.Containerd(), d.ImgMgr(), d.VolMgr(), d.Config(), d.ContainerPlugin()) } // GenSystemMgr generates a SystemMgr instance according to config cfg. @@ -47,10 +47,5 @@ func GenVolumeMgr(cfg *config.Config, d DaemonProvider) (mgr.VolumeMgr, error) { // GenNetworkMgr generates a NetworkMgr instance according to config cfg. func GenNetworkMgr(cfg *config.Config, d DaemonProvider) (mgr.NetworkMgr, error) { - return mgr.NewNetworkManager(cfg, d.MetaStore()) + return mgr.NewNetworkManager(cfg, d.MetaStore(), d.CtrMgr()) } - -//// GenCriMgr generates a CriMgr instance. -//func GenCriMgr(d DaemonProvider) (cri.CriMgr, error) { -// return cri.NewCriManager(d.Config(), d.CtrMgr(), d.ImgMgr()) -//} diff --git a/network/config.go b/network/config.go index 08bc02aab..c9b12e0d4 100644 --- a/network/config.go +++ b/network/config.go @@ -15,6 +15,8 @@ type Config struct { // bridge config BridgeConfig BridgeConfig + + ActiveSandboxes map[string]interface{} } // BridgeConfig defines the bridge network configuration. diff --git a/network/mode/init.go b/network/mode/init.go index 7d9671712..800522757 100644 --- a/network/mode/init.go +++ b/network/mode/init.go @@ -12,6 +12,12 @@ import ( // NetworkModeInit is used to initilize network mode, include host and none network. func NetworkModeInit(ctx context.Context, config network.Config, manager mgr.NetworkMgr) error { + // if it has old containers, don't to intialize network. + if len(config.ActiveSandboxes) > 0 { + logrus.Warnf("There are old containers, don't to initialize network") + return nil + } + // init none network if n, _ := manager.Get(ctx, "none"); n == nil { logrus.Debugf("create none network") diff --git a/test/api_container_restart_test.go b/test/api_container_restart_test.go index 5b675aa93..840607c3c 100644 --- a/test/api_container_restart_test.go +++ b/test/api_container_restart_test.go @@ -74,4 +74,6 @@ func (suite *APIContainerRestartSuite) TestAPIRestartPausedContainer(c *check.C) resp, err := request.Post("/containers/"+cname+"/restart", query) c.Assert(err, check.IsNil) CheckRespStatus(c, resp, 204) + + DelContainerForceOk(c, cname) } diff --git a/test/cli_ps_test.go b/test/cli_ps_test.go index 38ee7a9c9..e03261cea 100644 --- a/test/cli_ps_test.go +++ b/test/cli_ps_test.go @@ -76,7 +76,7 @@ func (suite *PouchPsSuite) TestPsWorks(c *check.C) { func (suite *PouchPsSuite) TestPsAll(c *check.C) { name := "ps-all" - command.PouchRun("create", "--name", name, busyboxImage).Assert(c, icmd.Success) + command.PouchRun("create", "--name", name, busyboxImage, "top").Assert(c, icmd.Success) defer DelContainerForceMultyTime(c, name) res := command.PouchRun("ps").Assert(c, icmd.Success) @@ -95,7 +95,7 @@ func (suite *PouchPsSuite) TestPsAll(c *check.C) { func (suite *PouchPsSuite) TestPsQuiet(c *check.C) { name := "ps-quiet" - command.PouchRun("create", "--name", name, busyboxImage).Assert(c, icmd.Success) + command.PouchRun("create", "--name", name, busyboxImage, "top").Assert(c, icmd.Success) defer DelContainerForceMultyTime(c, name) res := command.PouchRun("ps", "-q", "-a").Assert(c, icmd.Success) @@ -113,7 +113,7 @@ func (suite *PouchPsSuite) TestPsQuiet(c *check.C) { func (suite *PouchPsSuite) TestPsNoTrunc(c *check.C) { name := "ps-noTrunc" - command.PouchRun("create", "--name", name, busyboxImage).Assert(c, icmd.Success) + command.PouchRun("create", "--name", name, busyboxImage, "top").Assert(c, icmd.Success) defer DelContainerForceMultyTime(c, name) command.PouchRun("start", name).Assert(c, icmd.Success) diff --git a/test/z_cli_daemon_test.go b/test/z_cli_daemon_test.go index 88aac4b81..838deb98b 100644 --- a/test/z_cli_daemon_test.go +++ b/test/z_cli_daemon_test.go @@ -288,7 +288,7 @@ func (suite *PouchDaemonSuite) TestDaemonRestartWithPausedContainer(c *check.C) cname := "TestDaemonRestartWithPausedContainer" { result := RunWithSpecifiedDaemon(dcfg, "run", "-d", "--name", cname, - "-p", "1234:80", busyboxImage, "top") + "-p", "5678:80", busyboxImage, "top") if result.ExitCode != 0 { dcfg.DumpLog() c.Fatalf("run container failed, err: %v", result)