From 86bd34f6bb116fb19fa145e24d418f9a6ad5ea39 Mon Sep 17 00:00:00 2001 From: Kir Kolyshkin Date: Tue, 4 May 2021 18:10:59 -0700 Subject: [PATCH] libct/cg/sd: use global dbus connection Using per cgroup manager dbus connection instances means that every cgroup manager instance gets a new connection, and those connections are never closed, ultimately resulting in open file descriptions limit being hit. Revert back to using a single global dbus connection for everything. Signed-off-by: Kir Kolyshkin --- libcontainer/cgroups/systemd/common.go | 32 +++++------ libcontainer/cgroups/systemd/dbus.go | 79 ++++++++++++-------------- libcontainer/cgroups/systemd/v1.go | 16 +++--- libcontainer/cgroups/systemd/v2.go | 26 ++++----- 4 files changed, 70 insertions(+), 83 deletions(-) diff --git a/libcontainer/cgroups/systemd/common.go b/libcontainer/cgroups/systemd/common.go index 8d2aa5da439..52e24a344c4 100644 --- a/libcontainer/cgroups/systemd/common.go +++ b/libcontainer/cgroups/systemd/common.go @@ -319,9 +319,9 @@ func isUnitExists(err error) bool { return isDbusError(err, "org.freedesktop.systemd1.UnitExists") } -func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Property) error { +func startUnit(rootless bool, unitName string, properties []systemdDbus.Property) error { statusChan := make(chan string, 1) - err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { + err := dbusCall(rootless, func(c *systemdDbus.Conn) error { _, err := c.StartTransientUnitContext(context.TODO(), unitName, "replace", properties, statusChan) return err }) @@ -334,11 +334,11 @@ func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Pr close(statusChan) // Please refer to https://pkg.go.dev/github.com/coreos/go-systemd/v22/dbus#Conn.StartUnit if s != "done" { - resetFailedUnit(cm, unitName) + resetFailedUnit(rootless, unitName) return errors.Errorf("error creating systemd unit `%s`: got `%s`", unitName, s) } case <-timeout.C: - resetFailedUnit(cm, unitName) + resetFailedUnit(rootless, unitName) return errors.New("Timeout waiting for systemd to create " + unitName) } } else if !isUnitExists(err) { @@ -348,9 +348,9 @@ func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Pr return nil } -func stopUnit(cm *dbusConnManager, unitName string) error { +func stopUnit(rootless bool, unitName string) error { statusChan := make(chan string, 1) - err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { + err := dbusCall(rootless, func(c *systemdDbus.Conn) error { _, err := c.StopUnitContext(context.TODO(), unitName, "replace", statusChan) return err }) @@ -369,8 +369,8 @@ func stopUnit(cm *dbusConnManager, unitName string) error { return nil } -func resetFailedUnit(cm *dbusConnManager, name string) { - err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { +func resetFailedUnit(rootless bool, name string) { + err := dbusCall(rootless, func(c *systemdDbus.Conn) error { return c.ResetFailedUnitContext(context.TODO(), name) }) if err != nil { @@ -378,17 +378,17 @@ func resetFailedUnit(cm *dbusConnManager, name string) { } } -func setUnitProperties(cm *dbusConnManager, name string, properties ...systemdDbus.Property) error { - return cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { +func setUnitProperties(rootless bool, name string, properties ...systemdDbus.Property) error { + return dbusCall(rootless, func(c *systemdDbus.Conn) error { return c.SetUnitPropertiesContext(context.TODO(), name, true, properties...) }) } -func systemdVersion(cm *dbusConnManager) int { +func systemdVersion(rootless bool) int { versionOnce.Do(func() { version = -1 var verStr string - err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { + err := dbusCall(rootless, func(c *systemdDbus.Conn) error { var err error verStr, err = c.GetManagerProperty("Version") return err @@ -420,10 +420,10 @@ func systemdVersionAtoi(verStr string) (int, error) { return ver, errors.Wrapf(err, "can't parse version %s", verStr) } -func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota int64, period uint64) { +func addCpuQuota(rootless bool, properties *[]systemdDbus.Property, quota int64, period uint64) { if period != 0 { // systemd only supports CPUQuotaPeriodUSec since v242 - sdVer := systemdVersion(cm) + sdVer := systemdVersion(rootless) if sdVer >= 242 { *properties = append(*properties, newProp("CPUQuotaPeriodUSec", period)) @@ -454,13 +454,13 @@ func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota } } -func addCpuset(cm *dbusConnManager, props *[]systemdDbus.Property, cpus, mems string) error { +func addCpuset(rootless bool, props *[]systemdDbus.Property, cpus, mems string) error { if cpus == "" && mems == "" { return nil } // systemd only supports AllowedCPUs/AllowedMemoryNodes since v244 - sdVer := systemdVersion(cm) + sdVer := systemdVersion(rootless) if sdVer < 244 { logrus.Debugf("systemd v%d is too old to support AllowedCPUs/AllowedMemoryNodes"+ " (settings will still be applied to cgroupfs)", sdVer) diff --git a/libcontainer/cgroups/systemd/dbus.go b/libcontainer/cgroups/systemd/dbus.go index deca16b005b..55d86ef8519 100644 --- a/libcontainer/cgroups/systemd/dbus.go +++ b/libcontainer/cgroups/systemd/dbus.go @@ -10,74 +10,65 @@ import ( dbus "github.com/godbus/dbus/v5" ) -type dbusConnManager struct { - conn *systemdDbus.Conn - rootless bool - sync.RWMutex -} - -// newDbusConnManager initializes systemd dbus connection manager. -func newDbusConnManager(rootless bool) *dbusConnManager { - return &dbusConnManager{ - rootless: rootless, - } -} +var ( + dbusC *systemdDbus.Conn + dbusMu sync.RWMutex +) -// getConnection lazily initializes and returns systemd dbus connection. -func (d *dbusConnManager) getConnection() (*systemdDbus.Conn, error) { - // In the case where d.conn != nil - // Use the read lock the first time to ensure - // that Conn can be acquired at the same time. - d.RLock() - if conn := d.conn; conn != nil { - d.RUnlock() +// dbusGet lazily initializes and returns systemd dbus connection. +func dbusGet(rootless bool) (*systemdDbus.Conn, error) { + // In the case where dbusC != nil, use the read lock the first time + // to ensure that the connection can be acquired at the same time. + dbusMu.RLock() + if conn := dbusC; conn != nil { + dbusMu.RUnlock() return conn, nil } - d.RUnlock() + dbusMu.RUnlock() - // In the case where d.conn == nil - // Use write lock to ensure that only one - // will be created - d.Lock() - defer d.Unlock() - if conn := d.conn; conn != nil { + // In the case where dbusC == nil, use write lock to ensure + // that only one will be created. + dbusMu.Lock() + defer dbusMu.Unlock() + if conn := dbusC; conn != nil { return conn, nil } - conn, err := d.newConnection() + conn, err := dbusNew(rootless) if err != nil { return nil, err } - d.conn = conn + dbusC = conn return conn, nil } -func (d *dbusConnManager) newConnection() (*systemdDbus.Conn, error) { - if d.rootless { +func dbusNew(rootless bool) (*systemdDbus.Conn, error) { + if rootless { return newUserSystemdDbus() } return systemdDbus.NewWithContext(context.TODO()) } -// resetConnection resets the connection to its initial state +// dbusReset resets the dbus connection to its initial state // (so it can be reconnected if necessary). -func (d *dbusConnManager) resetConnection(conn *systemdDbus.Conn) { - d.Lock() - defer d.Unlock() - if d.conn != nil && d.conn == conn { - d.conn.Close() - d.conn = nil +func dbusReset(rootless bool, conn *systemdDbus.Conn) { + dbusMu.Lock() + defer dbusMu.Unlock() + if dbusC != nil && dbusC == conn { + dbusC.Close() + dbusC = nil } } var errDbusConnClosed = dbus.ErrClosed.Error() -// retryOnDisconnect calls op, and if the error it returns is about closed dbus -// connection, the connection is re-established and the op is retried. This helps -// with the situation when dbus is restarted and we have a stale connection. -func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) error { +// dbusCall calls op with dbus connection as the argument, and if the error it +// returns is about closed dbus connection, the connection is re-established +// and the op is retried. This helps with the situation when dbus is restarted +// and we have a stale connection. +func dbusCall(rootless bool, op func(*systemdDbus.Conn) error) error { for { - conn, err := d.getConnection() + conn, err := dbusGet(rootless) if err != nil { return err } @@ -85,6 +76,6 @@ func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) er if !isDbusError(err, errDbusConnClosed) { return err } - d.resetConnection(conn) + dbusReset(rootless, conn) } } diff --git a/libcontainer/cgroups/systemd/v1.go b/libcontainer/cgroups/systemd/v1.go index 41de6e8b70f..78bfc45b5de 100644 --- a/libcontainer/cgroups/systemd/v1.go +++ b/libcontainer/cgroups/systemd/v1.go @@ -20,14 +20,12 @@ type legacyManager struct { mu sync.Mutex cgroups *configs.Cgroup paths map[string]string - dbus *dbusConnManager } func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) cgroups.Manager { return &legacyManager{ cgroups: cg, paths: paths, - dbus: newDbusConnManager(false), } } @@ -58,7 +56,7 @@ var legacySubsystems = []subsystem{ &fs.NameGroup{GroupName: "name=systemd"}, } -func genV1ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]systemdDbus.Property, error) { +func genV1ResourcesProperties(r *configs.Resources) ([]systemdDbus.Property, error) { var properties []systemdDbus.Property deviceProperties, err := generateDeviceProperties(r.Devices) @@ -77,7 +75,7 @@ func genV1ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]syst newProp("CPUShares", r.CpuShares)) } - addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod) + addCpuQuota(false, &properties, r.CpuQuota, r.CpuPeriod) if r.BlkioWeight != 0 { properties = append(properties, @@ -89,7 +87,7 @@ func genV1ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]syst newProp("TasksMax", uint64(r.PidsLimit))) } - err = addCpuset(cm, &properties, r.CpusetCpus, r.CpusetMems) + err = addCpuset(false, &properties, r.CpusetCpus, r.CpusetMems) if err != nil { return nil, err } @@ -167,7 +165,7 @@ func (m *legacyManager) Apply(pid int) error { properties = append(properties, c.SystemdProps...) - if err := startUnit(m.dbus, unitName, properties); err != nil { + if err := startUnit(false, unitName, properties); err != nil { return err } @@ -205,7 +203,7 @@ func (m *legacyManager) Destroy() error { m.mu.Lock() defer m.mu.Unlock() - stopErr := stopUnit(m.dbus, getUnitName(m.cgroups)) + stopErr := stopUnit(false, getUnitName(m.cgroups)) // Both on success and on error, cleanup all the cgroups we are aware of. // Some of them were created directly by Apply() and are not managed by systemd. @@ -333,7 +331,7 @@ func (m *legacyManager) Set(r *configs.Resources) error { if r.Unified != nil { return cgroups.ErrV1NoUnified } - properties, err := genV1ResourcesProperties(r, m.dbus) + properties, err := genV1ResourcesProperties(r) if err != nil { return err } @@ -361,7 +359,7 @@ func (m *legacyManager) Set(r *configs.Resources) error { } } - if err := setUnitProperties(m.dbus, getUnitName(m.cgroups), properties...); err != nil { + if err := setUnitProperties(false, getUnitName(m.cgroups), properties...); err != nil { _ = m.Freeze(targetFreezerState) return err } diff --git a/libcontainer/cgroups/systemd/v2.go b/libcontainer/cgroups/systemd/v2.go index 3dc1f1cc08f..48de16b3787 100644 --- a/libcontainer/cgroups/systemd/v2.go +++ b/libcontainer/cgroups/systemd/v2.go @@ -26,7 +26,6 @@ type unifiedManager struct { // path is like "/sys/fs/cgroup/user.slice/user-1001.slice/session-1.scope" path string rootless bool - dbus *dbusConnManager } func NewUnifiedManager(config *configs.Cgroup, path string, rootless bool) cgroups.Manager { @@ -34,7 +33,6 @@ func NewUnifiedManager(config *configs.Cgroup, path string, rootless bool) cgrou cgroups: config, path: path, rootless: rootless, - dbus: newDbusConnManager(rootless), } } @@ -47,7 +45,7 @@ func NewUnifiedManager(config *configs.Cgroup, path string, rootless bool) cgrou // For the list of keys, see https://www.kernel.org/doc/Documentation/cgroup-v2.txt // // For the list of systemd unit properties, see systemd.resource-control(5). -func unifiedResToSystemdProps(cm *dbusConnManager, res map[string]string) (props []systemdDbus.Property, _ error) { +func unifiedResToSystemdProps(rootless bool, res map[string]string) (props []systemdDbus.Property, _ error) { var err error for k, v := range res { @@ -85,7 +83,7 @@ func unifiedResToSystemdProps(cm *dbusConnManager, res map[string]string) (props return nil, fmt.Errorf("unified resource %q quota value conversion error: %w", k, err) } } - addCpuQuota(cm, &props, quota, period) + addCpuQuota(rootless, &props, quota, period) case "cpu.weight": num, err := strconv.ParseUint(v, 10, 64) @@ -105,7 +103,7 @@ func unifiedResToSystemdProps(cm *dbusConnManager, res map[string]string) (props "cpuset.mems": "AllowedMemoryNodes", } // systemd only supports these properties since v244 - sdVer := systemdVersion(cm) + sdVer := systemdVersion(rootless) if sdVer >= 244 { props = append(props, newProp(m[k], bits)) @@ -164,7 +162,7 @@ func unifiedResToSystemdProps(cm *dbusConnManager, res map[string]string) (props return props, nil } -func genV2ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]systemdDbus.Property, error) { +func genV2ResourcesProperties(r *configs.Resources, rootless bool) ([]systemdDbus.Property, error) { var properties []systemdDbus.Property // NOTE: This is of questionable correctness because we insert our own @@ -201,14 +199,14 @@ func genV2ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]syst newProp("CPUWeight", r.CpuWeight)) } - addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod) + addCpuQuota(rootless, &properties, r.CpuQuota, r.CpuPeriod) if r.PidsLimit > 0 || r.PidsLimit == -1 { properties = append(properties, newProp("TasksMax", uint64(r.PidsLimit))) } - err = addCpuset(cm, &properties, r.CpusetCpus, r.CpusetMems) + err = addCpuset(rootless, &properties, r.CpusetCpus, r.CpusetMems) if err != nil { return nil, err } @@ -217,7 +215,7 @@ func genV2ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]syst // convert Resources.Unified map to systemd properties if r.Unified != nil { - unifiedProps, err := unifiedResToSystemdProps(cm, r.Unified) + unifiedProps, err := unifiedResToSystemdProps(rootless, r.Unified) if err != nil { return nil, err } @@ -282,7 +280,7 @@ func (m *unifiedManager) Apply(pid int) error { properties = append(properties, c.SystemdProps...) - if err := startUnit(m.dbus, unitName, properties); err != nil { + if err := startUnit(m.rootless, unitName, properties); err != nil { return errors.Wrapf(err, "error while starting unit %q with properties %+v", unitName, properties) } @@ -303,7 +301,7 @@ func (m *unifiedManager) Destroy() error { defer m.mu.Unlock() unitName := getUnitName(m.cgroups) - if err := stopUnit(m.dbus, unitName); err != nil { + if err := stopUnit(m.rootless, unitName); err != nil { return err } @@ -338,7 +336,7 @@ func (m *unifiedManager) getSliceFull() (string, error) { if m.rootless { var managerCGQuoted string - err := m.dbus.retryOnDisconnect(func(c *systemdDbus.Conn) error { + err := dbusCall(m.rootless, func(c *systemdDbus.Conn) error { var err error managerCGQuoted, err = c.GetManagerProperty("ControlGroup") return err @@ -420,7 +418,7 @@ func (m *unifiedManager) GetStats() (*cgroups.Stats, error) { } func (m *unifiedManager) Set(r *configs.Resources) error { - properties, err := genV2ResourcesProperties(r, m.dbus) + properties, err := genV2ResourcesProperties(r, m.rootless) if err != nil { return err } @@ -448,7 +446,7 @@ func (m *unifiedManager) Set(r *configs.Resources) error { } } - if err := setUnitProperties(m.dbus, getUnitName(m.cgroups), properties...); err != nil { + if err := setUnitProperties(m.rootless, getUnitName(m.cgroups), properties...); err != nil { _ = m.Freeze(targetFreezerState) return errors.Wrap(err, "error while setting unit properties") }