Skip to content

Commit

Permalink
libct/cg/sd: use global dbus connection
Browse files Browse the repository at this point in the history
Using per-cgroup-manager dbus connection instances means
that every cgroup manager instance gets a new connection,
and those connections are never closed, ultimately resulting
in the open file descriptor limit being hit.

Revert to using a single global dbus connection for everything.

Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
  • Loading branch information
kolyshkin committed May 5, 2021
1 parent b5e05d0 commit 86bd34f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 83 deletions.
32 changes: 16 additions & 16 deletions libcontainer/cgroups/systemd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,9 @@ func isUnitExists(err error) bool {
return isDbusError(err, "org.freedesktop.systemd1.UnitExists")
}

func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Property) error {
func startUnit(rootless bool, unitName string, properties []systemdDbus.Property) error {
statusChan := make(chan string, 1)
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
err := dbusCall(rootless, func(c *systemdDbus.Conn) error {
_, err := c.StartTransientUnitContext(context.TODO(), unitName, "replace", properties, statusChan)
return err
})
Expand All @@ -334,11 +334,11 @@ func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Pr
close(statusChan)
// Please refer to https://pkg.go.dev/github.com/coreos/go-systemd/v22/dbus#Conn.StartUnit
if s != "done" {
resetFailedUnit(cm, unitName)
resetFailedUnit(rootless, unitName)
return errors.Errorf("error creating systemd unit `%s`: got `%s`", unitName, s)
}
case <-timeout.C:
resetFailedUnit(cm, unitName)
resetFailedUnit(rootless, unitName)
return errors.New("Timeout waiting for systemd to create " + unitName)
}
} else if !isUnitExists(err) {
Expand All @@ -348,9 +348,9 @@ func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Pr
return nil
}

