From 3550969c7b66c71573ef77ca9eb59a7acdc443d6 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 5 Jan 2022 14:42:39 -0800 Subject: [PATCH 1/6] PoC for optional json encoding --- libbeat/opt/opt.go | 31 ++++++- .../internal/metrics/memory/marshalexp.go | 92 +++++++++++++++++++ metricbeat/internal/metrics/memory/memory.go | 41 ++++++++- .../internal/metrics/memory/memory_test.go | 89 ++++++++++++++++++ 4 files changed, 247 insertions(+), 6 deletions(-) create mode 100644 metricbeat/internal/metrics/memory/marshalexp.go diff --git a/libbeat/opt/opt.go b/libbeat/opt/opt.go index 37171e172724..0d332cfa60d0 100644 --- a/libbeat/opt/opt.go +++ b/libbeat/opt/opt.go @@ -17,7 +17,16 @@ package opt -import "github.com/elastic/go-structform" +import ( + "strconv" + + "github.com/elastic/go-structform" +) + +type OptType interface { + IsZero() bool + MarshalJSON() ([]byte, error) +} // Uint @@ -28,6 +37,16 @@ type Uint struct { value uint64 } +// MarshalJSON implements the marshal interface +func (v Uint) MarshalJSON() ([]byte, error) { + //fmt.Printf("In custom marshaller for Uint: %#v\n", v) + if v.exists { + return []byte(strconv.Itoa(int(v.value))), nil + } else { + return []byte(strconv.Itoa(int(0))), nil + } +} + // NewUintNone returns a new OptUint wrapper func NewUintNone() Uint { return Uint{ @@ -93,6 +112,16 @@ type Float struct { value float64 } +// MarshalJSON implements the marshal interface +func (v Float) MarshalJSON() ([]byte, error) { + //fmt.Printf("In custom marshaller for Float: %#v\n", v) + if v.exists { + return []byte(strconv.FormatFloat(v.value, 'f', 6, 64)), nil + } else { + return []byte(strconv.FormatFloat(v.value, 'f', 6, 64)), nil + } +} + // NewFloatNone returns a new uint wrapper func NewFloatNone() Float { return Float{ diff --git a/metricbeat/internal/metrics/memory/marshalexp.go b/metricbeat/internal/metrics/memory/marshalexp.go new file mode 100644 index 000000000000..f22ea5ba238d --- /dev/null +++ b/metricbeat/internal/metrics/memory/marshalexp.go @@ -0,0 +1,92 @@ +package memory + +import ( + "encoding/json" + "fmt" + "reflect" + + "github.com/elastic/beats/v7/libbeat/opt" +) + +type CustomFloat interface { + IsZero() bool +} + +type ZeroTest struct { + Zero bool +} + +func (z ZeroTest) MarshalJSON() ([]byte, error) { + return json.Marshal(z.Zero) +} + +func (z ZeroTest) IsZero() bool { + return z.Zero +} + +type UsedMemStatsTest struct { + Raw float64 `json:"raw,omitempty"` + Iface opt.OptType `json:"iface,omitempty"` +} + +// func (s UsedMemStatsTest) MarshalJSON() ([]byte, error) { + +// type sAlias UsedMemStatsTest + +// if s.Iface.IsZero() { +// s.Iface = nil +// } + +// return json.Marshal(sAlias(s)) +// } + +type MarshalWrapper struct { + Butterfly interface{} +} + +func (m MarshalWrapper) MarshalJSON() ([]byte, error) { + + bv := reflect.ValueOf(m.Butterfly).Elem() + for i := 0; i < bv.NumField(); i++ { + if bv.Field(i).CanInterface() { + fiface := bv.Field(i).Interface() + zeroIface, ok := fiface.(CustomFloat) + if ok { + if zeroIface.IsZero() { + zeroField := reflect.ValueOf(m.Butterfly).Elem().Field(i) + fmt.Printf("===%v\n", zeroField.Type()) + if zeroField.CanSet() { + zeroField.Set(reflect.Zero(zeroField.Type())) + } else { + fmt.Printf("Can't Set field %v\n", zeroField.Type()) + } + } + + } + } + } + return json.Marshal(m.Butterfly) +} + +func runJsonMarshal(input UsedMemStatsTest) (string, error) { + + // testStat := UsedMemStats{ + // Pct: opt.FloatWith(2.3), + // Bytes: opt.UintWith(100), + // } + //zero := ZeroTest{Zero: false} + // testStat := UsedMemStatsTest{ + + // Raw: 1.0, + // Iface: opt.FloatWith(2), + // } + wrapper := MarshalWrapper{ + Butterfly: &input, + } + + val, err := json.MarshalIndent(&wrapper, " ", " ") + if err != nil { + return "", err + } + return string(val), nil +} diff --git a/metricbeat/internal/metrics/memory/memory.go b/metricbeat/internal/metrics/memory/memory.go index 79efbdd825e3..7695ba79311c 100644 --- a/metricbeat/internal/metrics/memory/memory.go +++ b/metricbeat/internal/metrics/memory/memory.go @@ -48,8 +48,8 @@ type Memory struct { // UsedMemStats wraps used.* memory metrics type UsedMemStats struct { - Pct opt.Float `struct:"pct,omitempty"` - Bytes opt.Uint `struct:"bytes,omitempty"` + Pct opt.Float `struct:"pct,omitempty" json:"pct,omitempty"` + Bytes opt.Uint `struct:"bytes,omitempty" json:"bytes,omitempty"` } // ActualMemoryMetrics wraps the actual.* memory metrics @@ -60,11 +60,42 @@ type ActualMemoryMetrics struct { // SwapMetrics wraps swap.* memory metrics type SwapMetrics struct { - Total opt.Uint `struct:"total,omitempty"` - Used UsedMemStats `struct:"used,omitempty"` - Free opt.Uint `struct:"free,omitempty"` + Total opt.Uint `struct:"total,omitempty" json:"total,omitempty"` + Used UsedMemStats `struct:"used,omitempty" json:"used,omitempty"` + Free opt.Uint `struct:"free,omitempty" json:"free,omitempty"` } +// func (m UsedMemStats) MarshalJSON() ([]byte, error) { + +// outMap := map[string]interface{}{} +// bv := reflect.ValueOf(m) +// bt := reflect.TypeOf(m) +// for i := 0; i < bv.NumField(); i++ { +// if bv.Field(i).CanInterface() { +// fiface := bv.Field(i).Interface() +// name := bt.Field(i).Name + +// zeroIface, ok := fiface.(opt.OptType) +// if ok { +// if zeroIface.IsZero() { +// continue +// } +// //fmt.Printf("marshalling type %#v\n", zeroIface) +// rawOut, err := json.Marshal(zeroIface) +// if err != nil { +// return nil, errors.Wrap(err, "error marshalling type from UsedMemStats") +// } +// outMap[name] = string(rawOut) + +// } else { +// outMap[name] = fiface +// } +// } +// } +// //fmt.Printf("Pre-marshal map is %#v\n", outMap) +// return json.Marshal(outMap) +// } + // Get returns platform-independent memory metrics. func Get(procfs resolve.Resolver) (Memory, error) { base, err := get(procfs) diff --git a/metricbeat/internal/metrics/memory/memory_test.go b/metricbeat/internal/metrics/memory/memory_test.go index 4c9c979fdcef..535bbb6b98f5 100644 --- a/metricbeat/internal/metrics/memory/memory_test.go +++ b/metricbeat/internal/metrics/memory/memory_test.go @@ -22,6 +22,8 @@ package memory import ( + "bytes" + "encoding/json" "runtime" "testing" @@ -29,8 +31,95 @@ import ( "github.com/elastic/beats/v7/libbeat/metric/system/resolve" "github.com/elastic/beats/v7/libbeat/opt" + "github.com/elastic/go-structform/gotype" + gsjson "github.com/elastic/go-structform/json" ) +func TestMarshal(t *testing.T) { + testStat := UsedMemStatsTest{ + Raw: 5, + Iface: opt.NewFloatNone(), + } + + jsonData, err := runJsonMarshal(testStat) + assert.NoError(t, err) + t.Logf("%s", jsonData) +} + +func TestStdLibJSON(t *testing.T) { + testStat := SwapMetrics{ + Total: opt.UintWith(5), + Free: opt.NewUintNone(), + Used: UsedMemStats{ + Pct: opt.FloatWith(4.5), + Bytes: opt.UintWith(5), + }, + } + out, err := json.Marshal(testStat) + assert.NoError(t, err, "Marshal") + t.Logf("Out: %s", string(out)) +} + +func TestStructform(t *testing.T) { + outBuf := new(bytes.Buffer) + visitor := gsjson.NewVisitor(outBuf) + folder, err := gotype.NewIterator(visitor, + gotype.Folders(), + ) + assert.NoError(t, err, "NewIterator") + err = runStructformEncoder(folder) + assert.NoError(t, err, "runStructformEncoder") + t.Logf("output from structform: %s", string(outBuf.Bytes())) +} + +func BenchmarkStdLibJSON(b *testing.B) { + testStat := UsedMemStatsTest{ + Raw: 5, + Iface: opt.FloatWith(4.3), + } + wrapper := MarshalWrapper{ + Butterfly: &testStat, + } + for i := 0; i < b.N; i++ { + json.Marshal(&wrapper) + } +} + +func BenchmarkStructform(b *testing.B) { + testStat := SwapMetrics{ + Total: opt.UintWith(5), + Free: opt.NewUintNone(), + Used: UsedMemStats{ + Pct: opt.FloatWith(4.5), + Bytes: opt.UintWith(5), + }, + } + outBuf := new(bytes.Buffer) + visitor := gsjson.NewVisitor(outBuf) + folder, err := gotype.NewIterator(visitor, + gotype.Folders(), + ) + if err != nil { + b.Fatalf("err: %s", err) + } + err = runStructformEncoder(folder) + if err != nil { + b.Fatalf("err: %s", err) + } + + for i := 0; i < b.N; i++ { + folder.Fold(testStat) + } +} + +func runStructformEncoder(folder *gotype.Iterator) error { + testStat := UsedMemStatsTest{ + Raw: 5, + Iface: opt.FloatWith(4.3), + } + return folder.Fold(testStat) +} + func TestGetMemory(t *testing.T) { mem, err := Get(resolve.NewTestResolver("")) From 107cafc83dfffce5c10d687a9f4349cf8b512a8f Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 23 Feb 2022 14:22:29 -0800 Subject: [PATCH 2/6] Revert "PoC for optional json encoding" This reverts commit 3550969c7b66c71573ef77ca9eb59a7acdc443d6. --- libbeat/opt/opt.go | 31 +------ .../internal/metrics/memory/marshalexp.go | 92 ------------------- metricbeat/internal/metrics/memory/memory.go | 41 +-------- .../internal/metrics/memory/memory_test.go | 89 ------------------ 4 files changed, 6 insertions(+), 247 deletions(-) delete mode 100644 metricbeat/internal/metrics/memory/marshalexp.go diff --git a/libbeat/opt/opt.go b/libbeat/opt/opt.go index 9f1bab866df5..22dd5178c139 100644 --- a/libbeat/opt/opt.go +++ b/libbeat/opt/opt.go @@ -17,16 +17,7 @@ package opt -import ( - "strconv" - - "github.com/elastic/go-structform" -) - -type OptType interface { - IsZero() bool - MarshalJSON() ([]byte, error) -} +import "github.com/elastic/go-structform" // ZeroInterface is a type interface for cases where we need to cast from a void pointer type ZeroInterface interface { @@ -98,16 +89,6 @@ type Uint struct { value uint64 } -// MarshalJSON implements the marshal interface -func (v Uint) MarshalJSON() ([]byte, error) { - //fmt.Printf("In custom marshaller for Uint: %#v\n", v) - if v.exists { - return []byte(strconv.Itoa(int(v.value))), nil - } else { - return []byte(strconv.Itoa(int(0))), nil - } -} - // NewUintNone returns a new OptUint wrapper func NewUintNone() Uint { return Uint{ @@ -173,16 +154,6 @@ type Float struct { value float64 } -// MarshalJSON implements the marshal interface -func (v Float) MarshalJSON() ([]byte, error) { - //fmt.Printf("In custom marshaller for Float: %#v\n", v) - if v.exists { - return []byte(strconv.FormatFloat(v.value, 'f', 6, 64)), nil - } else { - return []byte(strconv.FormatFloat(v.value, 'f', 6, 64)), nil - } -} - // NewFloatNone returns a new uint wrapper func NewFloatNone() Float { return Float{ diff --git a/metricbeat/internal/metrics/memory/marshalexp.go b/metricbeat/internal/metrics/memory/marshalexp.go deleted file mode 100644 index f22ea5ba238d..000000000000 --- a/metricbeat/internal/metrics/memory/marshalexp.go +++ /dev/null @@ -1,92 +0,0 @@ -package memory - -import ( - "encoding/json" - "fmt" - "reflect" - - "github.com/elastic/beats/v7/libbeat/opt" -) - -type CustomFloat interface { - IsZero() bool -} - -type ZeroTest struct { - Zero bool -} - -func (z ZeroTest) MarshalJSON() ([]byte, error) { - return json.Marshal(z.Zero) -} - -func (z ZeroTest) IsZero() bool { - return z.Zero -} - -type UsedMemStatsTest struct { - Raw float64 `json:"raw,omitempty"` - Iface opt.OptType `json:"iface,omitempty"` -} - -// func (s UsedMemStatsTest) MarshalJSON() ([]byte, error) { - -// type sAlias UsedMemStatsTest - -// if s.Iface.IsZero() { -// s.Iface = nil -// } - -// return json.Marshal(sAlias(s)) -// } - -type MarshalWrapper struct { - Butterfly interface{} -} - -func (m MarshalWrapper) MarshalJSON() ([]byte, error) { - - bv := reflect.ValueOf(m.Butterfly).Elem() - for i := 0; i < bv.NumField(); i++ { - if bv.Field(i).CanInterface() { - fiface := bv.Field(i).Interface() - zeroIface, ok := fiface.(CustomFloat) - if ok { - if zeroIface.IsZero() { - zeroField := reflect.ValueOf(m.Butterfly).Elem().Field(i) - fmt.Printf("===%v\n", zeroField.Type()) - if zeroField.CanSet() { - zeroField.Set(reflect.Zero(zeroField.Type())) - } else { - fmt.Printf("Can't Set field %v\n", zeroField.Type()) - } - } - - } - } - } - return json.Marshal(m.Butterfly) -} - -func runJsonMarshal(input UsedMemStatsTest) (string, error) { - - // testStat := UsedMemStats{ - // Pct: opt.FloatWith(2.3), - // Bytes: opt.UintWith(100), - // } - //zero := ZeroTest{Zero: false} - // testStat := UsedMemStatsTest{ - - // Raw: 1.0, - // Iface: opt.FloatWith(2), - // } - wrapper := MarshalWrapper{ - Butterfly: &input, - } - - val, err := json.MarshalIndent(&wrapper, " ", " ") - if err != nil { - return "", err - } - return string(val), nil -} diff --git a/metricbeat/internal/metrics/memory/memory.go b/metricbeat/internal/metrics/memory/memory.go index 7695ba79311c..79efbdd825e3 100644 --- a/metricbeat/internal/metrics/memory/memory.go +++ b/metricbeat/internal/metrics/memory/memory.go @@ -48,8 +48,8 @@ type Memory struct { // UsedMemStats wraps used.* memory metrics type UsedMemStats struct { - Pct opt.Float `struct:"pct,omitempty" json:"pct,omitempty"` - Bytes opt.Uint `struct:"bytes,omitempty" json:"bytes,omitempty"` + Pct opt.Float `struct:"pct,omitempty"` + Bytes opt.Uint `struct:"bytes,omitempty"` } // ActualMemoryMetrics wraps the actual.* memory metrics @@ -60,42 +60,11 @@ type ActualMemoryMetrics struct { // SwapMetrics wraps swap.* memory metrics type SwapMetrics struct { - Total opt.Uint `struct:"total,omitempty" json:"total,omitempty"` - Used UsedMemStats `struct:"used,omitempty" json:"used,omitempty"` - Free opt.Uint `struct:"free,omitempty" json:"free,omitempty"` + Total opt.Uint `struct:"total,omitempty"` + Used UsedMemStats `struct:"used,omitempty"` + Free opt.Uint `struct:"free,omitempty"` } -// func (m UsedMemStats) MarshalJSON() ([]byte, error) { - -// outMap := map[string]interface{}{} -// bv := reflect.ValueOf(m) -// bt := reflect.TypeOf(m) -// for i := 0; i < bv.NumField(); i++ { -// if bv.Field(i).CanInterface() { -// fiface := bv.Field(i).Interface() -// name := bt.Field(i).Name - -// zeroIface, ok := fiface.(opt.OptType) -// if ok { -// if zeroIface.IsZero() { -// continue -// } -// //fmt.Printf("marshalling type %#v\n", zeroIface) -// rawOut, err := json.Marshal(zeroIface) -// if err != nil { -// return nil, errors.Wrap(err, "error marshalling type from UsedMemStats") -// } -// outMap[name] = string(rawOut) - -// } else { -// outMap[name] = fiface -// } -// } -// } -// //fmt.Printf("Pre-marshal map is %#v\n", outMap) -// return json.Marshal(outMap) -// } - // Get returns platform-independent memory metrics. func Get(procfs resolve.Resolver) (Memory, error) { base, err := get(procfs) diff --git a/metricbeat/internal/metrics/memory/memory_test.go b/metricbeat/internal/metrics/memory/memory_test.go index 535bbb6b98f5..4c9c979fdcef 100644 --- a/metricbeat/internal/metrics/memory/memory_test.go +++ b/metricbeat/internal/metrics/memory/memory_test.go @@ -22,8 +22,6 @@ package memory import ( - "bytes" - "encoding/json" "runtime" "testing" @@ -31,95 +29,8 @@ import ( "github.com/elastic/beats/v7/libbeat/metric/system/resolve" "github.com/elastic/beats/v7/libbeat/opt" - "github.com/elastic/go-structform/gotype" - gsjson "github.com/elastic/go-structform/json" ) -func TestMarshal(t *testing.T) { - testStat := UsedMemStatsTest{ - Raw: 5, - Iface: opt.NewFloatNone(), - } - - jsonData, err := runJsonMarshal(testStat) - assert.NoError(t, err) - t.Logf("%s", jsonData) -} - -func TestStdLibJSON(t *testing.T) { - testStat := SwapMetrics{ - Total: opt.UintWith(5), - Free: opt.NewUintNone(), - Used: UsedMemStats{ - Pct: opt.FloatWith(4.5), - Bytes: opt.UintWith(5), - }, - } - out, err := json.Marshal(testStat) - assert.NoError(t, err, "Marshal") - t.Logf("Out: %s", string(out)) -} - -func TestStructform(t *testing.T) { - outBuf := new(bytes.Buffer) - visitor := gsjson.NewVisitor(outBuf) - folder, err := gotype.NewIterator(visitor, - gotype.Folders(), - ) - assert.NoError(t, err, "NewIterator") - err = runStructformEncoder(folder) - assert.NoError(t, err, "runStructformEncoder") - t.Logf("output from structform: %s", string(outBuf.Bytes())) -} - -func BenchmarkStdLibJSON(b *testing.B) { - testStat := UsedMemStatsTest{ - Raw: 5, - Iface: opt.FloatWith(4.3), - } - wrapper := MarshalWrapper{ - Butterfly: &testStat, - } - for i := 0; i < b.N; i++ { - json.Marshal(&wrapper) - } -} - -func BenchmarkStructform(b *testing.B) { - testStat := SwapMetrics{ - Total: opt.UintWith(5), - Free: opt.NewUintNone(), - Used: UsedMemStats{ - Pct: opt.FloatWith(4.5), - Bytes: opt.UintWith(5), - }, - } - outBuf := new(bytes.Buffer) - visitor := gsjson.NewVisitor(outBuf) - folder, err := gotype.NewIterator(visitor, - gotype.Folders(), - ) - if err != nil { - b.Fatalf("err: %s", err) - } - err = runStructformEncoder(folder) - if err != nil { - b.Fatalf("err: %s", err) - } - - for i := 0; i < b.N; i++ { - folder.Fold(testStat) - } -} - -func runStructformEncoder(folder *gotype.Iterator) error { - testStat := UsedMemStatsTest{ - Raw: 5, - Iface: opt.FloatWith(4.3), - } - return folder.Fold(testStat) -} - func TestGetMemory(t *testing.T) { mem, err := Get(resolve.NewTestResolver("")) From 6b880cf64ed4f4b0049f0f43c04aef1143d11bbc Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 3 Mar 2022 16:11:48 -0800 Subject: [PATCH 3/6] try to fix rolled-over values in diskio, add rounding --- .../metric/system/diskio/diskstat_linux.go | 48 ++++++++++++++----- .../system/diskio/diskstat_linux_test.go | 17 +++++++ .../module/linux/iostat/_meta/data.json | 14 +++--- 3 files changed, 60 insertions(+), 19 deletions(-) diff --git a/libbeat/metric/system/diskio/diskstat_linux.go b/libbeat/metric/system/diskio/diskstat_linux.go index 252d8e5d72fd..0b72e197713f 100644 --- a/libbeat/metric/system/diskio/diskstat_linux.go +++ b/libbeat/metric/system/diskio/diskstat_linux.go @@ -24,6 +24,7 @@ import ( "github.com/pkg/errors" "github.com/shirou/gopsutil/v3/disk" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/metric/system/numcpu" ) @@ -52,6 +53,28 @@ func (stat *IOStat) OpenSampling() error { return stat.curCPU.Get() } +// a few of the diskio counters are actually 32-bit on the kernel side, which means they can roll over fairly easily. +// Here we try to reconstruct the values by calculating the pre-rollover delta from unt32 max, then adding. +// If you want to get technical, this could be a tad unsafe, as we don't actually have any way of knowing if the word size changes in a future kernel, and we've rolled over at UINT64_MAX + +// See https://docs.kernel.org/admin-guide/iostats.html and https://github.com/torvalds/linux/blob/master/block/genhd.c +func returnOrFix(current, prev uint64) uint64 { + var maxUint32 uint64 = 4_294_967_295 // Max value in uint32/unsigned int + + if current >= prev { + return current - prev + } + // we're at a uint64 if we hit this + if prev > maxUint32 { + return 0 + } + + delta := maxUint32 - prev + + return delta + current + +} + // CalcIOStatistics calculates IO statistics. func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, error) { var last disk.IOCountersStat @@ -72,13 +95,14 @@ func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, err rdIOs := counter.ReadCount - last.ReadCount rdMerges := counter.MergedReadCount - last.MergedReadCount rdBytes := counter.ReadBytes - last.ReadBytes - rdTicks := counter.ReadTime - last.ReadTime + rdTicks := returnOrFix(counter.ReadTime, last.ReadTime) wrIOs := counter.WriteCount - last.WriteCount wrMerges := counter.MergedWriteCount - last.MergedWriteCount wrBytes := counter.WriteBytes - last.WriteBytes - wrTicks := counter.WriteTime - last.WriteTime - ticks := counter.IoTime - last.IoTime - aveq := counter.WeightedIO - last.WeightedIO + wrTicks := returnOrFix(counter.WriteTime, last.WriteTime) + ticks := returnOrFix(counter.IoTime, last.IoTime) + aveq := returnOrFix(counter.WeightedIO, last.WeightedIO) + nIOs := rdIOs + wrIOs nTicks := rdTicks + wrTicks nBytes := rdBytes + wrBytes @@ -94,7 +118,7 @@ func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, err queue := float64(aveq) / deltams perSec := func(x uint64) float64 { - return 1000.0 * float64(x) / deltams + return common.Round(1000.0*float64(x)/deltams, common.DefaultDecimalPlacesCount) } result := IOMetric{} @@ -104,17 +128,17 @@ func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, err result.WriteRequestCountPerSec = perSec(wrIOs) result.ReadBytesPerSec = perSec(rdBytes) result.WriteBytesPerSec = perSec(wrBytes) - result.AvgRequestSize = size - result.AvgQueueSize = queue - result.AvgAwaitTime = wait + result.AvgRequestSize = common.Round(size, common.DefaultDecimalPlacesCount) + result.AvgQueueSize = common.Round(queue, common.DefaultDecimalPlacesCount) + result.AvgAwaitTime = common.Round(wait, common.DefaultDecimalPlacesCount) if rdIOs > 0 { - result.AvgReadAwaitTime = float64(rdTicks) / float64(rdIOs) + result.AvgReadAwaitTime = common.Round(float64(rdTicks)/float64(rdIOs), common.DefaultDecimalPlacesCount) } if wrIOs > 0 { - result.AvgWriteAwaitTime = float64(wrTicks) / float64(wrIOs) + result.AvgWriteAwaitTime = common.Round(float64(wrTicks)/float64(wrIOs), common.DefaultDecimalPlacesCount) } - result.AvgServiceTime = svct - result.BusyPct = 100.0 * float64(ticks) / deltams + result.AvgServiceTime = common.Round(svct, common.DefaultDecimalPlacesCount) + result.BusyPct = common.Round(100.0*float64(ticks)/deltams, common.DefaultDecimalPlacesCount) if result.BusyPct > 100.0 { result.BusyPct = 100.0 } diff --git a/libbeat/metric/system/diskio/diskstat_linux_test.go b/libbeat/metric/system/diskio/diskstat_linux_test.go index 4e78c7d85de5..6b65632590ab 100644 --- a/libbeat/metric/system/diskio/diskstat_linux_test.go +++ b/libbeat/metric/system/diskio/diskstat_linux_test.go @@ -34,6 +34,23 @@ func Test_GetCLKTCK(t *testing.T) { assert.Equal(t, uint32(100), GetCLKTCK()) } +func Test32BitRollover(t *testing.T) { + var maxUint32 uint64 = 4_294_967_295 + + var prev = maxUint32 - 100_000 + + // A rolled-over value + var current32 uint64 = 1000 + // Theoretical un-rolled over value + var current64 = (maxUint32 + current32) + + var correct = current64 - prev + assert.Equal(t, returnOrFix(current32, prev), returnOrFix(current64, prev)) + assert.Equal(t, correct, returnOrFix(current32, prev)) + + assert.Equal(t, uint64(0), returnOrFix(current32, current32)) +} + func TestDiskIOStat_CalIOStatistics(t *testing.T) { counter := disk.IOCountersStat{ ReadCount: 13, diff --git a/metricbeat/module/linux/iostat/_meta/data.json b/metricbeat/module/linux/iostat/_meta/data.json index 99f23d8cc959..b6c45c080b72 100644 --- a/metricbeat/module/linux/iostat/_meta/data.json +++ b/metricbeat/module/linux/iostat/_meta/data.json @@ -8,10 +8,10 @@ "linux": { "iostat": { "await": 0, - "busy": 0, - "name": "sr0", + "busy": 0.1503, + "name": "sda", "queue": { - "avg_size": 0 + "avg_size": 0.0005 }, "read": { "await": 0, @@ -24,17 +24,17 @@ } }, "request": { - "avg_size": 0 + "avg_size": 2867.2 }, - "service_time": 0, + "service_time": 0.3, "write": { "await": 0, "per_sec": { - "bytes": 0 + "bytes": 14365.929 }, "request": { "merges_per_sec": 0, - "per_sec": 0 + "per_sec": 5.0104 } } } From b59b43e9efa7bd39853b4fecc1621cb1f9f72f77 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Fri, 4 Mar 2022 10:39:40 -0800 Subject: [PATCH 4/6] use math package, add docs --- libbeat/metric/system/diskio/diskstat_linux.go | 4 +++- libbeat/metric/system/diskio/diskstat_linux_test.go | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/libbeat/metric/system/diskio/diskstat_linux.go b/libbeat/metric/system/diskio/diskstat_linux.go index 0b72e197713f..7238032b6de4 100644 --- a/libbeat/metric/system/diskio/diskstat_linux.go +++ b/libbeat/metric/system/diskio/diskstat_linux.go @@ -21,6 +21,8 @@ package diskio import ( + "math" + "github.com/pkg/errors" "github.com/shirou/gopsutil/v3/disk" @@ -59,7 +61,7 @@ func (stat *IOStat) OpenSampling() error { // See https://docs.kernel.org/admin-guide/iostats.html and https://github.com/torvalds/linux/blob/master/block/genhd.c func returnOrFix(current, prev uint64) uint64 { - var maxUint32 uint64 = 4_294_967_295 // Max value in uint32/unsigned int + var maxUint32 uint64 = math.MaxUint32 //4_294_967_295 Max value in uint32/unsigned int if current >= prev { return current - prev diff --git a/libbeat/metric/system/diskio/diskstat_linux_test.go b/libbeat/metric/system/diskio/diskstat_linux_test.go index 6b65632590ab..28a46dc2fac8 100644 --- a/libbeat/metric/system/diskio/diskstat_linux_test.go +++ b/libbeat/metric/system/diskio/diskstat_linux_test.go @@ -21,11 +21,13 @@ package diskio import ( + "math" "testing" "github.com/shirou/gopsutil/v3/disk" "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/common" sigar "github.com/elastic/gosigar" ) @@ -35,7 +37,7 @@ func Test_GetCLKTCK(t *testing.T) { } func Test32BitRollover(t *testing.T) { - var maxUint32 uint64 = 4_294_967_295 + var maxUint32 uint64 = math.MaxUint32 // 4_294_967_295 var prev = maxUint32 - 100_000 @@ -75,7 +77,7 @@ func TestDiskIOStat_CalIOStatistics(t *testing.T) { } expected := IOMetric{ - AvgAwaitTime: 24.0 / 22.0, + AvgAwaitTime: common.Round(24.0/22.0, common.DefaultDecimalPlacesCount), AvgReadAwaitTime: 1.2, AvgWriteAwaitTime: 1, } From 30f0ed25fd61dfbdf1ebf4b105dcf5e177b28160 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Fri, 4 Mar 2022 11:39:33 -0800 Subject: [PATCH 5/6] name change --- libbeat/metric/system/diskio/diskstat_linux.go | 12 ++++++------ libbeat/metric/system/diskio/diskstat_linux_test.go | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/libbeat/metric/system/diskio/diskstat_linux.go b/libbeat/metric/system/diskio/diskstat_linux.go index 7238032b6de4..78425ee0900c 100644 --- a/libbeat/metric/system/diskio/diskstat_linux.go +++ b/libbeat/metric/system/diskio/diskstat_linux.go @@ -59,8 +59,8 @@ func (stat *IOStat) OpenSampling() error { // Here we try to reconstruct the values by calculating the pre-rollover delta from unt32 max, then adding. // If you want to get technical, this could be a tad unsafe, as we don't actually have any way of knowing if the word size changes in a future kernel, and we've rolled over at UINT64_MAX -// See https://docs.kernel.org/admin-guide/iostats.html and https://github.com/torvalds/linux/blob/master/block/genhd.c -func returnOrFix(current, prev uint64) uint64 { +// See https://docs.kernel.org/admin-guide/iostats.html and https://github.com/torvalds/linux/blob/master/block/genhd.c diskstats_show() +func returnOrFixRollover(current, prev uint64) uint64 { var maxUint32 uint64 = math.MaxUint32 //4_294_967_295 Max value in uint32/unsigned int if current >= prev { @@ -97,13 +97,13 @@ func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, err rdIOs := counter.ReadCount - last.ReadCount rdMerges := counter.MergedReadCount - last.MergedReadCount rdBytes := counter.ReadBytes - last.ReadBytes - rdTicks := returnOrFix(counter.ReadTime, last.ReadTime) + rdTicks := returnOrFixRollover(counter.ReadTime, last.ReadTime) wrIOs := counter.WriteCount - last.WriteCount wrMerges := counter.MergedWriteCount - last.MergedWriteCount wrBytes := counter.WriteBytes - last.WriteBytes - wrTicks := returnOrFix(counter.WriteTime, last.WriteTime) - ticks := returnOrFix(counter.IoTime, last.IoTime) - aveq := returnOrFix(counter.WeightedIO, last.WeightedIO) + wrTicks := returnOrFixRollover(counter.WriteTime, last.WriteTime) + ticks := returnOrFixRollover(counter.IoTime, last.IoTime) + aveq := returnOrFixRollover(counter.WeightedIO, last.WeightedIO) nIOs := rdIOs + wrIOs nTicks := rdTicks + wrTicks diff --git a/libbeat/metric/system/diskio/diskstat_linux_test.go b/libbeat/metric/system/diskio/diskstat_linux_test.go index 28a46dc2fac8..7e0f2cfeb2f7 100644 --- a/libbeat/metric/system/diskio/diskstat_linux_test.go +++ b/libbeat/metric/system/diskio/diskstat_linux_test.go @@ -47,10 +47,10 @@ func Test32BitRollover(t *testing.T) { var current64 = (maxUint32 + current32) var correct = current64 - prev - assert.Equal(t, returnOrFix(current32, prev), returnOrFix(current64, prev)) - assert.Equal(t, correct, returnOrFix(current32, prev)) + assert.Equal(t, returnOrFixRollover(current32, prev), returnOrFixRollover(current64, prev)) + assert.Equal(t, correct, returnOrFixRollover(current32, prev)) - assert.Equal(t, uint64(0), returnOrFix(current32, current32)) + assert.Equal(t, uint64(0), returnOrFixRollover(current32, current32)) } func TestDiskIOStat_CalIOStatistics(t *testing.T) { From 217381a3374817d8f16b9021557ec80fc827e8b7 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Fri, 4 Mar 2022 14:46:22 -0800 Subject: [PATCH 6/6] change name, add changelog --- CHANGELOG.next.asciidoc | 1 + libbeat/metric/system/diskio/diskstat_linux.go | 10 +++++----- libbeat/metric/system/diskio/diskstat_linux_test.go | 6 +++--- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 840a7903306a..6539cecf7522 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -139,6 +139,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Remove strict parsing on RabbitMQ module {pull}30090[30090] - Add `kubernetes.container.status.last.reason` metric {pull}30306[30306] - Extend documentation about `orchestrator.cluster` fields {pull}30518[30518] +- Fix overflow in `iostat` metrics {pull}30679[30679] *Packetbeat* diff --git a/libbeat/metric/system/diskio/diskstat_linux.go b/libbeat/metric/system/diskio/diskstat_linux.go index 78425ee0900c..83245090d554 100644 --- a/libbeat/metric/system/diskio/diskstat_linux.go +++ b/libbeat/metric/system/diskio/diskstat_linux.go @@ -60,7 +60,7 @@ func (stat *IOStat) OpenSampling() error { // If you want to get technical, this could be a tad unsafe, as we don't actually have any way of knowing if the word size changes in a future kernel, and we've rolled over at UINT64_MAX // See https://docs.kernel.org/admin-guide/iostats.html and https://github.com/torvalds/linux/blob/master/block/genhd.c diskstats_show() -func returnOrFixRollover(current, prev uint64) uint64 { +func returnOrFix32BitRollover(current, prev uint64) uint64 { var maxUint32 uint64 = math.MaxUint32 //4_294_967_295 Max value in uint32/unsigned int if current >= prev { @@ -97,13 +97,13 @@ func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, err rdIOs := counter.ReadCount - last.ReadCount rdMerges := counter.MergedReadCount - last.MergedReadCount rdBytes := counter.ReadBytes - last.ReadBytes - rdTicks := returnOrFixRollover(counter.ReadTime, last.ReadTime) + rdTicks := returnOrFix32BitRollover(counter.ReadTime, last.ReadTime) wrIOs := counter.WriteCount - last.WriteCount wrMerges := counter.MergedWriteCount - last.MergedWriteCount wrBytes := counter.WriteBytes - last.WriteBytes - wrTicks := returnOrFixRollover(counter.WriteTime, last.WriteTime) - ticks := returnOrFixRollover(counter.IoTime, last.IoTime) - aveq := returnOrFixRollover(counter.WeightedIO, last.WeightedIO) + wrTicks := returnOrFix32BitRollover(counter.WriteTime, last.WriteTime) + ticks := returnOrFix32BitRollover(counter.IoTime, last.IoTime) + aveq := returnOrFix32BitRollover(counter.WeightedIO, last.WeightedIO) nIOs := rdIOs + wrIOs nTicks := rdTicks + wrTicks diff --git a/libbeat/metric/system/diskio/diskstat_linux_test.go b/libbeat/metric/system/diskio/diskstat_linux_test.go index 7e0f2cfeb2f7..5061d310533d 100644 --- a/libbeat/metric/system/diskio/diskstat_linux_test.go +++ b/libbeat/metric/system/diskio/diskstat_linux_test.go @@ -47,10 +47,10 @@ func Test32BitRollover(t *testing.T) { var current64 = (maxUint32 + current32) var correct = current64 - prev - assert.Equal(t, returnOrFixRollover(current32, prev), returnOrFixRollover(current64, prev)) - assert.Equal(t, correct, returnOrFixRollover(current32, prev)) + assert.Equal(t, returnOrFix32BitRollover(current32, prev), returnOrFix32BitRollover(current64, prev)) + assert.Equal(t, correct, returnOrFix32BitRollover(current32, prev)) - assert.Equal(t, uint64(0), returnOrFixRollover(current32, current32)) + assert.Equal(t, uint64(0), returnOrFix32BitRollover(current32, current32)) } func TestDiskIOStat_CalIOStatistics(t *testing.T) {