diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f9ef81bfc2f..c20e6bccb22 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - system/socket: Fixed tracking of long-running connections. {pull}19033[19033] - auditd: Fix an error condition causing a lot of `audit_send_reply` kernel threads being created. {pull}22673[22673] - system/socket: Fixed start failure when run under config reloader. {issue}20851[20851] {pull}21693[21693] +- system/socket: Having some CPUs unavailable to Auditbeat could cause startup errors or event loss. {pull}22827[22827] *Filebeat* diff --git a/x-pack/auditbeat/tracing/cpu.go b/x-pack/auditbeat/tracing/cpu.go new file mode 100644 index 00000000000..260067a8ab3 --- /dev/null +++ b/x-pack/auditbeat/tracing/cpu.go @@ -0,0 +1,139 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build linux + +package tracing + +import ( + "bytes" + "io/ioutil" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +const ( + // OnlineCPUsPath is the path to the system file listing the online CPUs. + OnlineCPUsPath = "/sys/devices/system/cpu/online" + + // OfflineCPUsPath is the path to the system file listing the offline CPUs. + OfflineCPUsPath = "/sys/devices/system/cpu/offline" + + // PossibleCPUsPath is the path to the system file listing the CPUs that can be brought online. + PossibleCPUsPath = "/sys/devices/system/cpu/possible" + + // PresentCPUsPath is the path to the system file listing the CPUs that are identified as present. + PresentCPUsPath = "/sys/devices/system/cpu/present" + + // See `Documentation/admin-guide/cputopology.rst` in the Linux kernel docs for more information + // on the above files. + + // IsolatedCPUsPath is only present when CPU isolation is active, for example using the `isolcpus` + // kernel argument. + IsolatedCPUsPath = "/sys/devices/system/cpu/isolated" +) + +// CPUSet represents a group of CPUs. +type CPUSet struct { + mask []bool + count int +} + +// Mask returns a bitmask where each bit is set if the given CPU is present in the set. +func (s CPUSet) Mask() []bool { + return s.mask +} + +// NumCPU returns the number of CPUs in the set. +func (s CPUSet) NumCPU() int { + return s.count +} + +// Contains allows to query if a given CPU exists in the set. +func (s CPUSet) Contains(cpu int) bool { + if cpu < 0 || cpu >= len(s.mask) { + return false + } + return s.mask[cpu] +} + +// AsList returns the list of CPUs in the set. +func (s CPUSet) AsList() []int { + list := make([]int, 0, s.count) + for num, bit := range s.mask { + if bit { + list = append(list, num) + } + } + return list +} + +// NewCPUSetFromFile creates a new CPUSet from the contents of a file. +func NewCPUSetFromFile(path string) (cpus CPUSet, err error) { + contents, err := ioutil.ReadFile(path) + if err != nil { + return cpus, err + } + return NewCPUSetFromExpression(string(bytes.TrimRight(contents, "\n\r"))) +} + +// NewCPUSetFromExpression creates a new CPUSet from a range expression. +// Expression: RANGE ( ',' RANGE )* +// Where: +// RANGE := | - +func NewCPUSetFromExpression(contents string) (CPUSet, error) { + var ranges [][]int + var max, count int + for _, expr := range strings.Split(contents, ",") { + if len(expr) == 0 { + continue + } + parts := strings.Split(expr, "-") + r := make([]int, 0, len(parts)) + for _, numStr := range parts { + num16, err := strconv.ParseInt(numStr, 10, 16) + if err != nil || num16 < 0 { + return CPUSet{}, errors.Errorf("failed to parse integer '%s' from range '%s' at '%s'", numStr, expr, contents) + } + num := int(num16) + r = append(r, num) + if num+1 > max { + max = num + 1 + } + } + ranges = append(ranges, r) + } + if max == 0 { + return CPUSet{}, nil + } + mask := make([]bool, max) + for _, r := range ranges { + from, to := -1, -1 + switch len(r) { + case 0: + continue // Ignore empty range. + case 1: + from = r[0] + to = r[0] + case 2: + from = r[0] + to = r[1] + } + if from == -1 || to < from { + return CPUSet{}, errors.Errorf("invalid cpu range %v in '%s'", r, contents) + } + for i := from; i <= to; i++ { + if !mask[i] { + count++ + mask[i] = true + } + } + } + return CPUSet{ + mask: mask, + count: count, + }, nil +} diff --git a/x-pack/auditbeat/tracing/cpu_test.go b/x-pack/auditbeat/tracing/cpu_test.go new file mode 100644 index 00000000000..9b5b7019967 --- /dev/null +++ b/x-pack/auditbeat/tracing/cpu_test.go @@ -0,0 +1,152 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build linux + +package tracing + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewCPUSetFromExpression(t *testing.T) { + for _, testCase := range []struct { + content string + result CPUSet + fail bool + }{ + { + content: "0", + result: CPUSet{ + mask: []bool{true}, + count: 1, + }, + }, + { + content: "0-3", + result: CPUSet{ + mask: []bool{true, true, true, true}, + count: 4, + }, + }, + { + content: "5-0", + fail: true, + }, + { + content: "5-2147483648", + fail: true, + }, + { + content: "0,2-2", + result: CPUSet{ + mask: []bool{true, false, true}, + count: 2, + }, + }, + { + content: "7", + result: CPUSet{ + mask: []bool{false, false, false, false, false, false, false, true}, + count: 1, + }, + }, + { + content: "-1", + fail: true, + }, + { + content: "", + }, + { + content: ",", + }, + { + content: "-", + fail: true, + }, + { + content: "3,-", + fail: true, + }, + { + content: "3-4-5", + fail: true, + }, + { + content: "0-4,5,6-6,,,,15", + result: CPUSet{ + mask: []bool{ + true, true, true, true, true, true, true, false, + false, false, false, false, false, false, false, true, + }, + count: 8, + }, + }, + } { + mask, err := NewCPUSetFromExpression(testCase.content) + if !assert.Equal(t, testCase.fail, err != nil, testCase.content) { + t.Fatal(err) + } + assert.Equal(t, testCase.result, mask, testCase.content) + } +} + +func TestCPUSet(t *testing.T) { + for _, test := range []struct { + expr string + num int + isSet func(int) bool + list []int + }{ + { + expr: "0-2,5", + num: 4, + isSet: func(i int) bool { return i == 5 || (i >= 0 && i < 3) }, + list: []int{0, 1, 2, 5}, + }, + { + expr: "0", + num: 1, + isSet: func(i int) bool { return i == 0 }, + list: []int{0}, + }, + { + expr: "2", + num: 1, + isSet: func(i int) bool { return i == 2 }, + list: []int{2}, + }, + { + expr: "0-7", + num: 8, + isSet: func(i int) bool { return i >= 0 && i < 8 }, + list: []int{0, 1, 2, 3, 4, 5, 6, 7}, + }, + { + expr: "", + num: 0, + isSet: func(i int) bool { return false }, + list: []int{}, + }, + { + expr: "1-2,0,2,0-0,0-1", + num: 3, + isSet: func(i int) bool { return i >= 0 && i < 3 }, + list: []int{0, 1, 2}, + }, + } { + set, err := NewCPUSetFromExpression(test.expr) + if !assert.NoError(t, err, test.expr) { + t.Fatal(err) + } + assert.Equal(t, test.num, set.NumCPU(), test.expr) + for i := -1; i < 10; i++ { + assert.Equal(t, test.isSet(i), set.Contains(i), test.expr) + } + assert.Equal(t, test.list, set.AsList(), test.expr) + } +} diff --git a/x-pack/auditbeat/tracing/perfevent.go b/x-pack/auditbeat/tracing/perfevent.go index 83f40260286..4189abda59a 100644 --- a/x-pack/auditbeat/tracing/perfevent.go +++ b/x-pack/auditbeat/tracing/perfevent.go @@ -10,7 +10,6 @@ import ( "context" "fmt" "os" - "runtime" "sync" "sync/atomic" "time" @@ -55,7 +54,7 @@ type PerfChannel struct { running uintptr wg sync.WaitGroup - numCPUs int + cpus CPUSet // Settings attr perf.Attr @@ -98,7 +97,6 @@ func NewPerfChannel(cfg ...PerfChannelConf) (channel *PerfChannel, err error) { done: make(chan struct{}, 0), streams: make(map[uint64]stream), pid: perf.AllThreads, - numCPUs: runtime.NumCPU(), attr: perf.Attr{ Type: perf.TracepointEvent, SampleFormat: perf.SampleFormat{ @@ -112,6 +110,19 @@ func NewPerfChannel(cfg ...PerfChannelConf) (channel *PerfChannel, err error) { channel.attr.SetSamplePeriod(1) channel.attr.SetWakeupEvents(1) + // Load the list of online CPUs from /sys/devices/system/cpu/online. + // This is necessary in order to to install each kprobe on all online CPUs. + // + // Note: + // There's currently no mechanism to adapt to CPUs being added or removed + // at runtime (CPU hotplug). + channel.cpus, err = NewCPUSetFromFile(OnlineCPUsPath) + if err != nil { + return nil, errors.Wrap(err, "error listing online CPUs") + } + if channel.cpus.NumCPU() < 1 { + return nil, errors.New("couldn't list online CPUs") + } // Set configuration for _, fun := range cfg { if err = fun(channel); err != nil { @@ -210,14 +221,15 @@ func WithPollTimeout(timeout time.Duration) PerfChannelConf { func (c *PerfChannel) MonitorProbe(format ProbeFormat, decoder Decoder) error { c.attr.Config = uint64(format.ID) doGroup := len(c.events) > 0 - for idx := 0; idx < c.numCPUs; idx++ { + cpuList := c.cpus.AsList() + for idx, cpu := range cpuList { var group *perf.Event var flags int if doGroup { group = c.events[idx] flags = unix.PERF_FLAG_FD_NO_GROUP | unix.PERF_FLAG_FD_OUTPUT } - ev, err := perf.OpenWithFlags(&c.attr, c.pid, idx, group, flags) + ev, err := perf.OpenWithFlags(&c.attr, c.pid, cpu, group, flags) if err != nil { return err } @@ -352,7 +364,7 @@ func makeMetadata(eventID int, record *perf.SampleRecord) Metadata { func (c *PerfChannel) channelLoop() { defer c.wg.Done() ctx := doneWrapperContext(c.done) - merger := newRecordMerger(c.events[:c.numCPUs], c, c.pollTimeout) + merger := newRecordMerger(c.events[:c.cpus.NumCPU()], c, c.pollTimeout) for { // Read the available event from all the monitored ring-buffers that // has the smallest timestamp.