func stopUnit(cm *dbusConnManager, unitName string) error {
func stopUnit(rootless bool, unitName string) error {
statusChan := make(chan string, 1)
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
err := dbusCall(rootless, func(c *systemdDbus.Conn) error {
_, err := c.StopUnitContext(context.TODO(), unitName, "replace", statusChan)
return err
})
Expand All @@ -369,26 +369,26 @@ func stopUnit(cm *dbusConnManager, unitName string) error {
return nil
}

func resetFailedUnit(cm *dbusConnManager, name string) {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
func resetFailedUnit(rootless bool, name string) {
err := dbusCall(rootless, func(c *systemdDbus.Conn) error {
return c.ResetFailedUnitContext(context.TODO(), name)
})
if err != nil {
logrus.Warnf("unable to reset failed unit: %v", err)
}
}

func setUnitProperties(cm *dbusConnManager, name string, properties ...systemdDbus.Property) error {
return cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
func setUnitProperties(rootless bool, name string, properties ...systemdDbus.Property) error {
return dbusCall(rootless, func(c *systemdDbus.Conn) error {
return c.SetUnitPropertiesContext(context.TODO(), name, true, properties...)
})
}

func systemdVersion(cm *dbusConnManager) int {
func systemdVersion(rootless bool) int {
versionOnce.Do(func() {
version = -1
var verStr string
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
err := dbusCall(rootless, func(c *systemdDbus.Conn) error {
var err error
verStr, err = c.GetManagerProperty("Version")
return err
Expand Down Expand Up @@ -420,10 +420,10 @@ func systemdVersionAtoi(verStr string) (int, error) {
return ver, errors.Wrapf(err, "can't parse version %s", verStr)
}

func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota int64, period uint64) {
func addCpuQuota(rootless bool, properties *[]systemdDbus.Property, quota int64, period uint64) {
if period != 0 {
// systemd only supports CPUQuotaPeriodUSec since v242
sdVer := systemdVersion(cm)
sdVer := systemdVersion(rootless)
if sdVer >= 242 {
*properties = append(*properties,
newProp("CPUQuotaPeriodUSec", period))
Expand Down Expand Up @@ -454,13 +454,13 @@ func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota
}
}

func addCpuset(cm *dbusConnManager, props *[]systemdDbus.Property, cpus, mems string) error {
func addCpuset(rootless bool, props *[]systemdDbus.Property, cpus, mems string) error {
if cpus == "" && mems == "" {
return nil
}

// systemd only supports AllowedCPUs/AllowedMemoryNodes since v244
sdVer := systemdVersion(cm)
sdVer := systemdVersion(rootless)
if sdVer < 244 {
logrus.Debugf("systemd v%d is too old to support AllowedCPUs/AllowedMemoryNodes"+
" (settings will still be applied to cgroupfs)", sdVer)
Expand Down
79 changes: 35 additions & 44 deletions libcontainer/cgroups/systemd/dbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,81 +10,72 @@ import (
dbus "github.com/godbus/dbus/v5"
)

type dbusConnManager struct {
conn *systemdDbus.Conn
rootless bool
sync.RWMutex
}

// newDbusConnManager initializes systemd dbus connection manager.
func newDbusConnManager(rootless bool) *dbusConnManager {
return &dbusConnManager{
rootless: rootless,
}
}
var (
dbusC *systemdDbus.Conn
dbusMu sync.RWMutex
)

// getConnection lazily initializes and returns systemd dbus connection.
func (d *dbusConnManager) getConnection() (*systemdDbus.Conn, error) {
// In the case where d.conn != nil
// Use the read lock the first time to ensure
// that Conn can be acquired at the same time.
d.RLock()
if conn := d.conn; conn != nil {
d.RUnlock()
// dbusGet lazily initializes and returns systemd dbus connection.
func dbusGet(rootless bool) (*systemdDbus.Conn, error) {
// In the case where dbusC != nil, use the read lock the first time
// to ensure that the connection can be acquired at the same time.
dbusMu.RLock()
if conn := dbusC; conn != nil {
dbusMu.RUnlock()
return conn, nil
}
d.RUnlock()
dbusMu.RUnlock()

// In the case where d.conn == nil
// Use write lock to ensure that only one
// will be created
d.Lock()
defer d.Unlock()
if conn := d.conn; conn != nil {
// In the case where dbusC == nil, use write lock to ensure
// that only one will be created.
dbusMu.Lock()
defer dbusMu.Unlock()
if conn := dbusC; conn != nil {
return conn, nil
}

conn, err := d.newConnection()
conn, err := dbusNew(rootless)
if err != nil {
return nil, err
}
d.conn = conn
dbusC = conn
return conn, nil
}

func (d *dbusConnManager) newConnection() (*systemdDbus.Conn, error) {
if d.rootless {
func dbusNew(rootless bool) (*systemdDbus.Conn, error) {
if rootless {
return newUserSystemdDbus()
}
return systemdDbus.NewWithContext(context.TODO())
}

// resetConnection resets the connection to its initial state
// dbusReset resets the dbus connection to its initial state
// (so it can be reconnected if necessary).
func (d *dbusConnManager) resetConnection(conn *systemdDbus.Conn) {
d.Lock()
defer d.Unlock()
if d.conn != nil && d.conn == conn {
d.conn.Close()
d.conn = nil
func dbusReset(rootless bool, conn *systemdDbus.Conn) {
dbusMu.Lock()
defer dbusMu.Unlock()
if dbusC != nil && dbusC == conn {
dbusC.Close()
dbusC = nil
}
}

var errDbusConnClosed = dbus.ErrClosed.Error()

// retryOnDisconnect calls op, and if the error it returns is about closed dbus
// connection, the connection is re-established and the op is retried. This helps
// with the situation when dbus is restarted and we have a stale connection.
func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) error {
// dbusCall calls op with dbus connection as the argument, and if the error it
// returns is about closed dbus connection, the connection is re-established
// and the op is retried. This helps with the situation when dbus is restarted
// and we have a stale connection.
func dbusCall(rootless bool, op func(*systemdDbus.Conn) error) error {
for {
conn, err := d.getConnection()
conn, err := dbusGet(rootless)
if err != nil {
return err
}
err = op(conn)
if !isDbusError(err, errDbusConnClosed) {
return err
}
d.resetConnection(conn)
dbusReset(rootless, conn)
}
}
16 changes: 7 additions & 9 deletions libcontainer/cgroups/systemd/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ type legacyManager struct {
mu sync.Mutex
cgroups *configs.Cgroup
paths map[string]string
dbus *dbusConnManager
}

func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) cgroups.Manager {
return &legacyManager{
cgroups: cg,
paths: paths,
dbus: newDbusConnManager(false),
}
}

Expand Down Expand Up @@ -58,7 +56,7 @@ var legacySubsystems = []subsystem{
&fs.NameGroup{GroupName: "name=systemd"},
}

func genV1ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]systemdDbus.Property, error) {
func genV1ResourcesProperties(r *configs.Resources) ([]systemdDbus.Property, error) {
var properties []systemdDbus.Property

deviceProperties, err := generateDeviceProperties(r.Devices)
Expand All @@ -77,7 +75,7 @@ func genV1ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]syst
newProp("CPUShares", r.CpuShares))
}

addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod)
addCpuQuota(false, &properties, r.CpuQuota, r.CpuPeriod)

if r.BlkioWeight != 0 {
properties = append(properties,
Expand All @@ -89,7 +87,7 @@ func genV1ResourcesProperties(r *configs.Resources, cm *dbusConnManager) ([]syst
newProp("TasksMax", uint64(r.PidsLimit)))
}

err = addCpuset(cm, &properties, r.CpusetCpus, r.CpusetMems)
err = addCpuset(false, &properties, r.CpusetCpus, r.CpusetMems)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +165,7 @@ func (m *legacyManager) Apply(pid int) error {

properties = append(properties, c.SystemdProps...)

if err := startUnit(m.dbus, unitName, properties); err != nil {
if err := startUnit(false, unitName, properties); err != nil {
return err
}

Expand Down Expand Up @@ -205,7 +203,7 @@ func (m *legacyManager) Destroy() error {
m.mu.Lock()
defer m.mu.Unlock()

stopErr := stopUnit(m.dbus, getUnitName(m.cgroups))
stopErr := stopUnit(false, getUnitName(m.cgroups))

// Both on success and on error, cleanup all the cgroups we are aware of.
// Some of them were created directly by Apply() and are not managed by systemd.
Expand Down Expand Up @@ -333,7 +331,7 @@ func (m *legacyManager) Set(r *configs.Resources) error {
if r.Unified != nil {
return cgroups.ErrV1NoUnified
}
properties, err := genV1ResourcesProperties(r, m.dbus)
properties, err := genV1ResourcesProperties(r)
if err != nil {
return err
}
Expand Down Expand Up @@ -361,7 +359,7 @@ func (m *legacyManager) Set(r *configs.Resources) error {
}
}

if err := setUnitProperties(m.dbus, getUnitName(m.cgroups), properties...); err != nil {
if err := setUnitProperties(false, getUnitName(m.cgroups), properties...); err != nil {
_ = m.Freeze(targetFreezerState)
return err
}
Expand Down
Loading

0 comments on commit 86bd34f

Please sign in to comment.