Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: add transaction tracer (#32412)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Kroh <andrew.kroh@elastic.co>
  • Loading branch information
efd6 and andrewkroh authored Jul 28, 2022
1 parent 1b55527 commit 765648f
Show file tree
Hide file tree
Showing 9 changed files with 553 additions and 216 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Optimize grok patterns in system.auth module pipeline. {pull}32360[32360]
- Checkpoint module: add authentication operation outcome enrichment. {issue}32230[32230] {pull}32431[32431]
- add documentation for decode_xml_wineventlog processor field mappings. {pull}32456[32456]
- httpjson input: Add request tracing logger. {issue}32402[32402] {pull}32412[32412]

*Auditbeat*

Expand Down
483 changes: 272 additions & 211 deletions NOTICE.txt

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ require (
github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v1.0.3
go.elastic.co/ecszap v1.0.1 // indirect
go.elastic.co/ecszap v1.0.1
go.elastic.co/go-licence-detector v0.5.0
go.etcd.io/bbolt v1.3.6
go.uber.org/atomic v1.9.0
Expand Down Expand Up @@ -212,6 +212,7 @@ require (
go.elastic.co/apm/module/apmhttp/v2 v2.0.0
go.elastic.co/apm/v2 v2.0.0
go.mongodb.org/mongo-driver v1.5.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down Expand Up @@ -2502,6 +2503,7 @@ gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlI
gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
Expand Down
37 changes: 37 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,43 @@ filebeat.inputs:
value: '[[now (parseDuration "-1h")]]'
----

[float]
==== `request.tracer.filename`

It is possible to log httpjson requests and responses to a local file-system for debugging configurations.
This option is enabled by setting the `request.tracer.filename` value. Additional options are available to
tune log rotation behavior.

Enabling this option compromises security and should only be used for debugging.

[float]
==== `request.tracer.maxsize`

This value sets the maximum size, in megabytes, the log file will reach before it is rotated. By default
logs are allowed to reach 1MB before rotation.

[float]
==== `request.tracer.maxage`

This specifies the number days to retain rotated log files. If it is not set, log files are retained
indefinitely.

[float]
==== `request.tracer.maxbackups`

The number of old logs to retain. If it is not set all old logs are retained subject to the `request.tracer.maxage`
setting.

[float]
==== `request.tracer.localtime`

Whether to use the host's local time rather that UTC for timestamping rotated log file names.

[float]
==== `request.tracer.compress`

This determines whether rotated logs should be gzip compressed.

[float]
==== `response.decode_as`

Expand Down
16 changes: 16 additions & 0 deletions x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strings"
"time"

"gopkg.in/natefinch/lumberjack.v2"

"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)
Expand Down Expand Up @@ -100,6 +102,8 @@ type requestConfig struct {
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

Tracer *lumberjack.Logger `config:"tracer"`
}

func (c *requestConfig) Validate() error {
Expand All @@ -124,5 +128,17 @@ func (c *requestConfig) Validate() error {
}
}

if c.Tracer != nil {
if c.Tracer.Filename == "" {
return errors.New("request tracer must have a filename if used")
}
if c.Tracer.MaxSize == 0 {
// By default Lumberjack caps file sizes at 100MB which
// is excessive for a debugging logger, so default to 1MB
// which is the minimum.
c.Tracer.MaxSize = 1
}
}

return nil
}
15 changes: 15 additions & 0 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import (
"time"

retryablehttp "github.com/hashicorp/go-retryablehttp"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
Expand Down Expand Up @@ -160,6 +163,18 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC
return nil, err
}

if config.Request.Tracer != nil {
w := zapcore.AddSync(config.Request.Tracer)
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger)
}

netHTTPClient.CheckRedirect = checkRedirect(config.Request, log)

client := &retryablehttp.Client{
Expand Down
208 changes: 208 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/httplog/roundtripper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// Package httplog provides http request and response transaction logging.
package httplog

import (
"bytes"
"encoding/base32"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"net/http/httputil"
"strconv"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var _ http.RoundTripper = (*LoggingRoundTripper)(nil)

// TraceIDKey is key used to add a trace.id value to the context of HTTP
// requests. The value will be logged by LoggingRoundTripper.
const TraceIDKey = contextKey("trace.id")

type contextKey string

// NewLoggingRoundTripper returns a LoggingRoundTripper that logs requests and
// responses to the provided logger.
func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger) *LoggingRoundTripper {
return &LoggingRoundTripper{
transport: next,
logger: logger,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),
}
}

// LoggingRoundTripper is an http.RoundTripper that logs requests and responses.
type LoggingRoundTripper struct {
transport http.RoundTripper
logger *zap.Logger // Destination logger.
txBaseID string // Random value to make transaction IDs unique.
txIDCounter *atomic.Uint64 // Transaction ID counter that is incremented for each request.
}

// RoundTrip implements the http.RoundTripper interface, logging
// the request and response to the underlying logger.
//
// Fields logged in requests:
// url.original
// url.scheme
// url.path
// url.domain
// url.port
// url.query
// http.request
// user_agent.original
// http.request.body.content
// http.request.body.bytes
// http.request.mime_type
// event.original (the full request and body from httputil.DumpRequestOut)
//
// Fields logged in responses:
// http.response.status_code
// http.response.body.content
// http.response.body.bytes
// http.response.mime_type
// event.original (the full response and body from httputil.DumpResponse)
//
func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// Create a child logger for this request.
log := rt.logger.With(
zap.String("transaction.id", rt.nextTxID()),
)

if v := req.Context().Value(TraceIDKey); v != nil {
if traceID, ok := v.(string); ok {
log = log.With(zap.String("trace.id", traceID))
}
}

reqParts := []zapcore.Field{
zap.String("url.original", req.URL.String()),
zap.String("url.scheme", req.URL.Scheme),
zap.String("url.path", req.URL.Path),
zap.String("url.domain", req.URL.Hostname()),
zap.String("url.port", req.URL.Port()),
zap.String("url.query", req.URL.RawQuery),
zap.String("http.request.method", req.Method),
zap.String("user_agent.original", req.Header.Get("User-Agent")),
}
var (
body []byte
err error
errorsMessages []string
)
req.Body, body, err = copyBody(req.Body)
if err != nil {
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to read request body: %s", err))
} else {
reqParts = append(reqParts,
zap.ByteString("http.request.body.content", body),
zap.Int("http.request.body.bytes", len(body)),
zap.String("http.request.mime_type", req.Header.Get("Content-Type")),
)
}
message, err := httputil.DumpRequestOut(req, true)
if err != nil {
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to dump request: %s", err))
} else {
reqParts = append(reqParts, zap.ByteString("event.original", message))
}
switch len(errorsMessages) {
case 0:
case 1:
reqParts = append(reqParts, zap.String("error.message", errorsMessages[0]))
default:
reqParts = append(reqParts, zap.Strings("error.message", errorsMessages))
}
log.Debug("HTTP request", reqParts...)

resp, err := rt.transport.RoundTrip(req)
if err != nil {
log.Debug("HTTP response error", zap.NamedError("error.message", err))
return resp, err
}
if resp == nil {
log.Debug("HTTP response error", noResponse)
return resp, err
}
respParts := append(reqParts[:0],
zap.Int("http.response.status_code", resp.StatusCode),
)
errorsMessages = errorsMessages[:0]
resp.Body, body, err = copyBody(resp.Body)
if err != nil {
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to read response body: %s", err))
} else {
respParts = append(respParts,
zap.ByteString("http.response.body.content", body),
zap.Int("http.response.body.bytes", len(body)),
zap.String("http.response.mime_type", resp.Header.Get("Content-Type")),
)
}
message, err = httputil.DumpResponse(resp, true)
if err != nil {
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to dump response: %s", err))
} else {
respParts = append(respParts, zap.ByteString("event.original", message))
}
switch len(errorsMessages) {
case 0:
case 1:
respParts = append(reqParts, zap.String("error.message", errorsMessages[0]))
default:
respParts = append(reqParts, zap.Strings("error.message", errorsMessages))
}
log.Debug("HTTP response", respParts...)

return resp, err
}

