Skip to content

Commit

Permalink
Experimental self-profiling (#2839)
Browse files Browse the repository at this point in the history
* vendor: add github.com/google/pprof/profile

* model/profile: add profile data model

* config: config for profiling the server

* beater: add profiling endpoint

* vendor: add github.com/OneOfOne/xxhash

Already being used by Beats.

* model: fix fields

* beater/api/profile: address review comments

* beater/config: split InstrumentationConfig out

* decoder: add LimitedReader

* beater/api/profile: use decoder.LimitedReader

* Add integration test

* idxmgmt: add "profile" event index

* tests/system: add system tests for profiling

* model/profile: remove profile.id

Not needed for now, and will get in the way of storage
optimisations (aggregating multiple profiles) later.
  • Loading branch information
axw authored Nov 29, 2019
1 parent 9dbfc0d commit 2b1f9e9
Show file tree
Hide file tree
Showing 52 changed files with 31,085 additions and 45 deletions.
18 changes: 18 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,15 @@ License type (autodetected): Apache-2.0
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/google/pprof
Revision: 27840fff0d09770c422884093a210ac5ce453ea6
License type (autodetected): Apache-2.0
./vendor/github.com/google/pprof/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/google/shlex
Revision: c34317bd91bf98fab745d77b03933cf8769299fe
Expand Down Expand Up @@ -2474,6 +2483,15 @@ License type (autodetected): Apache-2.0
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/OneOfOne/xxhash
Revision: 8f0be54a8d5ccf68817143d8c996147ec5cbd795
License type (autodetected): Apache-2.0
./vendor/github.com/OneOfOne/xxhash/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/opencontainers/go-digest
Revision: eaa60544f31ccf3b0653b1a118b76d33418ff41b
Expand Down
14 changes: 14 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ apm-server:
# secret_token for the remote apm-servers.
#secret_token:

# Enable profiling of the server, recording profile samples as events.
#
# This feature is experimental.
#profiling:
#cpu:
# Set to true to enable CPU profiling.
#enabled: false
#interval: 60s
#duration: 10s
#heap:
# Set to true to enable heap profiling.
#enabled: false
#interval: 60s

# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
# Using pipelines involves two steps:
# (1) registering a pipeline
Expand Down
14 changes: 14 additions & 0 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ apm-server:
# secret_token for the remote apm-servers.
#secret_token:

# Enable profiling of the server, recording profile samples as events.
#
# This feature is experimental.
#profiling:
#cpu:
# Set to true to enable CPU profiling.
#enabled: false
#interval: 60s
#duration: 10s
#heap:
# Set to true to enable heap profiling.
#enabled: false
#interval: 60s

# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
# Using pipelines involves two steps:
# (1) registering a pipeline
Expand Down
14 changes: 14 additions & 0 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ apm-server:
# secret_token for the remote apm-servers.
#secret_token:

# Enable profiling of the server, recording profile samples as events.
#
# This feature is experimental.
#profiling:
#cpu:
# Set to true to enable CPU profiling.
#enabled: false
#interval: 60s
#duration: 10s
#heap:
# Set to true to enable heap profiling.
#enabled: false
#interval: 60s

# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
# Using pipelines involves two steps:
# (1) registering a pipeline
Expand Down
18 changes: 18 additions & 0 deletions beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/apm-server/beater/api/asset/sourcemap"
"github.com/elastic/apm-server/beater/api/config/agent"
"github.com/elastic/apm-server/beater/api/intake"
"github.com/elastic/apm-server/beater/api/profile"
"github.com/elastic/apm-server/beater/api/root"
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/beater/middleware"
Expand Down Expand Up @@ -58,6 +59,9 @@ const (
// IntakeRUMPath defines the path to ingest monitored RUM events
IntakeRUMPath = "/intake/v2/rum/events"

// ProfilePath defines the path to ingest profiles
ProfilePath = "/intake/v2/profile"

// AssetSourcemapPath defines the path to upload sourcemaps
AssetSourcemapPath = "/assets/v1/sourcemaps"
)
Expand Down Expand Up @@ -86,6 +90,15 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
{IntakePath, backendHandler},
}

// Profiling is currently experimental, and intended for profiling the
// server itself, so we only add the route if self-profiling is enabled.
if beaterConfig.SelfInstrumentation.IsEnabled() {
if beaterConfig.SelfInstrumentation.Profiling.CPU.IsEnabled() ||
beaterConfig.SelfInstrumentation.Profiling.Heap.IsEnabled() {
routeMap = append(routeMap, route{ProfilePath, profileHandler})
}
}

for _, route := range routeMap {
h, err := route.handlerFn(beaterConfig, report)
if err != nil {
Expand All @@ -103,6 +116,11 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
return mux, nil
}

func profileHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) {
h := profile.Handler(systemMetadataDecoder(cfg, emptyDecoder), transform.Config{}, reporter)
return middleware.Wrap(h, backendMiddleware(cfg, profile.MonitoringMap)...)
}

func backendHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) {
h := intake.Handler(systemMetadataDecoder(cfg, emptyDecoder),
&stream.Processor{
Expand Down
240 changes: 240 additions & 0 deletions beater/api/profile/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package profile

import (
"fmt"
"io"
"net/http"
"strings"

pprof_profile "github.com/google/pprof/profile"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/monitoring"

"github.com/elastic/apm-server/beater/headers"
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model/metadata"
"github.com/elastic/apm-server/model/profile"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/utility"
"github.com/elastic/apm-server/validation"
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.MonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.profile", monitoring.PublishExpvar)
)

const (
// TODO(axw) include messageType in pprofContentType; needs fix in agent
pprofContentType = "application/x-protobuf"
metadataContentType = "application/json"
requestContentType = "multipart/form-data"

metadataContentLengthLimit = 10 * 1024
profileContentLengthLimit = 10 * 1024 * 1024
)

// Handler returns a request.Handler for managing profile requests.
func Handler(
dec decoder.ReqDecoder,
transformConfig transform.Config,
report publish.Reporter,
) request.Handler {
handle := func(c *request.Context) (*result, error) {
if c.Request.Method != http.MethodPost {
return nil, requestError{
id: request.IDResponseErrorsMethodNotAllowed,
err: errors.New("only POST requests are supported"),
}
}
if err := validateContentType(c.Request.Header, requestContentType); err != nil {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: err,
}
}

ok := c.RateLimiter == nil || c.RateLimiter.Allow()
if !ok {
return nil, requestError{
id: request.IDResponseErrorsRateLimit,
err: errors.New("rate limit exceeded"),
}
}

// Extract metadata from the request, such as the remote address.
reqMeta, err := dec(c.Request)
if err != nil {
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode request metadata"),
}
}

tctx := &transform.Context{
RequestTime: utility.RequestTime(c.Request.Context()),
Config: transformConfig,
}

var totalLimitRemaining int64 = profileContentLengthLimit
var profiles []*pprof_profile.Profile
mr, err := c.Request.MultipartReader()
if err != nil {
return nil, err
}
for {
part, err := mr.NextPart()
if err == io.EOF {
break
} else if err != nil {
return nil, err
}

switch part.FormName() {
case "metadata":
if err := validateContentType(http.Header(part.Header), metadataContentType); err != nil {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: errors.Wrap(err, "invalid metadata"),
}
}
r := &decoder.LimitedReader{R: part, N: metadataContentLengthLimit}
raw, err := decoder.DecodeJSONData(r)
if err != nil {
if r.N < 0 {
return nil, requestError{
id: request.IDResponseErrorsRequestTooLarge,
err: err,
}
}
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode metadata JSON"),
}
}
for k, v := range reqMeta {
utility.InsertInMap(raw, k, v.(map[string]interface{}))
}
if err := validation.Validate(raw, metadata.ModelSchema()); err != nil {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: errors.Wrap(err, "invalid metadata"),
}
}
metadata, err := metadata.DecodeMetadata(raw)
if err != nil {
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode metadata"),
}
}
tctx.Metadata = *metadata

case "profile":
if err := validateContentType(http.Header(part.Header), pprofContentType); err != nil {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: errors.Wrap(err, "invalid profile"),
}
}
r := &decoder.LimitedReader{R: part, N: totalLimitRemaining}
profile, err := pprof_profile.Parse(r)
if err != nil {
if r.N < 0 {
return nil, requestError{
id: request.IDResponseErrorsRequestTooLarge,
err: err,
}
}
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode profile"),
}
}
profiles = append(profiles, profile)
totalLimitRemaining = r.N
}
}

transformables := make([]transform.Transformable, len(profiles))
for i, p := range profiles {
transformables[i] = profile.PprofProfile{Profile: p}
}

if err := report(c.Request.Context(), publish.PendingReq{
Transformables: transformables,
Tcontext: tctx,
}); err != nil {
switch err {
case publish.ErrChannelClosed:
return nil, requestError{
id: request.IDResponseErrorsShuttingDown,
err: errors.New("server is shutting down"),
}
case publish.ErrFull:
return nil, requestError{
id: request.IDResponseErrorsFullQueue,
err: err,
}
}
return nil, err
}
return &result{Accepted: len(transformables)}, nil
}
return func(c *request.Context) {
result, err := handle(c)
if err != nil {
switch err := err.(type) {
case requestError:
c.Result.SetWithError(err.id, err)
default:
c.Result.SetWithError(request.IDResponseErrorsInternal, err)
}
} else {
c.Result.SetWithBody(request.IDResponseValidAccepted, result)
}
c.Write()
}
}

func validateContentType(header http.Header, contentType string) error {
got := header.Get(headers.ContentType)
if !strings.Contains(got, contentType) {
return fmt.Errorf("invalid content type %q, expected %q", got, contentType)
}
return nil
}

type result struct {
Accepted int `json:"accepted"`
}

type requestError struct {
id request.ResultID
err error
}

func (e requestError) Error() string {
return e.err.Error()
}
Loading

0 comments on commit 2b1f9e9

Please sign in to comment.