diff --git a/config/config.go b/config/config.go index d9cb8d25c5..8993d82c67 100644 --- a/config/config.go +++ b/config/config.go @@ -84,6 +84,7 @@ type Config struct { BlockProfileRate int `yaml:"block-profile-rate"` MutexProfileFraction int `yaml:"mutex-profile-fraction"` MemProfileRate int `yaml:"memory-profile-rate"` + FlightRecorderTargetURL string `yaml:"flight-recorder-target-url"` DebugGcMetrics bool `yaml:"debug-gc-metrics"` RuntimeMetrics bool `yaml:"runtime-metrics"` ServeRouteMetrics bool `yaml:"serve-route-metrics"` @@ -369,6 +370,7 @@ func NewConfig() *Config { // logging, metrics, tracing: flag.BoolVar(&cfg.EnablePrometheusMetrics, "enable-prometheus-metrics", false, "*Deprecated*: use metrics-flavour. Switch to Prometheus metrics format to expose metrics") + flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds") flag.StringVar(&cfg.OpenTracing, "opentracing", "noop", "list of arguments for opentracing (space separated), first argument is the tracer implementation") flag.StringVar(&cfg.OpenTracingInitialSpan, "opentracing-initial-span", "ingress", "set the name of the initial, pre-routing, tracing span") flag.StringVar(&cfg.OpenTracingExcludedProxyTags, "opentracing-excluded-proxy-tags", "", "set tags that should be excluded from spans created for proxy operation. must be a comma-separated list of strings.") @@ -382,7 +384,7 @@ func NewConfig() *Config { flag.IntVar(&cfg.BlockProfileRate, "block-profile-rate", 0, "block profile sample rate, see runtime.SetBlockProfileRate") flag.IntVar(&cfg.MutexProfileFraction, "mutex-profile-fraction", 0, "mutex profile fraction rate, see runtime.SetMutexProfileFraction") flag.IntVar(&cfg.MemProfileRate, "memory-profile-rate", 0, "memory profile rate, see runtime.SetMemProfileRate, keeps default 512 kB") - flag.BoolVar(&cfg.EnablePrometheusStartLabel, "enable-prometheus-start-label", false, "adds start label to each prometheus counter with the value of counter creation timestamp as unix nanoseconds") + flag.StringVar(&cfg.FlightRecorderTargetURL, "flight-recorder-target-url", "", "sets the flight recorder target URL that is used to write out the trace to.") flag.BoolVar(&cfg.DebugGcMetrics, "debug-gc-metrics", false, "enables reporting of the Go garbage collector statistics exported in debug.GCStats") flag.BoolVar(&cfg.RuntimeMetrics, "runtime-metrics", true, "enables reporting of the Go runtime statistics exported in runtime and specifically runtime.MemStats") flag.BoolVar(&cfg.ServeRouteMetrics, "serve-route-metrics", false, "enables reporting total serve time metrics for each route") @@ -755,6 +757,7 @@ func (c *Config) ToOptions() skipper.Options { EnableProfile: c.EnableProfile, BlockProfileRate: c.BlockProfileRate, MutexProfileFraction: c.MutexProfileFraction, + FlightRecorderTargetURL: c.FlightRecorderTargetURL, EnableDebugGcMetrics: c.DebugGcMetrics, EnableRuntimeMetrics: c.RuntimeMetrics, EnableServeRouteMetrics: c.ServeRouteMetrics, diff --git a/filters/builtin/builtin.go b/filters/builtin/builtin.go index 1003756c60..0c8a6345e1 100644 --- a/filters/builtin/builtin.go +++ b/filters/builtin/builtin.go @@ -191,6 +191,7 @@ func Filters() []filters.Spec { diag.NewNormalResponseLatency(), diag.NewHistogramRequestLatency(), diag.NewHistogramResponseLatency(), + diag.NewTrace(), tee.NewTee(), tee.NewTeeDeprecated(), tee.NewTeeNoFollow(), diff --git a/filters/diag/trace.go b/filters/diag/trace.go new file mode 100644 index 0000000000..1f4e6725ed --- /dev/null +++ b/filters/diag/trace.go @@ -0,0 +1,52 @@ +package diag + +import ( + "time" + + log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/filters" +) + +func getDurationArg(a interface{}) (time.Duration, error) { + if s, ok := a.(string); ok { + return time.ParseDuration(s) + } + return 0, filters.ErrInvalidFilterParameters +} + +type traceSpec struct{} + +type trace struct { + d time.Duration +} + +// NewTrace creates a filter specification for the trace() filter +func NewTrace() filters.Spec { + return &traceSpec{} +} + +func (*traceSpec) Name() string { + return filters.TraceName +} + +func (ts *traceSpec) CreateFilter(args []interface{}) (filters.Filter, error) { + if len(args) != 1 { + return nil, filters.ErrInvalidFilterParameters + } + + d, err := getDurationArg(args[0]) + if err != nil { + log.Warnf("d failed on creation of trace(): %v", err) + return nil, filters.ErrInvalidFilterParameters + } + + return &trace{ + d: d, + }, nil +} + +func (tr *trace) Request(ctx filters.FilterContext) { + ctx.StateBag()[filters.TraceName] = tr.d +} + +func (*trace) Response(filters.FilterContext) {} diff --git a/filters/filters.go b/filters/filters.go index 358e666b17..8050b17ac1 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -272,6 +272,7 @@ const ( NormalResponseLatencyName = "normalResponseLatency" HistogramRequestLatencyName = "histogramRequestLatency" HistogramResponseLatencyName = "histogramResponseLatency" + TraceName = "trace" LogBodyName = "logBody" LogHeaderName = "logHeader" TeeName = "tee" diff --git a/go.mod b/go.mod index d5239f4f6d..2ae2767152 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/yuin/gopher-lua v1.1.1 go4.org/netipx v0.0.0-20220925034521-797b0c90d8ab golang.org/x/crypto v0.26.0 - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 + golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 golang.org/x/net v0.28.0 golang.org/x/oauth2 v0.22.0 golang.org/x/sync v0.8.0 @@ -169,11 +169,18 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/automaxprocs v1.5.3 // indirect +<<<<<<< HEAD golang.org/x/mod v0.19.0 // indirect golang.org/x/sys v0.23.0 // indirect golang.org/x/text v0.17.0 // indirect golang.org/x/tools v0.23.0 // indirect golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect +======= + golang.org/x/mod v0.18.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + golang.org/x/tools v0.22.0 // indirect +>>>>>>> baa60cfa (feature: allow configuration for Go x/trace.FlightRecorder) gonum.org/v1/gonum v0.8.2 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect diff --git a/go.sum b/go.sum index ce0fafcd25..1c539cce14 100644 --- a/go.sum +++ b/go.sum @@ -538,10 +538,8 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= -golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f h1:3CW0unweImhOzd5FmYuRsD4Y4oQFKZIjAnKbjV4WIrw= -golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/proxy/flightrecorder_test.go b/proxy/flightrecorder_test.go new file mode 100644 index 0000000000..858dc15f57 --- /dev/null +++ b/proxy/flightrecorder_test.go @@ -0,0 +1,76 @@ +package proxy_test + +import ( + "bytes" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/filters/diag" + "github.com/zalando/skipper/proxy" + "github.com/zalando/skipper/proxy/proxytest" + xtrace "golang.org/x/exp/trace" +) + +func TestFlightRecorder(t *testing.T) { + service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "PUT" { + w.WriteHeader(http.StatusMethodNotAllowed) + w.Write([]byte(http.StatusText(http.StatusMethodNotAllowed))) + return + } + + var buf bytes.Buffer + n, err := io.Copy(&buf, r.Body) + if err != nil { + t.Fatalf("Failed to copy data: %v", err) + } + if n < 100 { + t.Fatalf("Failed to write enough data: %d bytes", n) + } + w.WriteHeader(http.StatusCreated) + w.Write([]byte(http.StatusText(http.StatusCreated))) + + })) + defer service.Close() + + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer backend.Close() + + flightRecorder := xtrace.NewFlightRecorder() + flightRecorder.Start() + + spec := diag.NewTrace() + fr := make(filters.Registry) + fr.Register(spec) + + doc := fmt.Sprintf(`r: * -> trace("20µs") -> "%s"`, backend.URL) + rr := eskip.MustParse(doc) + + pr := proxytest.WithParams(fr, proxy.Params{ + FlightRecorder: flightRecorder, + FlightRecorderTargetURL: service.URL, + }, rr...) + defer pr.Close() + + rsp, err := pr.Client().Get(pr.URL) + if err != nil { + t.Fatalf("Failed to GET %q: %v", pr.URL, err) + } + defer rsp.Body.Close() + _, err = io.ReadAll(rsp.Body) + if err != nil { + t.Fatalf("Failed to read body: %v", err) + } + + switch rsp.StatusCode { + case 200, 201, 204: + // ok + default: + t.Fatalf("Failed to get status OK: %d", rsp.StatusCode) + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go index fe4c71726b..197bd6d127 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -18,7 +18,6 @@ import ( "runtime" "strconv" "strings" - "sync" "time" "unicode/utf8" @@ -362,6 +361,13 @@ type Params struct { // PassiveHealthCheck defines the parameters for the healthy endpoints checker. PassiveHealthCheck *PassiveHealthCheck + + // FlightRecorder is a started instance of https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder + FlightRecorder *trace.FlightRecorder + + // FlightRecorderTargetURL is the target to write the trace + // to. Supported targets are http URL and file URL. + FlightRecorderTargetURL string } type ( @@ -457,8 +463,7 @@ type Proxy struct { hostname string onPanicSometimes rate.Sometimes flightRecorder *trace.FlightRecorder - traceOnce sync.Once - tooLong time.Duration + flightRecorderURL *url.URL } // proxyError is used to wrap errors during proxying and to indicate @@ -850,13 +855,15 @@ func WithParams(p Params) *Proxy { maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio, } } - // TODO(sszuecs): expose an option to start it - fr := trace.NewFlightRecorder() - //fr.SetPeriod(d) - //fr.SetSize(bytes int) - err := fr.Start() - if err != nil { - println("Failed to start FlightRecorder:", err.Error()) + + var frURL *url.URL + if p.FlightRecorder != nil { + var err error + frURL, err = url.Parse(p.FlightRecorderTargetURL) + if err != nil { + p.FlightRecorder.Stop() + p.FlightRecorder = nil + } } return &Proxy{ @@ -887,32 +894,62 @@ func WithParams(p Params) *Proxy { clientTLS: tr.TLSClientConfig, hostname: hostname, onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute}, - flightRecorder: fr, - traceOnce: sync.Once{}, - tooLong: 250 * time.Millisecond, + flightRecorder: p.FlightRecorder, + flightRecorderURL: frURL, } } func (p *Proxy) writeTraceIfTooSlow(ctx *context) { - p.log.Infof("write trace if too slow: %s > %s", time.Since(ctx.startServe), p.tooLong) - if time.Since(ctx.startServe) > p.tooLong { - p.log.Info("too slow") - // Do it only once for simplicitly, but you can take more than one. - p.traceOnce.Do(func() { - p.log.Info("write trace because we were too slow") - // Grab the snapshot. - var b bytes.Buffer - _, err := p.flightRecorder.WriteTo(&b) - if err != nil { - p.log.Errorf("Failed to write flightrecorder data: %v", err) + if p.flightRecorder == nil || p.flightRecorderURL == nil { + return + } + + var d time.Duration + if e, ok := ctx.StateBag()[filters.TraceName]; ok { + d = e.(time.Duration) + } + if d < 1*time.Microsecond { + return + } + + p.log.Infof("write trace if too slow: %s > %s", time.Since(ctx.startServe), d) + if time.Since(ctx.startServe) > d { + var b bytes.Buffer + _, err := p.flightRecorder.WriteTo(&b) + if err != nil { + p.log.Errorf("Failed to write flightrecorder data: %v", err) + return + } + + switch p.flightRecorderURL.Scheme { + case "file": + if err := os.WriteFile(p.flightRecorderURL.Path, b.Bytes(), 0o644); err != nil { + p.log.Errorf("Failed to write file trace.out: %v", err) return + } else { + p.log.Infof("FlightRecorder wrote %d bytes to trace file %q", b.Len(), p.flightRecorderURL.Path) } - // Write it to a file. - if err := os.WriteFile("trace.out", b.Bytes(), 0o755); err != nil { - p.log.Errorf("Failed to write trace.out: %v", err) - return + case "http", "https": + req, err := http.NewRequest("PUT", p.flightRecorderURL.String(), &b) + if err != nil { + p.log.Errorf("Failed to create request to %q to send a trace: %v", p.flightRecorderURL.String(), err) } - }) + + rsp, err := p.roundTripper.RoundTrip(req) + if err != nil { + p.log.Errorf("Failed to write trace to %q: %v", p.flightRecorderURL.String(), err) + } else { + rsp.Body.Close() + } + switch rsp.StatusCode { + case 200, 201, 204: + p.log.Infof("Successful send of a trace to %q", p.flightRecorderURL.String()) + default: + p.log.Errorf("Failed to get successful response from %s: (%d) %s", p.flightRecorderURL.String(), rsp.StatusCode, rsp.Status) + } + default: + p.log.Errorf("Failed to write trace, unknown FlightRecorderURL scheme %q", p.flightRecorderURL.Scheme) + } } } @@ -1698,7 +1735,10 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (p *Proxy) Close() error { close(p.quit) p.registry.Close() - p.flightRecorder.Stop() + if p.flightRecorder != nil { + p.flightRecorder.Stop() + } + return nil } diff --git a/skipper.go b/skipper.go index 8b4bdc2697..1f31b21269 100644 --- a/skipper.go +++ b/skipper.go @@ -22,6 +22,7 @@ import ( ot "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" + "golang.org/x/exp/trace" "github.com/zalando/skipper/circuit" "github.com/zalando/skipper/dataclients/kubernetes" @@ -74,6 +75,9 @@ import ( const ( defaultSourcePollTimeout = 30 * time.Millisecond defaultRoutingUpdateBuffer = 1 << 5 + + defaultFlightRecorderPeriod = 1 * time.Minute + defaultFlightRecorderSize = 1 << 27 // 128 MB ) const DefaultPluginDir = "./plugins" @@ -462,6 +466,21 @@ type Options struct { // MemProfileRate calls runtime.SetMemProfileRate(MemProfileRate) if non zero value, deactivate with <0 MemProfileRate int + // FlightRecorderSizeBytes set size of the FlightRecorder https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder.SetSize + FlightRecorderSize int + + // FlightRecorderPeriod set period of the FlightRecorder https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder.SetPeriod + FlightRecorderPeriod time.Duration + + // FlightRecorderTargetURL is the target to write the trace + // to. Supported targets are http URL and file URL. Skipper + // will try to upload the trace data by an http PUT request to + // this http URL. This is required to set if you want to have + // trace.FlightRecorder + // https://pkg.go.dev/golang.org/x/exp/trace#FlightRecorder + // enabled to support Go tool trace. + FlightRecorderTargetURL string + // Flag that enables reporting of the Go garbage collector statistics exported in debug.GCStats EnableDebugGcMetrics bool @@ -2031,6 +2050,31 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { routing := routing.New(ro) defer routing.Close() + var fr *trace.FlightRecorder + if o.FlightRecorderTargetURL != "" { + fr = trace.NewFlightRecorder() + + if o.FlightRecorderPeriod != 0 { + fr.SetPeriod(o.FlightRecorderPeriod) + } else { + fr.SetPeriod(defaultFlightRecorderPeriod) + } + + if o.FlightRecorderSize != 0 { + fr.SetSize(o.FlightRecorderSize) + } else { + fr.SetSize(defaultFlightRecorderSize) + } + + err := fr.Start() + if err != nil { + log.Errorf("Failed to start FlightRecorder: %v", err) + fr.Stop() + fr = nil + } + } + log.Infof("FlightRecorder: %v", fr) + proxyFlags := proxy.Flags(o.ProxyOptions) | o.ProxyFlags proxyParams := proxy.Params{ Routing: routing, @@ -2059,6 +2103,8 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { EndpointRegistry: endpointRegistry, EnablePassiveHealthCheck: passiveHealthCheckEnabled, PassiveHealthCheck: passiveHealthCheck, + FlightRecorder: fr, + FlightRecorderTargetURL: o.FlightRecorderTargetURL, } if o.EnableBreakers || len(o.BreakerSettings) > 0 {