diff --git a/daemon/containerio/jsonfile.go b/daemon/containerio/jsonfile.go
index 44f5855d72..f9f98cc216 100644
--- a/daemon/containerio/jsonfile.go
+++ b/daemon/containerio/jsonfile.go
@@ -21,7 +21,7 @@ func init() {
 
 var jsonFilePathName = "json.log"
 
-// TODO(fuwei): add compress/logrotate configurate
+// TODO(fuwei): add compress/logrotate configuration
 type jsonFile struct {
     closed bool
diff --git a/daemon/logger/jsonfile/jsonfile_read.go b/daemon/logger/jsonfile/jsonfile_read.go
index c48ce279d7..832de76d08 100644
--- a/daemon/logger/jsonfile/jsonfile_read.go
+++ b/daemon/logger/jsonfile/jsonfile_read.go
@@ -13,7 +13,7 @@ func (lf *JSONLogFile) ReadLogMessages(cfg *logger.ReadConfig) *logger.LogWatche
 
     go func() {
         // NOTE: We cannot close the channel in the JSONLogFile.read
-        // function because we cannot gurantee that watcher will be
+        // function because we cannot guarantee that watcher will
         // close the channel. Since the watcher is created in the
         // JSONLogFile.ReadLogMessages, we make sure that watcher.Msgs
        // can be closed after the JSONLogFile.read.
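
The NOTE in jsonfile_read.go is a channel-ownership rule: read only sends on the channel, and the goroutine spawned by ReadLogMessages, which created the watcher, is the one place where watcher.Msgs gets closed. A runnable sketch of that rule, using illustrative names rather than pouch's actual types:

package main

import "fmt"

// watcher owns Msgs; only its creator may close the channel.
type watcher struct {
    Msgs chan string
}

// read emits messages but never closes Msgs: it cannot guarantee
// that it is the last sender.
func read(w *watcher) {
    for i := 0; i < 3; i++ {
        w.Msgs <- fmt.Sprintf("log line %d", i)
    }
}

func main() {
    w := &watcher{Msgs: make(chan string)}
    go func() {
        // The creator closes the channel after read returns, mirroring
        // ReadLogMessages closing watcher.Msgs after JSONLogFile.read.
        defer close(w.Msgs)
        read(w)
    }()
    for m := range w.Msgs { // loop ends once Msgs is closed
        fmt.Println(m)
    }
}
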
diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go
index 668b17c241..7bb7d98f15 100644
--- a/daemon/mgr/container.go
+++ b/daemon/mgr/container.go
@@ -362,9 +362,6 @@ func (mgr *ContainerManager) Create(ctx context.Context, name string, config *ty
         container.Snapshotter.Data["UpperDir"] = upperDir
     }
 
-    container.Lock()
-    defer container.Unlock()
-
     // store disk
     if err := container.Write(mgr.Store); err != nil {
         logrus.Errorf("failed to update meta: %v", err)
@@ -388,9 +385,6 @@ func (mgr *ContainerManager) Get(ctx context.Context, name string) (*Container,
         return nil, err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
     return c, nil
 }
 
@@ -408,10 +402,11 @@ func (mgr *ContainerManager) List(ctx context.Context, filter ContainerFilter, o
         if !ok {
             return nil, fmt.Errorf("failed to get container list, invalid meta type")
         }
+        // TODO: make func filter with no data race
         if filter != nil && filter(c) {
             if option.All {
                 cons = append(cons, c)
-            } else if c.State.Status == types.StatusRunning || c.State.Status == types.StatusPaused {
+            } else if c.IsRunningOrPaused() {
                 cons = append(cons, c)
             }
         }
@@ -431,31 +426,33 @@ func (mgr *ContainerManager) Start(ctx context.Context, id, detachKeys string) (
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
     return mgr.start(ctx, c, detachKeys)
 }
 
 func (mgr *ContainerManager) start(ctx context.Context, c *Container, detachKeys string) error {
+    c.Lock()
     if c.Config == nil || c.State == nil {
+        c.Unlock()
         return errors.Wrapf(errtypes.ErrNotfound, "container %s", c.ID)
     }
     c.DetachKeys = detachKeys
 
     // initialise container network mode
     networkMode := c.HostConfig.NetworkMode
+    c.Unlock()
     if IsContainer(networkMode) {
         origContainer, err := mgr.Get(ctx, strings.SplitN(networkMode, ":", 2)[1])
         if err != nil {
             return err
         }
+        c.Lock()
         c.HostnamePath = origContainer.HostnamePath
         c.HostsPath = origContainer.HostsPath
         c.ResolvConfPath = origContainer.ResolvConfPath
         c.Config.Hostname = origContainer.Config.Hostname
         c.Config.Domainname = origContainer.Config.Domainname
+        c.Unlock()
     }
 
     // initialise host network mode
@@ -468,17 +465,20 @@ func (mgr *ContainerManager) start(ctx context.Context, c *Container, detachKeys
     }
 
     // initialise network endpoint
+    c.Lock()
     if c.NetworkSettings != nil {
         for name, endpointSetting := range c.NetworkSettings.Networks {
             endpoint := mgr.buildContainerEndpoint(c)
             endpoint.Name = name
             endpoint.EndpointConfig = endpointSetting
             if _, err := mgr.NetworkMgr.EndpointCreate(ctx, endpoint); err != nil {
+                c.Unlock()
                 logrus.Errorf("failed to create endpoint: %v", err)
                 return err
             }
         }
     }
+    c.Unlock()
 
     return mgr.createContainerdContainer(ctx, c)
 }
@@ -486,9 +486,11 @@ func (mgr *ContainerManager) createContainerdContainer(ctx context.Context, c *C
 func (mgr *ContainerManager) createContainerdContainer(ctx context.Context, c *Container) error {
     // CgroupParent from HostConfig will be first priority to use,
     // then will be value from mgr.Config.CgroupParent
+    c.Lock()
     if c.HostConfig.CgroupParent == "" {
         c.HostConfig.CgroupParent = mgr.Config.CgroupParent
     }
+    c.Unlock()
 
     var (
         err     error
@@ -496,6 +498,7 @@ func (mgr *ContainerManager) createContainerdContainer(ctx context.Context, c *C
         argsArr [][]string
     )
     if mgr.containerPlugin != nil {
+        // TODO: make func PreStart with no data race
         prioArr, argsArr, err = mgr.containerPlugin.PreStart(c)
         if err != nil {
             return errors.Wrapf(err, "get pre-start hook error from container plugin")
@@ -520,13 +523,17 @@ func (mgr *ContainerManager) createContainerdContainer(ctx context.Context, c *C
         return errors.Wrap(err, "failed to open io")
     }
 
-    if err := mgr.Client.CreateContainer(ctx, &ctrd.Container{
+    c.Lock()
+    ctrdContainer := &ctrd.Container{
         ID:      c.ID,
         Image:   c.Config.Image,
         Runtime: c.HostConfig.Runtime,
         Spec:    sw.s,
         IO:      io,
-    }); err != nil {
+    }
+    c.Unlock()
+
+    if err := mgr.Client.CreateContainer(ctx, ctrdContainer); err != nil {
         logrus.Errorf("failed to create new containerd container: %v", err)
 
         // TODO(ziren): markStoppedAndRelease may failed
@@ -536,36 +543,30 @@ func (mgr *ContainerManager) createContainerdContainer(ctx context.Context, c *C
     }
 
     // Create containerd container success.
-    c.State.Status = types.StatusRunning
-    c.State.StartedAt = time.Now().UTC().Format(utils.TimeLayout)
+
     pid, err := mgr.Client.ContainerPID(ctx, c.ID)
     if err != nil {
         return errors.Wrapf(err, "failed to get PID of container %s", c.ID)
     }
-    c.State.Pid = int64(pid)
-    c.State.ExitCode = 0
+
+    c.SetStatusRunning(int64(pid))
 
     // set Snapshot MergedDir
+    c.Lock()
     c.Snapshotter.Data["MergedDir"] = c.BaseFS
+    c.Unlock()
 
     return c.Write(mgr.Store)
 }
 
 // Stop stops a running container.
 func (mgr *ContainerManager) Stop(ctx context.Context, name string, timeout int64) error {
-    var (
-        err error
-        c   *Container
-    )
-
-    if c, err = mgr.container(name); err != nil {
+    c, err := mgr.container(name)
+    if err != nil {
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
-    if !c.IsRunning() && !c.IsPaused() {
+    if !c.IsRunningOrPaused() {
         // stopping a non-running container is valid.
         return nil
     }
@@ -578,9 +579,10 @@
 }
 
 func (mgr *ContainerManager) stop(ctx context.Context, c *Container, timeout int64) error {
-    msg, err := mgr.Client.DestroyContainer(ctx, c.ID, timeout)
+    id := c.ID
+    msg, err := mgr.Client.DestroyContainer(ctx, id, timeout)
     if err != nil {
-        return errors.Wrapf(err, "failed to destroy container %s", c.ID)
+        return errors.Wrapf(err, "failed to destroy container %s", id)
     }
 
     return mgr.markStoppedAndRelease(c, msg)
@@ -593,18 +595,16 @@ func (mgr *ContainerManager) Restart(ctx context.Context, name string, timeout i
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
     if timeout == 0 {
         timeout = c.StopTimeout()
     }
 
-    if c.IsRunning() || c.IsPaused() {
+    if c.IsRunningOrPaused() {
         // stop container if it is running or paused.
         if err := mgr.stop(ctx, c, timeout); err != nil {
-            logrus.Errorf("failed to stop container %s when restarting: %v", c.ID, err)
-            return errors.Wrapf(err, fmt.Sprintf("failed to stop container %s", c.ID))
+            ex := fmt.Errorf("failed to stop container %s when restarting: %v", c.ID, err)
+            logrus.Error(ex)
+            return ex
         }
     }
 
@@ -615,22 +615,11 @@ func (mgr *ContainerManager) Restart(ctx context.Context, name string, timeout i
 
 // Pause pauses a running container.
 func (mgr *ContainerManager) Pause(ctx context.Context, name string) error {
-    var (
-        err error
-        c   *Container
-    )
-
-    if c, err = mgr.container(name); err != nil {
+    c, err := mgr.container(name)
+    if err != nil {
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
-    if c.Config == nil || c.State == nil {
-        return errors.Wrapf(errtypes.ErrNotfound, "container %s", c.ID)
-    }
-
     if !c.IsRunning() {
         return fmt.Errorf("container's status is not running: %s", c.State.Status)
     }
@@ -639,10 +628,10 @@ func (mgr *ContainerManager) Pause(ctx context.Context, name string) error {
         return errors.Wrapf(err, "failed to pause container %s", c.ID)
     }
 
-    c.State.Status = types.StatusPaused
+    c.SetStatusPaused()
 
     if err := c.Write(mgr.Store); err != nil {
-        logrus.Errorf("failed to update meta: %v", err)
+        logrus.Errorf("failed to update meta of container %s: %v", c.ID, err)
         return err
     }
 
@@ -651,34 +640,23 @@ func (mgr *ContainerManager) Pause(ctx context.Context, name string) error {
 
 // Unpause unpauses a paused container.
 func (mgr *ContainerManager) Unpause(ctx context.Context, name string) error {
-    var (
-        err error
-        c   *Container
-    )
-
-    if c, err = mgr.container(name); err != nil {
+    c, err := mgr.container(name)
+    if err != nil {
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
-    if c.Config == nil || c.State == nil {
-        return errors.Wrap(errtypes.ErrNotfound, "container "+c.ID)
-    }
-
     if !c.IsPaused() {
-        return fmt.Errorf("container's status is not paused: %v", c.State.Status)
+        return fmt.Errorf("status(%s) of container %s is not paused", c.State.Status, c.ID)
     }
 
     if err := mgr.Client.UnpauseContainer(ctx, c.ID); err != nil {
-        return errors.Wrapf(err, "failed to unpause container: %s", c.ID)
+        return errors.Wrapf(err, "failed to unpause container %s", c.ID)
     }
 
-    c.State.Status = types.StatusRunning
+    c.SetStatusUnpaused()
 
     if err := c.Write(mgr.Store); err != nil {
-        logrus.Errorf("failed to update meta: %v", err)
+        logrus.Errorf("failed to update meta of container %s: %v", c.ID, err)
         return err
     }
 
@@ -702,28 +680,25 @@ func (mgr *ContainerManager) Attach(ctx context.Context, name string, attach *At
 
 // Rename renames a container.
 func (mgr *ContainerManager) Rename(ctx context.Context, oldName, newName string) error {
-    var (
-        c   *Container
-        err error
-    )
-
     if mgr.NameToID.Get(newName).Exist() {
         return errors.Wrap(errtypes.ErrAlreadyExisted, "container name: "+newName)
     }
 
-    if c, err = mgr.container(oldName); err != nil {
-        return errors.Wrap(err, "failed to rename container")
+    c, err := mgr.container(oldName)
+    if err != nil {
+        return errors.Wrapf(err, "failed to rename container %s", oldName)
     }
+
     c.Lock()
-    defer c.Unlock()
+    name := c.Name
+    c.Name = newName
+    c.Unlock()
 
-    mgr.NameToID.Remove(c.Name)
+    mgr.NameToID.Remove(name)
     mgr.NameToID.Put(newName, c.ID)
 
-    c.Name = newName
-
     if err := c.Write(mgr.Store); err != nil {
-        logrus.Errorf("failed to update meta: %v", err)
+        logrus.Errorf("failed to update meta of container %s: %v", c.ID, err)
         return err
     }
 
@@ -737,13 +712,12 @@ func (mgr *ContainerManager) Update(ctx context.Context, name string, config *ty
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
     if c.IsRunning() && config.Resources.KernelMemory != 0 {
-        return errors.Wrapf(nil, fmt.Sprintf("failed to update container %s: can not update kernel memory to a running container, please stop it first", c.ID))
+        return fmt.Errorf("failed to update container %s: cannot update kernel memory to a running container, please stop it first", c.ID)
     }
 
+    c.Lock()
+
     // init Container Labels
     if c.Config.Labels == nil {
         c.Config.Labels = map[string]string{}
@@ -776,24 +750,28 @@ func (mgr *ContainerManager) Update(ctx context.Context, name string, config *ty
         }
     }
 
+    c.Unlock()
+
     // update container disk quota
     if err := mgr.updateContainerDiskQuota(ctx, c, config.DiskQuota); err != nil {
-        return errors.Wrapf(err, fmt.Sprintf("failed to update container %s diskquota", c.ID))
+        return errors.Wrapf(err, "failed to update diskquota of container %s", c.ID)
     }
 
     // update Resources of a container.
     if err := mgr.updateContainerResources(c, config.Resources); err != nil {
-        return errors.Wrapf(err, fmt.Sprintf("failed to update container %s resources", c.ID))
+        return errors.Wrapf(err, "failed to update resource of container %s", c.ID)
     }
 
+    c.Lock()
     // TODO update restartpolicy when container is running.
     if config.RestartPolicy.Name != "" {
         c.HostConfig.RestartPolicy = config.RestartPolicy
     }
+    c.Unlock()
 
     // update env when container is running, default snapshotter driver
     // is overlayfs
-    if (c.IsRunning() || c.IsPaused()) && len(config.Env) > 0 && c.Snapshotter != nil {
+    if c.IsRunningOrPaused() && len(config.Env) > 0 && c.Snapshotter != nil {
         if mergedDir, exists := c.Snapshotter.Data["MergedDir"]; exists {
             if err := mgr.updateContainerEnv(c.Config.Env, mergedDir); err != nil {
                 return errors.Wrapf(err, "failed to update env of running container")
@@ -827,21 +805,19 @@ func (mgr *ContainerManager) Remove(ctx context.Context, name string, options *t
     if err != nil {
         return err
     }
-    c.Lock()
-    defer c.Unlock()
 
     if !c.IsStopped() && !c.IsExited() && !c.IsCreated() && !options.Force {
-        return fmt.Errorf("container: %s is not stopped, can't remove it without flag force", c.ID)
+        return fmt.Errorf("container %s is not stopped, cannot remove it without flag force", c.ID)
     }
 
     // if the container is running, force to stop it.
     if c.IsRunning() && options.Force {
         msg, err := mgr.Client.DestroyContainer(ctx, c.ID, c.StopTimeout())
         if err != nil && !errtypes.IsNotfound(err) {
-            return errors.Wrapf(err, "failed to destroy container: %s", c.ID)
+            return errors.Wrapf(err, "failed to destroy container %s", c.ID)
         }
         if err := mgr.markStoppedAndRelease(c, msg); err != nil {
-            return errors.Wrapf(err, "failed to mark container: %s stop status", c.ID)
+            return errors.Wrapf(err, "failed to mark container %s stop status", c.ID)
         }
     }
 
@@ -850,11 +826,13 @@ func (mgr *ContainerManager) Remove(ctx context.Context, name string, options *t
     }
 
     // remove name
+    c.Lock()
     mgr.NameToID.Remove(c.Name)
+    c.Unlock()
 
     // remove meta data
     if err := mgr.Store.Remove(c.Key()); err != nil {
-        logrus.Errorf("failed to remove container: %s meta store, %v", c.ID, err)
+        logrus.Errorf("failed to remove container %s from meta store: %v", c.ID, err)
     }
 
     // remove container cache
@@ -862,7 +840,7 @@
 
     // remove snapshot
     if err := mgr.Client.RemoveSnapshot(ctx, c.ID); err != nil {
-        logrus.Errorf("failed to remove container: %s snapshot, %v", c.ID, err)
+        logrus.Errorf("failed to remove snapshot of container %s: %v", c.ID, err)
     }
 
     return nil
@@ -878,22 +856,26 @@ func (mgr *ContainerManager) updateContainerDiskQuota(ctx context.Context, c *Co
         return errors.Wrapf(err, "failed to parse disk quota")
     }
 
+    c.Lock()
     c.Config.DiskQuota = quotaMap
+    c.Unlock()
 
     // set mount point disk quota
     if err := mgr.setMountPointDiskQuota(ctx, c); err != nil {
         return errors.Wrapf(err, "failed to set mount point disk quota")
     }
 
+    c.Lock()
     var qid uint32
     if c.Config.QuotaID != "" {
         id, err := strconv.Atoi(c.Config.QuotaID)
         if err != nil {
-            return errors.Wrapf(err, "invalid argument, QuotaID: %s", c.Config.QuotaID)
+            return errors.Wrapf(err, "failed to convert QuotaID %s", c.Config.QuotaID)
         }
-        // if QuotaID is < 0, it means pouchd alloc a unique quota id.
+        qid = uint32(id)
         if id < 0 {
+            // QuotaID is < 0, which means pouchd allocates a unique quota ID.
             qid, err = quota.GetNextQuatoID()
             if err != nil {
                 return errors.Wrap(err, "failed to get next quota id")
@@ -901,10 +883,9 @@ func (mgr *ContainerManager) updateContainerDiskQuota(ctx context.Context, c *Co
 
             // update QuotaID
             c.Config.QuotaID = strconv.Itoa(int(qid))
-        } else {
-            qid = uint32(id)
         }
     }
+    c.Unlock()
 
     // get rootfs quota
     defaultQuota := quota.GetDefaultQuota(quotaMap)
@@ -912,8 +893,8 @@ func (mgr *ContainerManager) updateContainerDiskQuota(ctx context.Context, c *Co
         return fmt.Errorf("set quota id but have no set default quota size")
     }
     // update container rootfs disk quota
-    status := c.State.Status
-    if (status == types.StatusRunning || status == types.StatusPaused) && c.Snapshotter != nil {
+    // TODO: add lock for container?
+    if c.IsRunningOrPaused() && c.Snapshotter != nil {
         basefs, ok := c.Snapshotter.Data["MergedDir"]
         if !ok || basefs == "" {
             return fmt.Errorf("Container is running, but MergedDir is missing")
@@ -929,6 +910,8 @@
 
 // updateContainerResources update container's resources parameters.
 func (mgr *ContainerManager) updateContainerResources(c *Container, resources types.Resources) error {
+    c.Lock()
+    defer c.Unlock()
     // update resources of container.
     cResources := &c.HostConfig.Resources
     if resources.BlkioWeight != 0 {
@@ -1101,9 +1084,6 @@ func (mgr *ContainerManager) Upgrade(ctx context.Context, name string, config *t
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
     // check the image existed or not, and convert image id to image ref
     _, _, primaryRef, err := mgr.ImageMgr.CheckReference(ctx, config.Image)
     if err != nil {
@@ -1112,12 +1092,16 @@
     config.Image = primaryRef.String()
 
     // Nothing changed, no need upgrade.
+    c.Lock()
     if config.Image == c.Config.Image {
+        c.Unlock()
         return fmt.Errorf("failed to upgrade container: image not changed")
     }
+    c.Unlock()
 
     var (
-        needRollback    = false
+        needRollback = false
+        // FIXME: do a deep copy of container?
         backupContainer = c
     )
 
@@ -1208,7 +1192,7 @@
 
     // Works fine, store new container info to disk.
     if err := c.Write(mgr.Store); err != nil {
-        logrus.Errorf("failed to update meta: %v", err)
+        logrus.Errorf("failed to update container %s in meta store: %v", c.ID, err)
         return err
     }
 
@@ -1220,31 +1204,29 @@ func (mgr *ContainerManager) Top(ctx context.Context, name string, psArgs string
     if psArgs == "" {
         psArgs = "-ef"
     }
+
     c, err := mgr.container(name)
     if err != nil {
         return nil, err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
     if !c.IsRunning() {
-        return nil, fmt.Errorf("container is not running, can not execute top command")
+        return nil, fmt.Errorf("container %s is not running, cannot execute top command", c.ID)
     }
 
     pids, err := mgr.Client.ContainerPIDs(ctx, c.ID)
     if err != nil {
-        return nil, errors.Wrapf(err, "failed to get pids of container")
+        return nil, errors.Wrapf(err, "failed to get pids of container %s", c.ID)
     }
 
     output, err := exec.Command("ps", strings.Split(psArgs, " ")...).Output()
     if err != nil {
-        return nil, errors.Wrapf(err, "error running ps")
+        return nil, errors.Wrapf(err, "failed to run ps command")
     }
 
     procList, err := parsePSOutput(output, pids)
     if err != nil {
-        return nil, errors.Wrapf(err, "parsePSOutput failed")
+        return nil, errors.Wrapf(err, "failed to parse ps output")
     }
 
     return procList, nil
@@ -1257,10 +1239,7 @@ func (mgr *ContainerManager) Resize(ctx context.Context, name string, opts types
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
-    if !c.IsRunning() && !c.IsPaused() {
+    if !c.IsRunningOrPaused() {
         return fmt.Errorf("failed to resize container %s: container is not running", c.ID)
     }
 
@@ -1272,26 +1251,22 @@ func (mgr *ContainerManager) Connect(ctx context.Context, name string, networkID
     c, err := mgr.container(name)
     if err != nil {
         return errors.Wrapf(err, "failed to get container: %s", name)
-    } else if c == nil {
-        return fmt.Errorf("container: %s is not exist", name)
     }
 
     n, err := mgr.NetworkMgr.Get(context.Background(), networkIDOrName)
     if err != nil {
-        return errors.Wrapf(err, "failed to get network: %s", networkIDOrName)
-    } else if n == nil {
-        return fmt.Errorf("network: %s is not exist", networkIDOrName)
+        return errors.Wrapf(err, "failed to get network %s", networkIDOrName)
+    }
+    if n == nil {
+        return fmt.Errorf("network %s does not exist", networkIDOrName)
     }
 
     if epConfig == nil {
         epConfig = &types.EndpointSettings{}
     }
 
-    c.Lock()
-    defer c.Unlock()
-
-    if c.State.Status != types.StatusRunning {
-        if c.State.Status == types.StatusDead {
+    if !c.IsRunning() {
+        if c.IsDead() {
             return fmt.Errorf("Container %s is marked for removal and cannot be connected or disconnected to the network", c.ID)
         }
 
@@ -1321,12 +1296,18 @@ func (mgr *ContainerManager) Disconnect(ctx context.Context, containerName, netw
     }
 
     // container cannot be disconnected from host network
+    c.Lock()
     networkMode := c.HostConfig.NetworkMode
+    c.Unlock()
+
     if IsHost(networkMode) && IsHost(network.Mode) {
         return fmt.Errorf("container cannot be disconnected from host network or connected to hostnetwork ")
     }
 
+    c.Lock()
     networkSettings := c.NetworkSettings
+    c.Unlock()
+
     if networkSettings == nil {
         return nil
     }
@@ -1337,7 +1318,9 @@ func (mgr *ContainerManager) Disconnect(ctx context.Context, containerName, netw
         return fmt.Errorf("failed to disconnect container from network: container %s not attach to %s", c.Name, networkName)
     }
 
+    c.Lock()
     endpoint := mgr.buildContainerEndpoint(c)
+    c.Unlock()
     endpoint.Name = network.Name
     endpoint.EndpointConfig = epConfig
     if err := mgr.NetworkMgr.EndpointRemove(ctx, endpoint); err != nil {
@@ -1350,16 +1333,18 @@ func (mgr *ContainerManager) Disconnect(ctx context.Context, containerName, netw
 
     // if container has no network attached any more, set NetworkDisabled to true
     // so that not setup Network Namespace when restart the container
+    c.Lock()
     if len(networkSettings.Networks) == 0 {
         c.Config.NetworkDisabled = true
     }
 
     // container meta changed, refresh the cache
     mgr.cache.Put(c.ID, c)
+    c.Unlock()
 
     // update container meta json
     if err := c.Write(mgr.Store); err != nil {
-        logrus.Errorf("failed to update meta: %v", err)
+        logrus.Errorf("failed to update container %s in meta store: %v", c.ID, err)
         return err
     }
 
@@ -1440,7 +1425,9 @@ func (mgr *ContainerManager) connectToNetwork(ctx context.Context, container *Co
         return errors.Wrap(err, "failed to get network")
     }
 
+    container.Lock()
     endpoint := mgr.buildContainerEndpoint(container)
+    container.Unlock()
     endpoint.Name = network.Name
     endpoint.EndpointConfig = epConfig
     if _, err := mgr.NetworkMgr.EndpointCreate(ctx, endpoint); err != nil {
@@ -1563,23 +1550,26 @@ func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) {
 }
 
 func (mgr *ContainerManager) markStoppedAndRelease(c *Container, m *ctrd.Message) error {
-    c.State.Pid = -1
-    c.State.FinishedAt = time.Now().UTC().Format(utils.TimeLayout)
-    c.State.Status = types.StatusStopped
-
+    var (
+        code   int64  // container exit code used for container state setting
+        errMsg string // container exit error message used for container state setting
+    )
     if m != nil {
-        c.State.ExitCode = int64(m.ExitCode())
+        code = int64(m.ExitCode())
         if err := m.RawError(); err != nil {
-            c.State.Error = err.Error()
+            errMsg = err.Error()
         }
     }
 
+    c.SetStatusStopped(code, errMsg)
+
     // release resource
     if io := mgr.IOs.Get(c.ID); io != nil {
         io.Close()
         mgr.IOs.Remove(c.ID)
     }
 
+    c.Lock()
     // release network
     if c.NetworkSettings != nil {
         for name, epConfig := range c.NetworkSettings.Networks {
@@ -1588,6 +1578,7 @@ func (mgr *ContainerManager) markStoppedAndRelease(c *Container, m *ctrd.Message
             endpoint.EndpointConfig = epConfig
             if err := mgr.NetworkMgr.EndpointRemove(context.Background(), endpoint); err != nil {
                 logrus.Errorf("failed to remove endpoint: %v", err)
+                c.Unlock()
                 return err
             }
         }
@@ -1600,10 +1591,11 @@ func (mgr *ContainerManager) markStoppedAndRelease(c *Container, m *ctrd.Message
     if c.Snapshotter != nil && c.Snapshotter.Data != nil {
         c.Snapshotter.Data["MergedDir"] = ""
     }
+    c.Unlock()
 
     // update meta
     if err := c.Write(mgr.Store); err != nil {
-        logrus.Errorf("failed to update meta: %v", err)
+        logrus.Errorf("failed to update meta of container %s: %v", c.ID, err)
         return err
     }
 
@@ -1613,43 +1605,38 @@ func (mgr *ContainerManager) markStoppedAndRelease(c *Container, m *ctrd.Message
 // exitedAndRelease be register into ctrd as a callback function, when the running container suddenly
 // exited, "ctrd" will call it to set the container's state and release resouce and so on.
 func (mgr *ContainerManager) exitedAndRelease(id string, m *ctrd.Message) error {
-    // update container info
     c, err := mgr.container(id)
     if err != nil {
         return err
     }
 
-    c.Lock()
-    defer c.Unlock()
-
     if err := mgr.markStoppedAndRelease(c, m); err != nil {
         return err
     }
 
-    c.State.Status = types.StatusExited
+    c.SetStatusExited()
     if err := c.Write(mgr.Store); err != nil {
         logrus.Errorf("failed to update meta: %v", err)
         return err
     }
 
     // send exit event to monitor
-    mgr.monitor.PostEvent(ContainerExitEvent(c).WithHandle(func(container *Container) error {
+    mgr.monitor.PostEvent(ContainerExitEvent(c).WithHandle(func(c *Container) error {
         // check status and restart policy
-        container.Lock()
-
-        if !container.IsExited() {
-            container.Unlock()
+        if !c.IsExited() {
             return nil
         }
-        policy := (*ContainerRestartPolicy)(container.HostConfig.RestartPolicy)
+
+        c.Lock()
+        policy := (*ContainerRestartPolicy)(c.HostConfig.RestartPolicy)
+        keys := c.DetachKeys
+        c.Unlock()
+
         if policy == nil || policy.IsNone() {
-            container.Unlock()
             return nil
         }
-        container.Unlock()
-
-        return mgr.Start(context.TODO(), container.ID, container.DetachKeys)
+        return mgr.Start(context.TODO(), c.ID, keys)
     }))
 
     return nil
@@ -1670,7 +1657,7 @@ func (mgr *ContainerManager) execExitedAndRelease(id string, m *ctrd.Message) er
 
     v, ok := mgr.ExecProcesses.Get(id).Result()
     if !ok {
-        return errors.Wrap(errtypes.ErrNotfound, "to be exec process: "+id)
+        return errors.Wrapf(errtypes.ErrNotfound, "exec process %s", id)
     }
     execConfig, ok := v.(*ContainerExecConfig)
     if !ok {
@@ -1684,14 +1671,14 @@ func (mgr *ContainerManager) execExitedAndRelease(id string, m *ctrd.Message) er
     return nil
 }
 
-func (mgr *ContainerManager) attachVolume(ctx context.Context, name string, meta *Container) (string, string, error) {
+func (mgr *ContainerManager) attachVolume(ctx context.Context, name string, c *Container) (string, string, error) {
     driver := volumetypes.DefaultBackend
     v, err := mgr.VolumeMgr.Get(ctx, name)
     if err != nil || v == nil {
         opts := map[string]string{
             "backend": driver,
         }
-        if err := mgr.VolumeMgr.Create(ctx, name, meta.HostConfig.VolumeDriver, opts, nil); err != nil {
+        if err := mgr.VolumeMgr.Create(ctx, name, c.HostConfig.VolumeDriver, opts, nil); err != nil {
             logrus.Errorf("failed to create volume: %s, err: %v", name, err)
             return "", "", errors.Wrap(err, "failed to create volume")
         }
@@ -1699,7 +1686,7 @@ func (mgr *ContainerManager) attachVolume(ctx context.Context, name string, meta
         driver = v.Driver()
     }
 
-    if _, err := mgr.VolumeMgr.Attach(ctx, name, map[string]string{volumetypes.OptionRef: meta.ID}); err != nil {
+    if _, err := mgr.VolumeMgr.Attach(ctx, name, map[string]string{volumetypes.OptionRef: c.ID}); err != nil {
         logrus.Errorf("failed to attach volume: %s, err: %v", name, err)
         return "", "", errors.Wrap(err, "failed to attach volume")
     }
@@ -1713,15 +1700,15 @@ func (mgr *ContainerManager) attachVolume(ctx context.Context, name string, meta
     return mountPath, driver, nil
 }
 
-func (mgr *ContainerManager) generateMountPoints(ctx context.Context, meta *Container) error {
+func (mgr *ContainerManager) generateMountPoints(ctx context.Context, c *Container) error {
     var err error
 
-    if meta.Config.Volumes == nil {
-        meta.Config.Volumes = make(map[string]interface{})
+    if c.Config.Volumes == nil {
+        c.Config.Volumes = make(map[string]interface{})
     }
 
-    if meta.Mounts == nil {
-        meta.Mounts = make([]*types.MountPoint, 0)
+    if c.Mounts == nil {
+        c.Mounts = make([]*types.MountPoint, 0)
     }
 
     // define a volume map to duplicate removal
@@ -1729,28 +1716,28 @@ func (mgr *ContainerManager) generateMountPoints(ctx context.Context, meta *Cont
     defer func() {
         if err != nil {
-            if err := mgr.detachVolumes(ctx, meta, false); err != nil {
+            if err := mgr.detachVolumes(ctx, c, false); err != nil {
                 logrus.Errorf("failed to detach volume, err: %v", err)
             }
         }
     }()
 
-    err = mgr.getMountPointFromVolumes(ctx, meta, volumeSet)
+    err = mgr.getMountPointFromVolumes(ctx, c, volumeSet)
     if err != nil {
         return errors.Wrap(err, "failed to get mount point from volumes")
     }
 
-    err = mgr.getMountPointFromImage(ctx, meta, volumeSet)
+    err = mgr.getMountPointFromImage(ctx, c, volumeSet)
     if err != nil {
         return errors.Wrap(err, "failed to get mount point from image")
     }
 
-    err = mgr.getMountPointFromBinds(ctx, meta, volumeSet)
+    err = mgr.getMountPointFromBinds(ctx, c, volumeSet)
     if err != nil {
        return errors.Wrap(err, "failed to get mount point from binds")
     }
 
-    err = mgr.getMountPointFromContainers(ctx, meta, volumeSet)
+    err = mgr.getMountPointFromContainers(ctx, c, volumeSet)
     if err != nil {
         return errors.Wrap(err, "failed to get mount point from containers")
     }
@@ -1758,13 +1745,13 @@ func (mgr *ContainerManager) generateMountPoints(ctx context.Context, meta *Cont
     return nil
 }
 
-func (mgr *ContainerManager) getMountPointFromBinds(ctx context.Context, meta *Container, volumeSet map[string]struct{}) error {
+func (mgr *ContainerManager) getMountPointFromBinds(ctx context.Context, c *Container, volumeSet map[string]struct{}) error {
     var err error
 
-    logrus.Debugf("bind volumes: %v", meta.HostConfig.Binds)
+    logrus.Debugf("bind volumes: %v", c.HostConfig.Binds)
 
     // parse binds
-    for _, b := range meta.HostConfig.Binds {
+    for _, b := range c.HostConfig.Binds {
         var parts []string
         parts, err = opts.CheckBind(b)
         if err != nil {
@@ -1791,7 +1778,7 @@ func (mgr *ContainerManager) getMountPointFromBinds(ctx context.Context, meta *C
             return errors.Errorf("unknown bind: %s", b)
         }
 
-        if opts.CheckDuplicateMountPoint(meta.Mounts, mp.Destination) {
+        if opts.CheckDuplicateMountPoint(c.Mounts, mp.Destination) {
             logrus.Warnf("duplicate mount point: %s", mp.Destination)
             continue
         }
@@ -1800,8 +1787,8 @@ func (mgr *ContainerManager) getMountPointFromBinds(ctx context.Context, meta *C
             mp.Source = randomid.Generate()
 
             // Source is empty, anonymouse volume
-            if _, exist := meta.Config.Volumes[mp.Destination]; !exist {
-                meta.Config.Volumes[mp.Destination] = struct{}{}
+            if _, exist := c.Config.Volumes[mp.Destination]; !exist {
+                c.Config.Volumes[mp.Destination] = struct{}{}
             }
         }
 
@@ -1816,7 +1803,7 @@ func (mgr *ContainerManager) getMountPointFromBinds(ctx context.Context, meta *C
         name := mp.Source
         if _, exist := volumeSet[name]; !exist {
             mp.Name = name
-            mp.Source, mp.Driver, err = mgr.attachVolume(ctx, name, meta)
+            mp.Source, mp.Driver, err = mgr.attachVolume(ctx, name, c)
             if err != nil {
                 logrus.Errorf("failed to bind volume: %s, err: %v", name, err)
                 return errors.Wrap(err, "failed to bind volume")
@@ -1855,18 +1842,18 @@ func (mgr *ContainerManager) getMountPointFromBinds(ctx context.Context, meta *C
             }
         }
 
-        meta.Mounts = append(meta.Mounts, mp)
+        c.Mounts = append(c.Mounts, mp)
     }
 
     return nil
 }
 
-func (mgr *ContainerManager) getMountPointFromVolumes(ctx context.Context, meta *Container, volumeSet map[string]struct{}) error {
+func (mgr *ContainerManager) getMountPointFromVolumes(ctx context.Context, c *Container, volumeSet map[string]struct{}) error {
     var err error
 
     // parse volumes
-    for dest := range meta.Config.Volumes {
-        if opts.CheckDuplicateMountPoint(meta.Mounts, dest) {
+    for dest := range c.Config.Volumes {
+        if opts.CheckDuplicateMountPoint(c.Mounts, dest) {
             logrus.Warnf("duplicate mount point: %s from volumes", dest)
             continue
         }
@@ -1881,7 +1868,7 @@ func (mgr *ContainerManager) getMountPointFromVolumes(ctx context.Context, meta
         mp.Name = name
         mp.Destination = dest
 
-        mp.Source, mp.Driver, err = mgr.attachVolume(ctx, mp.Name, meta)
+        mp.Source, mp.Driver, err = mgr.attachVolume(ctx, mp.Name, c)
         if err != nil {
             logrus.Errorf("failed to bind volume: %s, err: %v", mp.Name, err)
             return errors.Wrap(err, "failed to bind volume")
@@ -1894,23 +1881,23 @@ func (mgr *ContainerManager) getMountPointFromVolumes(ctx context.Context, meta
         }
 
         volumeSet[mp.Name] = struct{}{}
-        meta.Mounts = append(meta.Mounts, mp)
+        c.Mounts = append(c.Mounts, mp)
     }
 
     return nil
 }
 
-func (mgr *ContainerManager) getMountPointFromImage(ctx context.Context, meta *Container, volumeSet map[string]struct{}) error {
+func (mgr *ContainerManager) getMountPointFromImage(ctx context.Context, c *Container, volumeSet map[string]struct{}) error {
     var err error
 
     // parse volumes from image
-    image, err := mgr.ImageMgr.GetImage(ctx, strings.TrimPrefix(meta.Image, digest.Canonical.String()+":"))
+    image, err := mgr.ImageMgr.GetImage(ctx, strings.TrimPrefix(c.Image, digest.Canonical.String()+":"))
     if err != nil {
-        return errors.Wrapf(err, "failed to get image: %s", meta.Image)
+        return errors.Wrapf(err, "failed to get image: %s", c.Image)
     }
 
     for dest := range image.Config.Volumes {
-        if _, exist := meta.Config.Volumes[dest]; !exist {
-            meta.Config.Volumes[dest] = struct{}{}
+        if _, exist := c.Config.Volumes[dest]; !exist {
+            c.Config.Volumes[dest] = struct{}{}
         }
 
         // check if volume has been created
@@ -1919,7 +1906,7 @@ func (mgr *ContainerManager) getMountPointFromImage(ctx context.Context, meta *C
             continue
         }
 
-        if opts.CheckDuplicateMountPoint(meta.Mounts, dest) {
+        if opts.CheckDuplicateMountPoint(c.Mounts, dest) {
             logrus.Warnf("duplicate mount point: %s from image", dest)
             continue
         }
@@ -1928,7 +1915,7 @@ func (mgr *ContainerManager) getMountPointFromImage(ctx context.Context, meta *C
         mp.Name = name
         mp.Destination = dest
 
-        mp.Source, mp.Driver, err = mgr.attachVolume(ctx, mp.Name, meta)
+        mp.Source, mp.Driver, err = mgr.attachVolume(ctx, mp.Name, c)
         if err != nil {
             logrus.Errorf("failed to bind volume: %s, err: %v", mp.Name, err)
             return errors.Wrap(err, "failed to bind volume")
@@ -1941,7 +1928,7 @@ func (mgr *ContainerManager) getMountPointFromImage(ctx context.Context, meta *C
         }
 
         volumeSet[mp.Name] = struct{}{}
-        meta.Mounts = append(meta.Mounts, mp)
+        c.Mounts = append(c.Mounts, mp)
     }
 
     return nil
@@ -2132,6 +2119,8 @@ func (mgr *ContainerManager) detachVolumes(ctx context.Context, c *Container, re
     return nil
 }
 
+// buildContainerEndpoint builds an endpoint according to the container.
+// The caller should lock the container when calling this func.
 func (mgr *ContainerManager) buildContainerEndpoint(c *Container) *networktypes.Endpoint {
     ep := &networktypes.Endpoint{
         Owner: c.ID,
@@ -2169,7 +2158,9 @@ func (mgr *ContainerManager) setBaseFS(ctx context.Context, c *Container, id str
     }
 
     // io.containerd.runtime.v1.linux as a const used by runc
+    c.Lock()
     c.BaseFS = filepath.Join(mgr.Config.HomeDir, "containerd/state", "io.containerd.runtime.v1.linux", namespaces.Default, info.Name, "rootfs")
+    c.Unlock()
 }
 
 // execProcessGC cleans unused exec processes config every 5 minutes.
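
The recurring shape in the container.go changes above: hold the container lock just long enough to copy or mutate fields, and release it before blocking containerd or network calls such as DestroyContainer and EndpointCreate. A minimal, self-contained sketch of that shape; the container type here is a hypothetical stand-in with only a mutex, not pouch's real struct:

package main

import (
    "fmt"
    "sync"
    "time"
)

// container is a stand-in for pouch's Container; only the mutex matters.
type container struct {
    sync.Mutex
    ID string
}

// destroy simulates a slow containerd RPC; the container lock must not
// be held across it.
func destroy(id string) string {
    time.Sleep(10 * time.Millisecond)
    return "destroyed " + id
}

// stop copies what it needs under the lock, releases it, then blocks,
// the same shape as stop() and Rename() in the diff above.
func stop(c *container) string {
    c.Lock()
    id := c.ID
    c.Unlock()
    return destroy(id)
}

func main() {
    fmt.Println(stop(&container{ID: "c1"}))
}

Holding the lock across the slow call would serialize every status check, and after this refactor even IsRunning, Key and StopTimeout take the lock, which is exactly the contention the copy-then-unlock shape avoids.
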
diff --git a/daemon/mgr/container_exec.go b/daemon/mgr/container_exec.go
index 8ec3e58935..dcb57ccb72 100644
--- a/daemon/mgr/container_exec.go
+++ b/daemon/mgr/container_exec.go
@@ -61,6 +61,7 @@ func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, confi
         return err
     }
 
+    c.Lock()
     process := &specs.Process{
         Args:     execConfig.Cmd,
         Terminal: execConfig.Tty,
@@ -73,13 +74,16 @@ func (mgr *ContainerManager) StartExec(ctx context.Context, execid string, confi
     }
 
     if err = setupUser(ctx, c, &specs.Spec{Process: process}); err != nil {
+        c.Unlock()
         return err
     }
 
     // set exec process ulimit
     if err := setupRlimits(ctx, c.HostConfig, &specs.Spec{Process: process}); err != nil {
+        c.Unlock()
         return err
     }
+    c.Unlock()
 
     execConfig.Running = true
     defer func() {
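
StartExec above brackets the specs.Process construction with the container lock and has to unlock on every early return. A self-contained sketch of that bracket, with hypothetical fields standing in for pouch's Container and the spec setup helpers:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// container stands in for pouch's Container.
type container struct {
    sync.Mutex
    user string
    tty  bool
}

// buildProcess reads the locked fields while holding the lock and must
// release it on every return path, the shape StartExec now has.
func buildProcess(c *container) (string, error) {
    c.Lock()
    if c.user == "" {
        c.Unlock() // every early return must release the lock
        return "", errors.New("no user configured")
    }
    p := fmt.Sprintf("user=%s tty=%v", c.user, c.tty)
    c.Unlock()
    return p, nil
}

func main() {
    p, err := buildProcess(&container{user: "root", tty: true})
    fmt.Println(p, err)
}

Extracting the locked region into a small helper guarded by defer c.Unlock() is the usual way to make such early-return paths impossible to miss.
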
diff --git a/daemon/mgr/container_state.go b/daemon/mgr/container_state.go
new file mode 100644
index 0000000000..89476308bd
--- /dev/null
+++ b/daemon/mgr/container_state.go
@@ -0,0 +1,139 @@
+package mgr
+
+import (
+    "time"
+
+    "github.com/alibaba/pouch/apis/types"
+    "github.com/alibaba/pouch/pkg/utils"
+)
+
+// IsRunning returns whether the container is running.
+func (c *Container) IsRunning() bool {
+    c.Lock()
+    defer c.Unlock()
+    return c.State.Status == types.StatusRunning
+}
+
+// IsStopped returns whether the container is stopped.
+func (c *Container) IsStopped() bool {
+    c.Lock()
+    defer c.Unlock()
+    return c.State.Status == types.StatusStopped
+}
+
+// IsExited returns whether the container is exited.
+func (c *Container) IsExited() bool {
+    c.Lock()
+    defer c.Unlock()
+    return c.State.Status == types.StatusExited
+}
+
+// IsCreated returns whether the container is created.
+func (c *Container) IsCreated() bool {
+    c.Lock()
+    defer c.Unlock()
+    return c.State.Status == types.StatusCreated
+}
+
+// IsPaused returns whether the container is paused.
+func (c *Container) IsPaused() bool {
+    c.Lock()
+    defer c.Unlock()
+    return c.State.Status == types.StatusPaused
+}
+
+// IsDead returns whether the container is dead.
+func (c *Container) IsDead() bool {
+    c.Lock()
+    defer c.Unlock()
+    return c.State.Status == types.StatusDead
+}
+
+// IsRunningOrPaused returns true if the container is running or paused.
+func (c *Container) IsRunningOrPaused() bool {
+    c.Lock()
+    defer c.Unlock()
+    if c.State.Status == types.StatusRunning {
+        return true
+    }
+    if c.State.Status == types.StatusPaused {
+        return true
+    }
+    return false
+}
+
+// IsRestarting returns whether the container is restarting.
+func (c *Container) IsRestarting() bool {
+    c.Lock()
+    defer c.Unlock()
+    return c.State.Status == types.StatusRestarting
+}
+
+// SetStatusRunning sets a container to be status running.
+// When a container's status turns to StatusRunning, the following fields need to be updated:
+// Status -> StatusRunning
+// StartedAt -> time.Now()
+// Pid -> input param
+// ExitCode -> 0
+func (c *Container) SetStatusRunning(pid int64) {
+    c.Lock()
+    defer c.Unlock()
+    if c.State == nil {
+        c.State = &types.ContainerState{}
+    }
+    c.State.Status = types.StatusRunning
+    c.State.StartedAt = time.Now().UTC().Format(utils.TimeLayout)
+    c.State.Pid = pid
+    c.State.ExitCode = 0
+}
+
+// SetStatusStopped sets a container to be status stopped.
+// When a container's status turns to StatusStopped, the following fields need to be updated:
+// Status -> StatusStopped
+// FinishedAt -> time.Now()
+// Pid -> -1
+// ExitCode -> input param
+// Error -> input param
+func (c *Container) SetStatusStopped(exitCode int64, errMsg string) {
+    c.Lock()
+    defer c.Unlock()
+    if c.State == nil {
+        c.State = &types.ContainerState{}
+    }
+    c.State.Status = types.StatusStopped
+    c.State.FinishedAt = time.Now().UTC().Format(utils.TimeLayout)
+    c.State.Pid = -1
+    c.State.ExitCode = exitCode
+    c.State.Error = errMsg
+}
+
+// SetStatusExited sets a container to be status exited.
+func (c *Container) SetStatusExited() {
+    c.Lock()
+    defer c.Unlock()
+    if c.State == nil {
+        c.State = &types.ContainerState{}
+    }
+    c.State.Status = types.StatusExited
+}
+
+// SetStatusPaused sets a container to be status paused.
+func (c *Container) SetStatusPaused() {
+    c.Lock()
+    defer c.Unlock()
+    if c.State == nil {
+        c.State = &types.ContainerState{}
+    }
+    c.State.Status = types.StatusPaused
+}
+
+// SetStatusUnpaused sets a container to be status running.
+// Unpaused is treated as running.
+func (c *Container) SetStatusUnpaused() {
+    c.Lock()
+    defer c.Unlock()
+    if c.State == nil {
+        c.State = &types.ContainerState{}
+    }
+    c.State.Status = types.StatusRunning
+}
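
How the new container_state.go helpers are meant to be consumed: callers go through the accessors instead of reading c.State under ad-hoc locks. A sketch with simplified stand-in types, not pouch's real definitions:

package main

import (
    "fmt"
    "sync"
)

type status string

const (
    statusRunning status = "running"
    statusStopped status = "stopped"
)

type state struct {
    Status   status
    Pid      int64
    ExitCode int64
}

// container mirrors the shape of pouch's Container for this sketch.
type container struct {
    sync.Mutex
    State *state
}

// IsRunning and SetStatusStopped follow container_state.go: each
// accessor takes the lock itself, so callers never touch State directly.
func (c *container) IsRunning() bool {
    c.Lock()
    defer c.Unlock()
    return c.State != nil && c.State.Status == statusRunning
}

func (c *container) SetStatusStopped(exitCode int64) {
    c.Lock()
    defer c.Unlock()
    if c.State == nil {
        c.State = &state{}
    }
    c.State.Status = statusStopped
    c.State.Pid = -1
    c.State.ExitCode = exitCode
}

func main() {
    c := &container{State: &state{Status: statusRunning, Pid: 42}}
    if c.IsRunning() {
        c.SetStatusStopped(0)
    }
    fmt.Printf("%+v\n", *c.State)
}

Note that because each helper locks internally, a check-then-act sequence across two helpers is not atomic; that is what the remaining TODO comments about data races in the diff point at.
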
diff --git a/daemon/mgr/container_types.go b/daemon/mgr/container_types.go
index 135f35f68d..e718ba14e4 100644
--- a/daemon/mgr/container_types.go
+++ b/daemon/mgr/container_types.go
@@ -184,39 +184,11 @@ type Container struct {
 
 // Key returns container's id.
 func (c *Container) Key() string {
+    c.Lock()
+    defer c.Unlock()
     return c.ID
 }
 
-// IsRunning returns container is running or not.
-func (c *Container) IsRunning() bool {
-    return c.State.Status == types.StatusRunning
-}
-
-// IsStopped returns container is stopped or not.
-func (c *Container) IsStopped() bool {
-    return c.State.Status == types.StatusStopped
-}
-
-// IsExited returns container is exited or not.
-func (c *Container) IsExited() bool {
-    return c.State.Status == types.StatusExited
-}
-
-// IsCreated returns container is created or not.
-func (c *Container) IsCreated() bool {
-    return c.State.Status == types.StatusCreated
-}
-
-// IsPaused returns container is paused or not.
-func (c *Container) IsPaused() bool {
-    return c.State.Status == types.StatusPaused
-}
-
-// IsRestarting returns container is restarting or not.
-func (c *Container) IsRestarting() bool {
-    return c.State.Status == types.StatusRestarting
-}
-
 // Write writes container's meta data into meta store.
 func (c *Container) Write(store *meta.Store) error {
     return store.Put(c)
@@ -224,6 +196,8 @@
 
 // StopTimeout returns the timeout (in seconds) used to stop the container.
 func (c *Container) StopTimeout() int64 {
+    c.Lock()
+    defer c.Unlock()
     if c.Config.StopTimeout != nil {
         return *c.Config.StopTimeout
     }
@@ -231,6 +205,8 @@
 }
 
 func (c *Container) merge(getconfig func() (v1.ImageConfig, error)) error {
+    c.Lock()
+    defer c.Unlock()
     config, err := getconfig()
     if err != nil {
         return err
@@ -258,6 +234,8 @@
 
 // FormatStatus format container status
 func (c *Container) FormatStatus() (string, error) {
+    c.Lock()
+    defer c.Unlock()
     var status string
 
     switch c.State.Status {
diff --git a/daemon/mgr/spec.go b/daemon/mgr/spec.go
index eefff58d98..5a916fc712 100644
--- a/daemon/mgr/spec.go
+++ b/daemon/mgr/spec.go
@@ -20,8 +20,14 @@ type SpecWrapper struct {
     argsArr [][]string
 }
 
+// All the functions related to the spec are lock-free for the container instance,
+// so when calling functions here like setupProcess, setupMounts, setupUser and
+// so on, the caller should explicitly add a lock for the container instance.
+
 // createSpec create a runtime-spec.
 func createSpec(ctx context.Context, c *Container, specWrapper *SpecWrapper) error {
+    c.Lock()
+    defer c.Unlock()
     // new a default spec from containerd.
     s, err := ctrd.NewDefaultSpec(ctx, c.ID)
     if err != nil {
diff --git a/test/cli_top_test.go b/test/cli_top_test.go
index 284a569fe1..c77803e11d 100644
--- a/test/cli_top_test.go
+++ b/test/cli_top_test.go
@@ -41,7 +41,7 @@ func (suite *PouchTopSuite) TestTopStoppedContainer(c *check.C) {
     res := command.PouchRun("top", name)
     c.Assert(res.Error, check.NotNil)
 
-    expectString := "container is not running, can not execute top command"
+    expectString := " is not running, cannot execute top command"
     if out := res.Combined(); !strings.Contains(out, expectString) {
         // FIXME(ziren): for debug top error info is empty
         fmt.Printf("%+v", res)
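
Finally, the contract documented at the top of spec.go, lock-free helpers with a single locking entry point, looks like this in miniature. Names are hypothetical; the point is that sync.Mutex is not reentrant, so the helpers must not lock again:

package main

import (
    "fmt"
    "sync"
)

type container struct {
    sync.Mutex
    hostname string
}

// setupHostname assumes the caller already holds c's lock, matching the
// lock-free contract spec.go documents for setupProcess, setupMounts, etc.
func setupHostname(c *container, spec map[string]string) {
    spec["hostname"] = c.hostname
}

// createSpec is the entry point: it locks once, then calls the
// lock-free helpers.
func createSpec(c *container) map[string]string {
    c.Lock()
    defer c.Unlock()
    spec := map[string]string{}
    setupHostname(c, spec)
    return spec
}

func main() {
    fmt.Println(createSpec(&container{hostname: "pouch-test"}))
}
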