// nextTxID returns the next transaction.id value. It increments the internal
// request counter.
func (rt *LoggingRoundTripper) nextTxID() string {
count := rt.txIDCounter.Inc()
return rt.txBaseID + "-" + strconv.FormatUint(count, 10)
}

var noResponse = zap.NamedError("error.message", errors.New("unexpected nil response"))

// newID returns an ID derived from the current time.
func newID() string {
var data [8]byte
binary.LittleEndian.PutUint64(data[:], uint64(time.Now().UnixNano()))
return base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(data[:])
}

// copyBody is derived from drainBody in net/http/httputil/dump.go
//
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// copyBody reads all of b to memory and then returns a
// ReadCloser yielding the same bytes, and the bytes themselves.
//
// It returns an error if the initial slurp of all bytes fails.
func copyBody(b io.ReadCloser) (r io.ReadCloser, body []byte, err error) {
if b == nil || b == http.NoBody {
// No copying needed. Preserve the magic sentinel meaning of NoBody.
return http.NoBody, nil, nil
}
var buf bytes.Buffer
if _, err = buf.ReadFrom(b); err != nil {
return nil, buf.Bytes(), err
}
if err = b.Close(); err != nil {
return nil, buf.Bytes(), err
}
return io.NopCloser(&buf), buf.Bytes(), nil
}
4 changes: 0 additions & 4 deletions x-pack/filebeat/input/httpjson/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLi
func (r *rateLimiter) execute(ctx context.Context, f func() (*http.Response, error)) (*http.Response, error) {
for {
resp, err := f()
if err != nil {
return nil, err
}

if err != nil {
return nil, fmt.Errorf("failed to read http.response.body: %w", err)
}
Expand Down

0 comments on commit 765648f

Please sign in to comment.