From ba92b25953bcc5b2fc447ce411187180c8fa11fe Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Sun, 10 May 2020 12:08:20 +0300 Subject: [PATCH 1/3] Add internal/trace from golang --- Makefile | 1 + internal/trace/README.md | 7 + internal/trace/gc.go | 825 ++++++++++++ internal/trace/gc_test.go | 202 +++ internal/trace/goroutines.go | 338 +++++ internal/trace/mkcanned.bash | 20 + internal/trace/mud.go | 223 ++++ internal/trace/mud_test.go | 87 ++ internal/trace/order.go | 279 ++++ internal/trace/parser.go | 1120 +++++++++++++++++ internal/trace/parser_test.go | 111 ++ internal/trace/testdata/http_1_10_good | Bin 0 -> 2201 bytes internal/trace/testdata/http_1_11_good | Bin 0 -> 2779 bytes internal/trace/testdata/http_1_5_good | Bin 0 -> 42218 bytes internal/trace/testdata/http_1_7_good | Bin 0 -> 1971 bytes internal/trace/testdata/http_1_9_good | Bin 0 -> 2187 bytes internal/trace/testdata/stress_1_10_good | Bin 0 -> 370999 bytes internal/trace/testdata/stress_1_11_good | Bin 0 -> 370129 bytes internal/trace/testdata/stress_1_5_good | Bin 0 -> 7446 bytes internal/trace/testdata/stress_1_5_unordered | Bin 0 -> 8194 bytes internal/trace/testdata/stress_1_7_good | Bin 0 -> 396526 bytes internal/trace/testdata/stress_1_9_good | Bin 0 -> 365129 bytes .../testdata/stress_start_stop_1_10_good | Bin 0 -> 6338 bytes .../testdata/stress_start_stop_1_11_good | Bin 0 -> 4882 bytes .../trace/testdata/stress_start_stop_1_5_good | Bin 0 -> 6997 bytes .../trace/testdata/stress_start_stop_1_7_good | Bin 0 -> 2055 bytes .../trace/testdata/stress_start_stop_1_9_good | Bin 0 -> 6271 bytes .../trace/testdata/user_task_span_1_11_good | Bin 0 -> 2000 bytes internal/trace/writer.go | 49 + 29 files changed, 3262 insertions(+) create mode 100644 internal/trace/README.md create mode 100644 internal/trace/gc.go create mode 100644 internal/trace/gc_test.go create mode 100644 internal/trace/goroutines.go create mode 100755 internal/trace/mkcanned.bash create mode 100644 internal/trace/mud.go create mode 100644 internal/trace/mud_test.go create mode 100644 internal/trace/order.go create mode 100644 internal/trace/parser.go create mode 100644 internal/trace/parser_test.go create mode 100644 internal/trace/testdata/http_1_10_good create mode 100644 internal/trace/testdata/http_1_11_good create mode 100644 internal/trace/testdata/http_1_5_good create mode 100644 internal/trace/testdata/http_1_7_good create mode 100644 internal/trace/testdata/http_1_9_good create mode 100644 internal/trace/testdata/stress_1_10_good create mode 100644 internal/trace/testdata/stress_1_11_good create mode 100644 internal/trace/testdata/stress_1_5_good create mode 100644 internal/trace/testdata/stress_1_5_unordered create mode 100644 internal/trace/testdata/stress_1_7_good create mode 100644 internal/trace/testdata/stress_1_9_good create mode 100644 internal/trace/testdata/stress_start_stop_1_10_good create mode 100644 internal/trace/testdata/stress_start_stop_1_11_good create mode 100644 internal/trace/testdata/stress_start_stop_1_5_good create mode 100644 internal/trace/testdata/stress_start_stop_1_7_good create mode 100644 internal/trace/testdata/stress_start_stop_1_9_good create mode 100644 internal/trace/testdata/user_task_span_1_11_good create mode 100644 internal/trace/writer.go diff --git a/Makefile b/Makefile index 4461addaa9f..cff230d3658 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,7 @@ # Needs to be defined before including Makefile.common to auto-generate targets DOCKER_ARCHS ?= amd64 armv7 arm64 +GOLANGCI_LINT_OPTS = --skip-dirs internal include Makefile.common diff --git a/internal/trace/README.md b/internal/trace/README.md new file mode 100644 index 00000000000..2c79cff1166 --- /dev/null +++ b/internal/trace/README.md @@ -0,0 +1,7 @@ +# README + +This is copied from Go's `internal/trace` package, using `go version go1.14.2 linux/amd64` https://golang.org/pkg/internal/trace/ + +We can remove this if Go exposes the package, issue: https://github.com/golang/go/issues/38971 + +writer.go didn't have a LICENSE header so I've added it manually. diff --git a/internal/trace/gc.go b/internal/trace/gc.go new file mode 100644 index 00000000000..cc19fdf8912 --- /dev/null +++ b/internal/trace/gc.go @@ -0,0 +1,825 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace + +import ( + "container/heap" + "math" + "sort" + "strings" + "time" +) + +// MutatorUtil is a change in mutator utilization at a particular +// time. Mutator utilization functions are represented as a +// time-ordered []MutatorUtil. +type MutatorUtil struct { + Time int64 + // Util is the mean mutator utilization starting at Time. This + // is in the range [0, 1]. + Util float64 +} + +// UtilFlags controls the behavior of MutatorUtilization. +type UtilFlags int + +const ( + // UtilSTW means utilization should account for STW events. + UtilSTW UtilFlags = 1 << iota + // UtilBackground means utilization should account for + // background mark workers. + UtilBackground + // UtilAssist means utilization should account for mark + // assists. + UtilAssist + // UtilSweep means utilization should account for sweeping. + UtilSweep + + // UtilPerProc means each P should be given a separate + // utilization function. Otherwise, there is a single function + // and each P is given a fraction of the utilization. + UtilPerProc +) + +// MutatorUtilization returns a set of mutator utilization functions +// for the given trace. Each function will always end with 0 +// utilization. The bounds of each function are implicit in the first +// and last event; outside of these bounds each function is undefined. +// +// If the UtilPerProc flag is not given, this always returns a single +// utilization function. Otherwise, it returns one function per P. +func MutatorUtilization(events []*Event, flags UtilFlags) [][]MutatorUtil { + if len(events) == 0 { + return nil + } + + type perP struct { + // gc > 0 indicates that GC is active on this P. + gc int + // series the logical series number for this P. This + // is necessary because Ps may be removed and then + // re-added, and then the new P needs a new series. + series int + } + ps := []perP{} + stw := 0 + + out := [][]MutatorUtil{} + assists := map[uint64]bool{} + block := map[uint64]*Event{} + bgMark := map[uint64]bool{} + + for _, ev := range events { + switch ev.Type { + case EvGomaxprocs: + gomaxprocs := int(ev.Args[0]) + if len(ps) > gomaxprocs { + if flags&UtilPerProc != 0 { + // End each P's series. + for _, p := range ps[gomaxprocs:] { + out[p.series] = addUtil(out[p.series], MutatorUtil{ev.Ts, 0}) + } + } + ps = ps[:gomaxprocs] + } + for len(ps) < gomaxprocs { + // Start new P's series. + series := 0 + if flags&UtilPerProc != 0 || len(out) == 0 { + series = len(out) + out = append(out, []MutatorUtil{{ev.Ts, 1}}) + } + ps = append(ps, perP{series: series}) + } + case EvGCSTWStart: + if flags&UtilSTW != 0 { + stw++ + } + case EvGCSTWDone: + if flags&UtilSTW != 0 { + stw-- + } + case EvGCMarkAssistStart: + if flags&UtilAssist != 0 { + ps[ev.P].gc++ + assists[ev.G] = true + } + case EvGCMarkAssistDone: + if flags&UtilAssist != 0 { + ps[ev.P].gc-- + delete(assists, ev.G) + } + case EvGCSweepStart: + if flags&UtilSweep != 0 { + ps[ev.P].gc++ + } + case EvGCSweepDone: + if flags&UtilSweep != 0 { + ps[ev.P].gc-- + } + case EvGoStartLabel: + if flags&UtilBackground != 0 && strings.HasPrefix(ev.SArgs[0], "GC ") && ev.SArgs[0] != "GC (idle)" { + // Background mark worker. + // + // If we're in per-proc mode, we don't + // count dedicated workers because + // they kick all of the goroutines off + // that P, so don't directly + // contribute to goroutine latency. + if !(flags&UtilPerProc != 0 && ev.SArgs[0] == "GC (dedicated)") { + bgMark[ev.G] = true + ps[ev.P].gc++ + } + } + fallthrough + case EvGoStart: + if assists[ev.G] { + // Unblocked during assist. + ps[ev.P].gc++ + } + block[ev.G] = ev.Link + default: + if ev != block[ev.G] { + continue + } + + if assists[ev.G] { + // Blocked during assist. + ps[ev.P].gc-- + } + if bgMark[ev.G] { + // Background mark worker done. + ps[ev.P].gc-- + delete(bgMark, ev.G) + } + delete(block, ev.G) + } + + if flags&UtilPerProc == 0 { + // Compute the current average utilization. + if len(ps) == 0 { + continue + } + gcPs := 0 + if stw > 0 { + gcPs = len(ps) + } else { + for i := range ps { + if ps[i].gc > 0 { + gcPs++ + } + } + } + mu := MutatorUtil{ev.Ts, 1 - float64(gcPs)/float64(len(ps))} + + // Record the utilization change. (Since + // len(ps) == len(out), we know len(out) > 0.) + out[0] = addUtil(out[0], mu) + } else { + // Check for per-P utilization changes. + for i := range ps { + p := &ps[i] + util := 1.0 + if stw > 0 || p.gc > 0 { + util = 0.0 + } + out[p.series] = addUtil(out[p.series], MutatorUtil{ev.Ts, util}) + } + } + } + + // Add final 0 utilization event to any remaining series. This + // is important to mark the end of the trace. The exact value + // shouldn't matter since no window should extend beyond this, + // but using 0 is symmetric with the start of the trace. + mu := MutatorUtil{events[len(events)-1].Ts, 0} + for i := range ps { + out[ps[i].series] = addUtil(out[ps[i].series], mu) + } + return out +} + +func addUtil(util []MutatorUtil, mu MutatorUtil) []MutatorUtil { + if len(util) > 0 { + if mu.Util == util[len(util)-1].Util { + // No change. + return util + } + if mu.Time == util[len(util)-1].Time { + // Take the lowest utilization at a time stamp. + if mu.Util < util[len(util)-1].Util { + util[len(util)-1] = mu + } + return util + } + } + return append(util, mu) +} + +// totalUtil is total utilization, measured in nanoseconds. This is a +// separate type primarily to distinguish it from mean utilization, +// which is also a float64. +type totalUtil float64 + +func totalUtilOf(meanUtil float64, dur int64) totalUtil { + return totalUtil(meanUtil * float64(dur)) +} + +// mean returns the mean utilization over dur. +func (u totalUtil) mean(dur time.Duration) float64 { + return float64(u) / float64(dur) +} + +// An MMUCurve is the minimum mutator utilization curve across +// multiple window sizes. +type MMUCurve struct { + series []mmuSeries +} + +type mmuSeries struct { + util []MutatorUtil + // sums[j] is the cumulative sum of util[:j]. + sums []totalUtil + // bands summarizes util in non-overlapping bands of duration + // bandDur. + bands []mmuBand + // bandDur is the duration of each band. + bandDur int64 +} + +type mmuBand struct { + // minUtil is the minimum instantaneous mutator utilization in + // this band. + minUtil float64 + // cumUtil is the cumulative total mutator utilization between + // time 0 and the left edge of this band. + cumUtil totalUtil + + // integrator is the integrator for the left edge of this + // band. + integrator integrator +} + +// NewMMUCurve returns an MMU curve for the given mutator utilization +// function. +func NewMMUCurve(utils [][]MutatorUtil) *MMUCurve { + series := make([]mmuSeries, len(utils)) + for i, util := range utils { + series[i] = newMMUSeries(util) + } + return &MMUCurve{series} +} + +// bandsPerSeries is the number of bands to divide each series into. +// This is only changed by tests. +var bandsPerSeries = 1000 + +func newMMUSeries(util []MutatorUtil) mmuSeries { + // Compute cumulative sum. + sums := make([]totalUtil, len(util)) + var prev MutatorUtil + var sum totalUtil + for j, u := range util { + sum += totalUtilOf(prev.Util, u.Time-prev.Time) + sums[j] = sum + prev = u + } + + // Divide the utilization curve up into equal size + // non-overlapping "bands" and compute a summary for each of + // these bands. + // + // Compute the duration of each band. + numBands := bandsPerSeries + if numBands > len(util) { + // There's no point in having lots of bands if there + // aren't many events. + numBands = len(util) + } + dur := util[len(util)-1].Time - util[0].Time + bandDur := (dur + int64(numBands) - 1) / int64(numBands) + if bandDur < 1 { + bandDur = 1 + } + // Compute the bands. There are numBands+1 bands in order to + // record the final cumulative sum. + bands := make([]mmuBand, numBands+1) + s := mmuSeries{util, sums, bands, bandDur} + leftSum := integrator{&s, 0} + for i := range bands { + startTime, endTime := s.bandTime(i) + cumUtil := leftSum.advance(startTime) + predIdx := leftSum.pos + minUtil := 1.0 + for i := predIdx; i < len(util) && util[i].Time < endTime; i++ { + minUtil = math.Min(minUtil, util[i].Util) + } + bands[i] = mmuBand{minUtil, cumUtil, leftSum} + } + + return s +} + +func (s *mmuSeries) bandTime(i int) (start, end int64) { + start = int64(i)*s.bandDur + s.util[0].Time + end = start + s.bandDur + return +} + +type bandUtil struct { + // Utilization series index + series int + // Band index + i int + // Lower bound of mutator utilization for all windows + // with a left edge in this band. + utilBound float64 +} + +type bandUtilHeap []bandUtil + +func (h bandUtilHeap) Len() int { + return len(h) +} + +func (h bandUtilHeap) Less(i, j int) bool { + return h[i].utilBound < h[j].utilBound +} + +func (h bandUtilHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *bandUtilHeap) Push(x interface{}) { + *h = append(*h, x.(bandUtil)) +} + +func (h *bandUtilHeap) Pop() interface{} { + x := (*h)[len(*h)-1] + *h = (*h)[:len(*h)-1] + return x +} + +// UtilWindow is a specific window at Time. +type UtilWindow struct { + Time int64 + // MutatorUtil is the mean mutator utilization in this window. + MutatorUtil float64 +} + +type utilHeap []UtilWindow + +func (h utilHeap) Len() int { + return len(h) +} + +func (h utilHeap) Less(i, j int) bool { + if h[i].MutatorUtil != h[j].MutatorUtil { + return h[i].MutatorUtil > h[j].MutatorUtil + } + return h[i].Time > h[j].Time +} + +func (h utilHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *utilHeap) Push(x interface{}) { + *h = append(*h, x.(UtilWindow)) +} + +func (h *utilHeap) Pop() interface{} { + x := (*h)[len(*h)-1] + *h = (*h)[:len(*h)-1] + return x +} + +// An accumulator takes a windowed mutator utilization function and +// tracks various statistics for that function. +type accumulator struct { + mmu float64 + + // bound is the mutator utilization bound where adding any + // mutator utilization above this bound cannot affect the + // accumulated statistics. + bound float64 + + // Worst N window tracking + nWorst int + wHeap utilHeap + + // Mutator utilization distribution tracking + mud *mud + // preciseMass is the distribution mass that must be precise + // before accumulation is stopped. + preciseMass float64 + // lastTime and lastMU are the previous point added to the + // windowed mutator utilization function. + lastTime int64 + lastMU float64 +} + +// resetTime declares a discontinuity in the windowed mutator +// utilization function by resetting the current time. +func (acc *accumulator) resetTime() { + // This only matters for distribution collection, since that's + // the only thing that depends on the progression of the + // windowed mutator utilization function. + acc.lastTime = math.MaxInt64 +} + +// addMU adds a point to the windowed mutator utilization function at +// (time, mu). This must be called for monotonically increasing values +// of time. +// +// It returns true if further calls to addMU would be pointless. +func (acc *accumulator) addMU(time int64, mu float64, window time.Duration) bool { + if mu < acc.mmu { + acc.mmu = mu + } + acc.bound = acc.mmu + + if acc.nWorst == 0 { + // If the minimum has reached zero, it can't go any + // lower, so we can stop early. + return mu == 0 + } + + // Consider adding this window to the n worst. + if len(acc.wHeap) < acc.nWorst || mu < acc.wHeap[0].MutatorUtil { + // This window is lower than the K'th worst window. + // + // Check if there's any overlapping window + // already in the heap and keep whichever is + // worse. + for i, ui := range acc.wHeap { + if time+int64(window) > ui.Time && ui.Time+int64(window) > time { + if ui.MutatorUtil <= mu { + // Keep the first window. + goto keep + } else { + // Replace it with this window. + heap.Remove(&acc.wHeap, i) + break + } + } + } + + heap.Push(&acc.wHeap, UtilWindow{time, mu}) + if len(acc.wHeap) > acc.nWorst { + heap.Pop(&acc.wHeap) + } + keep: + } + + if len(acc.wHeap) < acc.nWorst { + // We don't have N windows yet, so keep accumulating. + acc.bound = 1.0 + } else { + // Anything above the least worst window has no effect. + acc.bound = math.Max(acc.bound, acc.wHeap[0].MutatorUtil) + } + + if acc.mud != nil { + if acc.lastTime != math.MaxInt64 { + // Update distribution. + acc.mud.add(acc.lastMU, mu, float64(time-acc.lastTime)) + } + acc.lastTime, acc.lastMU = time, mu + if _, mudBound, ok := acc.mud.approxInvCumulativeSum(); ok { + acc.bound = math.Max(acc.bound, mudBound) + } else { + // We haven't accumulated enough total precise + // mass yet to even reach our goal, so keep + // accumulating. + acc.bound = 1 + } + // It's not worth checking percentiles every time, so + // just keep accumulating this band. + return false + } + + // If we've found enough 0 utilizations, we can stop immediately. + return len(acc.wHeap) == acc.nWorst && acc.wHeap[0].MutatorUtil == 0 +} + +// MMU returns the minimum mutator utilization for the given time +// window. This is the minimum utilization for all windows of this +// duration across the execution. The returned value is in the range +// [0, 1]. +func (c *MMUCurve) MMU(window time.Duration) (mmu float64) { + acc := accumulator{mmu: 1.0, bound: 1.0} + c.mmu(window, &acc) + return acc.mmu +} + +// Examples returns n specific examples of the lowest mutator +// utilization for the given window size. The returned windows will be +// disjoint (otherwise there would be a huge number of +// mostly-overlapping windows at the single lowest point). There are +// no guarantees on which set of disjoint windows this returns. +func (c *MMUCurve) Examples(window time.Duration, n int) (worst []UtilWindow) { + acc := accumulator{mmu: 1.0, bound: 1.0, nWorst: n} + c.mmu(window, &acc) + sort.Sort(sort.Reverse(acc.wHeap)) + return ([]UtilWindow)(acc.wHeap) +} + +// MUD returns mutator utilization distribution quantiles for the +// given window size. +// +// The mutator utilization distribution is the distribution of mean +// mutator utilization across all windows of the given window size in +// the trace. +// +// The minimum mutator utilization is the minimum (0th percentile) of +// this distribution. (However, if only the minimum is desired, it's +// more efficient to use the MMU method.) +func (c *MMUCurve) MUD(window time.Duration, quantiles []float64) []float64 { + if len(quantiles) == 0 { + return []float64{} + } + + // Each unrefined band contributes a known total mass to the + // distribution (bandDur except at the end), but in an unknown + // way. However, we know that all the mass it contributes must + // be at or above its worst-case mean mutator utilization. + // + // Hence, we refine bands until the highest desired + // distribution quantile is less than the next worst-case mean + // mutator utilization. At this point, all further + // contributions to the distribution must be beyond the + // desired quantile and hence cannot affect it. + // + // First, find the highest desired distribution quantile. + maxQ := quantiles[0] + for _, q := range quantiles { + if q > maxQ { + maxQ = q + } + } + // The distribution's mass is in units of time (it's not + // normalized because this would make it more annoying to + // account for future contributions of unrefined bands). The + // total final mass will be the duration of the trace itself + // minus the window size. Using this, we can compute the mass + // corresponding to quantile maxQ. + var duration int64 + for _, s := range c.series { + duration1 := s.util[len(s.util)-1].Time - s.util[0].Time + if duration1 >= int64(window) { + duration += duration1 - int64(window) + } + } + qMass := float64(duration) * maxQ + + // Accumulate the MUD until we have precise information for + // everything to the left of qMass. + acc := accumulator{mmu: 1.0, bound: 1.0, preciseMass: qMass, mud: new(mud)} + acc.mud.setTrackMass(qMass) + c.mmu(window, &acc) + + // Evaluate the quantiles on the accumulated MUD. + out := make([]float64, len(quantiles)) + for i := range out { + mu, _ := acc.mud.invCumulativeSum(float64(duration) * quantiles[i]) + if math.IsNaN(mu) { + // There are a few legitimate ways this can + // happen: + // + // 1. If the window is the full trace + // duration, then the windowed MU function is + // only defined at a single point, so the MU + // distribution is not well-defined. + // + // 2. If there are no events, then the MU + // distribution has no mass. + // + // Either way, all of the quantiles will have + // converged toward the MMU at this point. + mu = acc.mmu + } + out[i] = mu + } + return out +} + +func (c *MMUCurve) mmu(window time.Duration, acc *accumulator) { + if window <= 0 { + acc.mmu = 0 + return + } + + var bandU bandUtilHeap + windows := make([]time.Duration, len(c.series)) + for i, s := range c.series { + windows[i] = window + if max := time.Duration(s.util[len(s.util)-1].Time - s.util[0].Time); window > max { + windows[i] = max + } + + bandU1 := bandUtilHeap(s.mkBandUtil(i, windows[i])) + if bandU == nil { + bandU = bandU1 + } else { + bandU = append(bandU, bandU1...) + } + } + + // Process bands from lowest utilization bound to highest. + heap.Init(&bandU) + + // Refine each band into a precise window and MMU until + // refining the next lowest band can no longer affect the MMU + // or windows. + for len(bandU) > 0 && bandU[0].utilBound < acc.bound { + i := bandU[0].series + c.series[i].bandMMU(bandU[0].i, windows[i], acc) + heap.Pop(&bandU) + } +} + +func (c *mmuSeries) mkBandUtil(series int, window time.Duration) []bandUtil { + // For each band, compute the worst-possible total mutator + // utilization for all windows that start in that band. + + // minBands is the minimum number of bands a window can span + // and maxBands is the maximum number of bands a window can + // span in any alignment. + minBands := int((int64(window) + c.bandDur - 1) / c.bandDur) + maxBands := int((int64(window) + 2*(c.bandDur-1)) / c.bandDur) + if window > 1 && maxBands < 2 { + panic("maxBands < 2") + } + tailDur := int64(window) % c.bandDur + nUtil := len(c.bands) - maxBands + 1 + if nUtil < 0 { + nUtil = 0 + } + bandU := make([]bandUtil, nUtil) + for i := range bandU { + // To compute the worst-case MU, we assume the minimum + // for any bands that are only partially overlapped by + // some window and the mean for any bands that are + // completely covered by all windows. + var util totalUtil + + // Find the lowest and second lowest of the partial + // bands. + l := c.bands[i].minUtil + r1 := c.bands[i+minBands-1].minUtil + r2 := c.bands[i+maxBands-1].minUtil + minBand := math.Min(l, math.Min(r1, r2)) + // Assume the worst window maximally overlaps the + // worst minimum and then the rest overlaps the second + // worst minimum. + if minBands == 1 { + util += totalUtilOf(minBand, int64(window)) + } else { + util += totalUtilOf(minBand, c.bandDur) + midBand := 0.0 + switch { + case minBand == l: + midBand = math.Min(r1, r2) + case minBand == r1: + midBand = math.Min(l, r2) + case minBand == r2: + midBand = math.Min(l, r1) + } + util += totalUtilOf(midBand, tailDur) + } + + // Add the total mean MU of bands that are completely + // overlapped by all windows. + if minBands > 2 { + util += c.bands[i+minBands-1].cumUtil - c.bands[i+1].cumUtil + } + + bandU[i] = bandUtil{series, i, util.mean(window)} + } + + return bandU +} + +// bandMMU computes the precise minimum mutator utilization for +// windows with a left edge in band bandIdx. +func (c *mmuSeries) bandMMU(bandIdx int, window time.Duration, acc *accumulator) { + util := c.util + + // We think of the mutator utilization over time as the + // box-filtered utilization function, which we call the + // "windowed mutator utilization function". The resulting + // function is continuous and piecewise linear (unless + // window==0, which we handle elsewhere), where the boundaries + // between segments occur when either edge of the window + // encounters a change in the instantaneous mutator + // utilization function. Hence, the minimum of this function + // will always occur when one of the edges of the window + // aligns with a utilization change, so these are the only + // points we need to consider. + // + // We compute the mutator utilization function incrementally + // by tracking the integral from t=0 to the left edge of the + // window and to the right edge of the window. + left := c.bands[bandIdx].integrator + right := left + time, endTime := c.bandTime(bandIdx) + if utilEnd := util[len(util)-1].Time - int64(window); utilEnd < endTime { + endTime = utilEnd + } + acc.resetTime() + for { + // Advance edges to time and time+window. + mu := (right.advance(time+int64(window)) - left.advance(time)).mean(window) + if acc.addMU(time, mu, window) { + break + } + if time == endTime { + break + } + + // The maximum slope of the windowed mutator + // utilization function is 1/window, so we can always + // advance the time by at least (mu - mmu) * window + // without dropping below mmu. + minTime := time + int64((mu-acc.bound)*float64(window)) + + // Advance the window to the next time where either + // the left or right edge of the window encounters a + // change in the utilization curve. + if t1, t2 := left.next(time), right.next(time+int64(window))-int64(window); t1 < t2 { + time = t1 + } else { + time = t2 + } + if time < minTime { + time = minTime + } + if time >= endTime { + // For MMUs we could stop here, but for MUDs + // it's important that we span the entire + // band. + time = endTime + } + } +} + +// An integrator tracks a position in a utilization function and +// integrates it. +type integrator struct { + u *mmuSeries + // pos is the index in u.util of the current time's non-strict + // predecessor. + pos int +} + +// advance returns the integral of the utilization function from 0 to +// time. advance must be called on monotonically increasing values of +// times. +func (in *integrator) advance(time int64) totalUtil { + util, pos := in.u.util, in.pos + // Advance pos until pos+1 is time's strict successor (making + // pos time's non-strict predecessor). + // + // Very often, this will be nearby, so we optimize that case, + // but it may be arbitrarily far away, so we handled that + // efficiently, too. + const maxSeq = 8 + if pos+maxSeq < len(util) && util[pos+maxSeq].Time > time { + // Nearby. Use a linear scan. + for pos+1 < len(util) && util[pos+1].Time <= time { + pos++ + } + } else { + // Far. Binary search for time's strict successor. + l, r := pos, len(util) + for l < r { + h := int(uint(l+r) >> 1) + if util[h].Time <= time { + l = h + 1 + } else { + r = h + } + } + pos = l - 1 // Non-strict predecessor. + } + in.pos = pos + var partial totalUtil + if time != util[pos].Time { + partial = totalUtilOf(util[pos].Util, time-util[pos].Time) + } + return in.u.sums[pos] + partial +} + +// next returns the smallest time t' > time of a change in the +// utilization function. +func (in *integrator) next(time int64) int64 { + for _, u := range in.u.util[in.pos:] { + if u.Time > time { + return u.Time + } + } + return 1<<63 - 1 +} diff --git a/internal/trace/gc_test.go b/internal/trace/gc_test.go new file mode 100644 index 00000000000..4f9c77041a9 --- /dev/null +++ b/internal/trace/gc_test.go @@ -0,0 +1,202 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace + +import ( + "bytes" + "io/ioutil" + "math" + "testing" + "time" +) + +// aeq returns true if x and y are equal up to 8 digits (1 part in 100 +// million). +func aeq(x, y float64) bool { + if x < 0 && y < 0 { + x, y = -x, -y + } + const digits = 8 + factor := 1 - math.Pow(10, -digits+1) + return x*factor <= y && y*factor <= x +} + +func TestMMU(t *testing.T) { + t.Parallel() + + // MU + // 1.0 ***** ***** ***** + // 0.5 * * * * + // 0.0 ***** ***** + // 0 1 2 3 4 5 + util := [][]MutatorUtil{{ + {0e9, 1}, + {1e9, 0}, + {2e9, 1}, + {3e9, 0}, + {4e9, 1}, + {5e9, 0}, + }} + mmuCurve := NewMMUCurve(util) + + for _, test := range []struct { + window time.Duration + want float64 + worst []float64 + }{ + {0, 0, []float64{}}, + {time.Millisecond, 0, []float64{0, 0}}, + {time.Second, 0, []float64{0, 0}}, + {2 * time.Second, 0.5, []float64{0.5, 0.5}}, + {3 * time.Second, 1 / 3.0, []float64{1 / 3.0}}, + {4 * time.Second, 0.5, []float64{0.5}}, + {5 * time.Second, 3 / 5.0, []float64{3 / 5.0}}, + {6 * time.Second, 3 / 5.0, []float64{3 / 5.0}}, + } { + if got := mmuCurve.MMU(test.window); !aeq(test.want, got) { + t.Errorf("for %s window, want mu = %f, got %f", test.window, test.want, got) + } + worst := mmuCurve.Examples(test.window, 2) + // Which exact windows are returned is unspecified + // (and depends on the exact banding), so we just + // check that we got the right number with the right + // utilizations. + if len(worst) != len(test.worst) { + t.Errorf("for %s window, want worst %v, got %v", test.window, test.worst, worst) + } else { + for i := range worst { + if worst[i].MutatorUtil != test.worst[i] { + t.Errorf("for %s window, want worst %v, got %v", test.window, test.worst, worst) + break + } + } + } + } +} + +func TestMMUTrace(t *testing.T) { + // Can't be t.Parallel() because it modifies the + // testingOneBand package variable. + if testing.Short() { + // test input too big for all.bash + t.Skip("skipping in -short mode") + } + + data, err := ioutil.ReadFile("testdata/stress_1_10_good") + if err != nil { + t.Fatalf("failed to read input file: %v", err) + } + _, events, err := parse(bytes.NewReader(data), "") + if err != nil { + t.Fatalf("failed to parse trace: %s", err) + } + mu := MutatorUtilization(events.Events, UtilSTW|UtilBackground|UtilAssist) + mmuCurve := NewMMUCurve(mu) + + // Test the optimized implementation against the "obviously + // correct" implementation. + for window := time.Nanosecond; window < 10*time.Second; window *= 10 { + want := mmuSlow(mu[0], window) + got := mmuCurve.MMU(window) + if !aeq(want, got) { + t.Errorf("want %f, got %f mutator utilization in window %s", want, got, window) + } + } + + // Test MUD with band optimization against MUD without band + // optimization. We don't have a simple testing implementation + // of MUDs (the simplest implementation is still quite + // complex), but this is still a pretty good test. + defer func(old int) { bandsPerSeries = old }(bandsPerSeries) + bandsPerSeries = 1 + mmuCurve2 := NewMMUCurve(mu) + quantiles := []float64{0, 1 - .999, 1 - .99} + for window := time.Microsecond; window < time.Second; window *= 10 { + mud1 := mmuCurve.MUD(window, quantiles) + mud2 := mmuCurve2.MUD(window, quantiles) + for i := range mud1 { + if !aeq(mud1[i], mud2[i]) { + t.Errorf("for quantiles %v at window %v, want %v, got %v", quantiles, window, mud2, mud1) + break + } + } + } +} + +func BenchmarkMMU(b *testing.B) { + data, err := ioutil.ReadFile("testdata/stress_1_10_good") + if err != nil { + b.Fatalf("failed to read input file: %v", err) + } + _, events, err := parse(bytes.NewReader(data), "") + if err != nil { + b.Fatalf("failed to parse trace: %s", err) + } + mu := MutatorUtilization(events.Events, UtilSTW|UtilBackground|UtilAssist|UtilSweep) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mmuCurve := NewMMUCurve(mu) + xMin, xMax := time.Microsecond, time.Second + logMin, logMax := math.Log(float64(xMin)), math.Log(float64(xMax)) + const samples = 100 + for i := 0; i < samples; i++ { + window := time.Duration(math.Exp(float64(i)/(samples-1)*(logMax-logMin) + logMin)) + mmuCurve.MMU(window) + } + } +} + +func mmuSlow(util []MutatorUtil, window time.Duration) (mmu float64) { + if max := time.Duration(util[len(util)-1].Time - util[0].Time); window > max { + window = max + } + + mmu = 1.0 + + // muInWindow returns the mean mutator utilization between + // util[0].Time and end. + muInWindow := func(util []MutatorUtil, end int64) float64 { + total := 0.0 + var prevU MutatorUtil + for _, u := range util { + if u.Time > end { + total += prevU.Util * float64(end-prevU.Time) + break + } + total += prevU.Util * float64(u.Time-prevU.Time) + prevU = u + } + return total / float64(end-util[0].Time) + } + update := func() { + for i, u := range util { + if u.Time+int64(window) > util[len(util)-1].Time { + break + } + mmu = math.Min(mmu, muInWindow(util[i:], u.Time+int64(window))) + } + } + + // Consider all left-aligned windows. + update() + // Reverse the trace. Slightly subtle because each MutatorUtil + // is a *change*. + rutil := make([]MutatorUtil, len(util)) + if util[len(util)-1].Util != 0 { + panic("irreversible trace") + } + for i, u := range util { + util1 := 0.0 + if i != 0 { + util1 = util[i-1].Util + } + rutil[len(rutil)-i-1] = MutatorUtil{Time: -u.Time, Util: util1} + } + util = rutil + // Consider all right-aligned windows. + update() + return +} diff --git a/internal/trace/goroutines.go b/internal/trace/goroutines.go new file mode 100644 index 00000000000..a5fda489bea --- /dev/null +++ b/internal/trace/goroutines.go @@ -0,0 +1,338 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package trace + +import "sort" + +// GDesc contains statistics and execution details of a single goroutine. +type GDesc struct { + ID uint64 + Name string + PC uint64 + CreationTime int64 + StartTime int64 + EndTime int64 + + // List of regions in the goroutine, sorted based on the start time. + Regions []*UserRegionDesc + + // Statistics of execution time during the goroutine execution. + GExecutionStat + + *gdesc // private part. +} + +// UserRegionDesc represents a region and goroutine execution stats +// while the region was active. +type UserRegionDesc struct { + TaskID uint64 + Name string + + // Region start event. Normally EvUserRegion start event or nil, + // but can be EvGoCreate event if the region is a synthetic + // region representing task inheritance from the parent goroutine. + Start *Event + + // Region end event. Normally EvUserRegion end event or nil, + // but can be EvGoStop or EvGoEnd event if the goroutine + // terminated without explicitly ending the region. + End *Event + + GExecutionStat +} + +// GExecutionStat contains statistics about a goroutine's execution +// during a period of time. +type GExecutionStat struct { + ExecTime int64 + SchedWaitTime int64 + IOTime int64 + BlockTime int64 + SyscallTime int64 + GCTime int64 + SweepTime int64 + TotalTime int64 +} + +// sub returns the stats v-s. +func (s GExecutionStat) sub(v GExecutionStat) (r GExecutionStat) { + r = s + r.ExecTime -= v.ExecTime + r.SchedWaitTime -= v.SchedWaitTime + r.IOTime -= v.IOTime + r.BlockTime -= v.BlockTime + r.SyscallTime -= v.SyscallTime + r.GCTime -= v.GCTime + r.SweepTime -= v.SweepTime + r.TotalTime -= v.TotalTime + return r +} + +// snapshotStat returns the snapshot of the goroutine execution statistics. +// This is called as we process the ordered trace event stream. lastTs and +// activeGCStartTime are used to process pending statistics if this is called +// before any goroutine end event. +func (g *GDesc) snapshotStat(lastTs, activeGCStartTime int64) (ret GExecutionStat) { + ret = g.GExecutionStat + + if g.gdesc == nil { + return ret // finalized GDesc. No pending state. + } + + if activeGCStartTime != 0 { // terminating while GC is active + if g.CreationTime < activeGCStartTime { + ret.GCTime += lastTs - activeGCStartTime + } else { + // The goroutine's lifetime completely overlaps + // with a GC. + ret.GCTime += lastTs - g.CreationTime + } + } + + if g.TotalTime == 0 { + ret.TotalTime = lastTs - g.CreationTime + } + + if g.lastStartTime != 0 { + ret.ExecTime += lastTs - g.lastStartTime + } + if g.blockNetTime != 0 { + ret.IOTime += lastTs - g.blockNetTime + } + if g.blockSyncTime != 0 { + ret.BlockTime += lastTs - g.blockSyncTime + } + if g.blockSyscallTime != 0 { + ret.SyscallTime += lastTs - g.blockSyscallTime + } + if g.blockSchedTime != 0 { + ret.SchedWaitTime += lastTs - g.blockSchedTime + } + if g.blockSweepTime != 0 { + ret.SweepTime += lastTs - g.blockSweepTime + } + return ret +} + +// finalize is called when processing a goroutine end event or at +// the end of trace processing. This finalizes the execution stat +// and any active regions in the goroutine, in which case trigger is nil. +func (g *GDesc) finalize(lastTs, activeGCStartTime int64, trigger *Event) { + if trigger != nil { + g.EndTime = trigger.Ts + } + finalStat := g.snapshotStat(lastTs, activeGCStartTime) + + g.GExecutionStat = finalStat + for _, s := range g.activeRegions { + s.End = trigger + s.GExecutionStat = finalStat.sub(s.GExecutionStat) + g.Regions = append(g.Regions, s) + } + *(g.gdesc) = gdesc{} +} + +// gdesc is a private part of GDesc that is required only during analysis. +type gdesc struct { + lastStartTime int64 + blockNetTime int64 + blockSyncTime int64 + blockSyscallTime int64 + blockSweepTime int64 + blockGCTime int64 + blockSchedTime int64 + + activeRegions []*UserRegionDesc // stack of active regions +} + +// GoroutineStats generates statistics for all goroutines in the trace. +func GoroutineStats(events []*Event) map[uint64]*GDesc { + gs := make(map[uint64]*GDesc) + var lastTs int64 + var gcStartTime int64 // gcStartTime == 0 indicates gc is inactive. + for _, ev := range events { + lastTs = ev.Ts + switch ev.Type { + case EvGoCreate: + g := &GDesc{ID: ev.Args[0], CreationTime: ev.Ts, gdesc: new(gdesc)} + g.blockSchedTime = ev.Ts + // When a goroutine is newly created, inherit the + // task of the active region. For ease handling of + // this case, we create a fake region description with + // the task id. + if creatorG := gs[ev.G]; creatorG != nil && len(creatorG.gdesc.activeRegions) > 0 { + regions := creatorG.gdesc.activeRegions + s := regions[len(regions)-1] + if s.TaskID != 0 { + g.gdesc.activeRegions = []*UserRegionDesc{ + {TaskID: s.TaskID, Start: ev}, + } + } + } + gs[g.ID] = g + case EvGoStart, EvGoStartLabel: + g := gs[ev.G] + if g.PC == 0 { + g.PC = ev.Stk[0].PC + g.Name = ev.Stk[0].Fn + } + g.lastStartTime = ev.Ts + if g.StartTime == 0 { + g.StartTime = ev.Ts + } + if g.blockSchedTime != 0 { + g.SchedWaitTime += ev.Ts - g.blockSchedTime + g.blockSchedTime = 0 + } + case EvGoEnd, EvGoStop: + g := gs[ev.G] + g.finalize(ev.Ts, gcStartTime, ev) + case EvGoBlockSend, EvGoBlockRecv, EvGoBlockSelect, + EvGoBlockSync, EvGoBlockCond: + g := gs[ev.G] + g.ExecTime += ev.Ts - g.lastStartTime + g.lastStartTime = 0 + g.blockSyncTime = ev.Ts + case EvGoSched, EvGoPreempt: + g := gs[ev.G] + g.ExecTime += ev.Ts - g.lastStartTime + g.lastStartTime = 0 + g.blockSchedTime = ev.Ts + case EvGoSleep, EvGoBlock: + g := gs[ev.G] + g.ExecTime += ev.Ts - g.lastStartTime + g.lastStartTime = 0 + case EvGoBlockNet: + g := gs[ev.G] + g.ExecTime += ev.Ts - g.lastStartTime + g.lastStartTime = 0 + g.blockNetTime = ev.Ts + case EvGoBlockGC: + g := gs[ev.G] + g.ExecTime += ev.Ts - g.lastStartTime + g.lastStartTime = 0 + g.blockGCTime = ev.Ts + case EvGoUnblock: + g := gs[ev.Args[0]] + if g.blockNetTime != 0 { + g.IOTime += ev.Ts - g.blockNetTime + g.blockNetTime = 0 + } + if g.blockSyncTime != 0 { + g.BlockTime += ev.Ts - g.blockSyncTime + g.blockSyncTime = 0 + } + g.blockSchedTime = ev.Ts + case EvGoSysBlock: + g := gs[ev.G] + g.ExecTime += ev.Ts - g.lastStartTime + g.lastStartTime = 0 + g.blockSyscallTime = ev.Ts + case EvGoSysExit: + g := gs[ev.G] + if g.blockSyscallTime != 0 { + g.SyscallTime += ev.Ts - g.blockSyscallTime + g.blockSyscallTime = 0 + } + g.blockSchedTime = ev.Ts + case EvGCSweepStart: + g := gs[ev.G] + if g != nil { + // Sweep can happen during GC on system goroutine. + g.blockSweepTime = ev.Ts + } + case EvGCSweepDone: + g := gs[ev.G] + if g != nil && g.blockSweepTime != 0 { + g.SweepTime += ev.Ts - g.blockSweepTime + g.blockSweepTime = 0 + } + case EvGCStart: + gcStartTime = ev.Ts + case EvGCDone: + for _, g := range gs { + if g.EndTime != 0 { + continue + } + if gcStartTime < g.CreationTime { + g.GCTime += ev.Ts - g.CreationTime + } else { + g.GCTime += ev.Ts - gcStartTime + } + } + gcStartTime = 0 // indicates gc is inactive. + case EvUserRegion: + g := gs[ev.G] + switch mode := ev.Args[1]; mode { + case 0: // region start + g.activeRegions = append(g.activeRegions, &UserRegionDesc{ + Name: ev.SArgs[0], + TaskID: ev.Args[0], + Start: ev, + GExecutionStat: g.snapshotStat(lastTs, gcStartTime), + }) + case 1: // region end + var sd *UserRegionDesc + if regionStk := g.activeRegions; len(regionStk) > 0 { + n := len(regionStk) + sd = regionStk[n-1] + regionStk = regionStk[:n-1] // pop + g.activeRegions = regionStk + } else { + sd = &UserRegionDesc{ + Name: ev.SArgs[0], + TaskID: ev.Args[0], + } + } + sd.GExecutionStat = g.snapshotStat(lastTs, gcStartTime).sub(sd.GExecutionStat) + sd.End = ev + g.Regions = append(g.Regions, sd) + } + } + } + + for _, g := range gs { + g.finalize(lastTs, gcStartTime, nil) + + // sort based on region start time + sort.Slice(g.Regions, func(i, j int) bool { + x := g.Regions[i].Start + y := g.Regions[j].Start + if x == nil { + return true + } + if y == nil { + return false + } + return x.Ts < y.Ts + }) + + g.gdesc = nil + } + + return gs +} + +// RelatedGoroutines finds a set of goroutines related to goroutine goid. +func RelatedGoroutines(events []*Event, goid uint64) map[uint64]bool { + // BFS of depth 2 over "unblock" edges + // (what goroutines unblock goroutine goid?). + gmap := make(map[uint64]bool) + gmap[goid] = true + for i := 0; i < 2; i++ { + gmap1 := make(map[uint64]bool) + for g := range gmap { + gmap1[g] = true + } + for _, ev := range events { + if ev.Type == EvGoUnblock && gmap[ev.Args[0]] { + gmap1[ev.G] = true + } + } + gmap = gmap1 + } + gmap[0] = true // for GC events + return gmap +} diff --git a/internal/trace/mkcanned.bash b/internal/trace/mkcanned.bash new file mode 100755 index 00000000000..b365b909c8c --- /dev/null +++ b/internal/trace/mkcanned.bash @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +# Copyright 2016 The Go Authors. All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +# mkcanned.bash creates canned traces for the trace test suite using +# the current Go version. + +set -e + +if [ $# != 1 ]; then + echo "usage: $0