x-pack/filebeat/input/httpjson: add transaction tracer #32412

Merged: 6 commits, Jul 28, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -118,6 +118,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Optimize grok patterns in system.auth module pipeline. {pull}32360[32360]
- Checkpoint module: add authentication operation outcome enrichment. {issue}32230[32230] {pull}32431[32431]
- add documentation for decode_xml_wineventlog processor field mappings. {pull}32456[32456]
- httpjson input: Add request tracing logger. {issue}32402[32402] {pull}32412[32412]

*Auditbeat*

Expand Down
483 changes: 272 additions & 211 deletions NOTICE.txt

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -159,7 +159,7 @@ require (
github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v1.0.3
- go.elastic.co/ecszap v1.0.1 // indirect
+ go.elastic.co/ecszap v1.0.1
go.elastic.co/go-licence-detector v0.5.0
go.etcd.io/bbolt v1.3.6
go.uber.org/atomic v1.9.0
Expand Down Expand Up @@ -212,6 +212,7 @@ require (
go.elastic.co/apm/module/apmhttp/v2 v2.0.0
go.elastic.co/apm/v2 v2.0.0
go.mongodb.org/mongo-driver v1.5.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -136,6 +136,7 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down Expand Up @@ -2502,6 +2503,7 @@ gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlI
gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
Expand Down
37 changes: 37 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Expand Up @@ -546,6 +546,43 @@ filebeat.inputs:
value: '[[now (parseDuration "-1h")]]'
----

[float]
==== `request.tracer.filename`

It is possible to log httpjson requests and responses to the local file system when debugging a configuration.
This option is enabled by setting the `request.tracer.filename` value. Additional options are available to
tune log rotation behavior.

Enabling this option compromises security, because complete requests and responses are written to disk, and it should only be used for debugging.

[float]
==== `request.tracer.maxsize`

This value sets the maximum size, in megabytes, that the log file may reach before it is rotated. By default,
logs are allowed to reach 1MB before rotation.

[float]
==== `request.tracer.maxage`

This specifies the number of days to retain rotated log files. If it is not set, log files are retained
indefinitely.

[float]
==== `request.tracer.maxbackups`

The number of old logs to retain. If it is not set, all old logs are retained, subject to the `request.tracer.maxage`
setting.

[float]
==== `request.tracer.localtime`

Whether to use the host's local time rather than UTC for timestamping rotated log file names.

[float]
==== `request.tracer.compress`

This determines whether rotated logs should be gzip compressed.

[float]
==== `response.decode_as`

Expand Down
16 changes: 16 additions & 0 deletions x-pack/filebeat/input/httpjson/config_request.go
Expand Up @@ -12,6 +12,8 @@ import (
"strings"
"time"

"gopkg.in/natefinch/lumberjack.v2"
Contributor

We already have a logging library in Beats: https://github.com/elastic/elastic-agent-libs/tree/main/logp that uses go.uber.org/zap under the hood.

If possible I'd prefer to stick with those ones (preferably elastic-agent-libs/logp) to avoid introducing a new dependency and "logging" library.

What are the advantages of using lumberjack over the ones we already have in Beats?

Member

Another alternative is https://pkg.go.dev/github.com/elastic/elastic-agent-libs/file#Rotator. It gives you an io.WriteCloser interface and manages rolling the underlying files.

But it looks like the library got polluted since I last looked with a hardcoded file extension 😢 . https://github.com/elastic/elastic-agent-libs/blob/a09d65fd12dec3efe53c44f05562431f96b1028b/file/rotator.go#L416 IMO it should have used filepath.Ext to get the extension from the configured filename.

Contributor Author

What are the advantages of using lumberjack over the ones we already have in Beats?

Simplicity; this does exactly what is needed and is transparently simple.

Contributor

@andrewkroh's suggestion looks great and simple. The file extension is a pity, but we can easily fix that as well. I glanced over the code and it seems to be possible to make the extension configurable without changing the public API.
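
The `filepath.Ext` idea raised in this thread can be sketched as follows. This is only an illustration: `rotatedName` and its `base-N.ext` naming scheme are hypothetical and are not the scheme used by the elastic-agent-libs `Rotator` or by lumberjack.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// rotatedName is a hypothetical helper. It inserts a sequence number before
// whatever extension the configured filename carries, instead of assuming a
// fixed extension.
func rotatedName(filename string, n int) string {
	ext := filepath.Ext(filename) // "" when the name has no extension
	base := strings.TrimSuffix(filename, ext)
	return fmt.Sprintf("%s-%d%s", base, n, ext)
}

func main() {
	fmt.Println(rotatedName("logs/http-trace.ndjson", 1))
	fmt.Println(rotatedName("logs/http-trace", 2))
}
```

Deriving the extension from the configured name keeps rotated files next to the active file with a matching suffix, whatever suffix the user chose.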


"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)
Expand Down Expand Up @@ -100,6 +102,8 @@ type requestConfig struct {
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

Tracer *lumberjack.Logger `config:"tracer"`
}

func (c *requestConfig) Validate() error {
Expand All @@ -124,5 +128,17 @@ func (c *requestConfig) Validate() error {
}
}

if c.Tracer != nil {
if c.Tracer.Filename == "" {
return errors.New("request tracer must have a filename if used")
}
if c.Tracer.MaxSize == 0 {
// By default Lumberjack caps file sizes at 100MB which
// is excessive for a debugging logger, so default to 1MB
// which is the minimum.
c.Tracer.MaxSize = 1
}
}

return nil
}
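
The defaulting rule added to `Validate` can be exercised in isolation. The following is a minimal sketch, assuming a hypothetical `tracerConfig` struct that stands in for `*lumberjack.Logger` and models only the two fields the check touches; it is not the input's actual configuration type.

```go
package main

import (
	"errors"
	"fmt"
)

// tracerConfig is a hypothetical stand-in for *lumberjack.Logger that models
// only the fields inspected by the validation in the diff.
type tracerConfig struct {
	Filename string
	MaxSize  int // megabytes
}

// validateTracer mirrors the check in requestConfig.Validate: a nil tracer
// is allowed, a tracer without a filename is rejected, and an unset size
// falls back to 1MB.
func validateTracer(t *tracerConfig) error {
	if t == nil {
		return nil
	}
	if t.Filename == "" {
		return errors.New("request tracer must have a filename if used")
	}
	if t.MaxSize == 0 {
		t.MaxSize = 1
	}
	return nil
}

func main() {
	cfg := &tracerConfig{Filename: "http-trace.ndjson"}
	if err := validateTracer(cfg); err != nil {
		fmt.Println("invalid:", err)
		return
	}
	fmt.Println("max size (MB):", cfg.MaxSize)
}
```

Note that the validation mutates the configuration in place, so the default is applied as a side effect of a successful check.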
15 changes: 15 additions & 0 deletions x-pack/filebeat/input/httpjson/input.go
Expand Up @@ -14,13 +14,16 @@ import (
"time"

retryablehttp "github.com/hashicorp/go-retryablehttp"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
Expand Down Expand Up @@ -160,6 +163,18 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC
return nil, err
}

if config.Request.Tracer != nil {
w := zapcore.AddSync(config.Request.Tracer)
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)

netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger)
}

netHTTPClient.CheckRedirect = checkRedirect(config.Request, log)

client := &retryablehttp.Client{
Expand Down
208 changes: 208 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/httplog/roundtripper.go
@@ -0,0 +1,208 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// Package httplog provides http request and response transaction logging.
package httplog

import (
"bytes"
"encoding/base32"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"net/http/httputil"
"strconv"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var _ http.RoundTripper = (*LoggingRoundTripper)(nil)

// TraceIDKey is the key used to add a trace.id value to the context of HTTP
// requests. The value will be logged by LoggingRoundTripper.
const TraceIDKey = contextKey("trace.id")

type contextKey string

// NewLoggingRoundTripper returns a LoggingRoundTripper that logs requests and
// responses to the provided logger.
func NewLoggingRoundTripper(next http.RoundTripper, logger *zap.Logger) *LoggingRoundTripper {
return &LoggingRoundTripper{
transport: next,
logger: logger,
txBaseID: newID(),
txIDCounter: atomic.NewUint64(0),
}
}

// LoggingRoundTripper is an http.RoundTripper that logs requests and responses.
type LoggingRoundTripper struct {
transport http.RoundTripper
logger *zap.Logger // Destination logger.
txBaseID string // Random value to make transaction IDs unique.
txIDCounter *atomic.Uint64 // Transaction ID counter that is incremented for each request.
}

// RoundTrip implements the http.RoundTripper interface, logging
// the request and response to the underlying logger.
//
// Fields logged in requests:
// url.original
// url.scheme
// url.path
// url.domain
// url.port
// url.query
// http.request.method
// user_agent.original
// http.request.body.content
// http.request.body.bytes
// http.request.mime_type
// event.original (the full request and body from httputil.DumpRequestOut)
//
// Fields logged in responses:
// http.response.status_code
// http.response.body.content
// http.response.body.bytes
// http.response.mime_type
// event.original (the full response and body from httputil.DumpResponse)
//
func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// Create a child logger for this request.
log := rt.logger.With(
zap.String("transaction.id", rt.nextTxID()),
)

if v := req.Context().Value(TraceIDKey); v != nil {
if traceID, ok := v.(string); ok {
log = log.With(zap.String("trace.id", traceID))
}
}

reqParts := []zapcore.Field{
zap.String("url.original", req.URL.String()),
zap.String("url.scheme", req.URL.Scheme),
zap.String("url.path", req.URL.Path),
zap.String("url.domain", req.URL.Hostname()),
zap.String("url.port", req.URL.Port()),
zap.String("url.query", req.URL.RawQuery),
zap.String("http.request.method", req.Method),
zap.String("user_agent.original", req.Header.Get("User-Agent")),
}
var (
body []byte
err error
errorsMessages []string
)
req.Body, body, err = copyBody(req.Body)
if err != nil {
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to read request body: %s", err))
} else {
reqParts = append(reqParts,
zap.ByteString("http.request.body.content", body),
zap.Int("http.request.body.bytes", len(body)),
zap.String("http.request.mime_type", req.Header.Get("Content-Type")),
)
}
message, err := httputil.DumpRequestOut(req, true)
if err != nil {
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to dump request: %s", err))
} else {
reqParts = append(reqParts, zap.ByteString("event.original", message))
}
switch len(errorsMessages) {
case 0:
case 1:
reqParts = append(reqParts, zap.String("error.message", errorsMessages[0]))
default:
reqParts = append(reqParts, zap.Strings("error.message", errorsMessages))
}
log.Debug("HTTP request", reqParts...)

resp, err := rt.transport.RoundTrip(req)
if err != nil {
log.Debug("HTTP response error", zap.NamedError("error.message", err))
return resp, err
}
if resp == nil {
log.Debug("HTTP response error", noResponse)
return resp, err
}
respParts := append(reqParts[:0],
zap.Int("http.response.status_code", resp.StatusCode),
)
errorsMessages = errorsMessages[:0]
resp.Body, body, err = copyBody(resp.Body)
if err != nil {
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to read response body: %s", err))
} else {
respParts = append(respParts,
zap.ByteString("http.response.body.content", body),
zap.Int("http.response.body.bytes", len(body)),
zap.String("http.response.mime_type", resp.Header.Get("Content-Type")),
)
}
message, err = httputil.DumpResponse(resp, true)
if err != nil {
errorsMessages = append(errorsMessages, fmt.Sprintf("failed to dump response: %s", err))
} else {
respParts = append(respParts, zap.ByteString("event.original", message))
}
switch len(errorsMessages) {
case 0:
case 1:
respParts = append(respParts, zap.String("error.message", errorsMessages[0]))
default:
respParts = append(respParts, zap.Strings("error.message", errorsMessages))
}
log.Debug("HTTP response", respParts...)

return resp, err
}

// nextTxID returns the next transaction.id value. It increments the internal
// request counter.
func (rt *LoggingRoundTripper) nextTxID() string {
count := rt.txIDCounter.Inc()
return rt.txBaseID + "-" + strconv.FormatUint(count, 10)
}

var noResponse = zap.NamedError("error.message", errors.New("unexpected nil response"))
Member

Have you checked the output of this? I wanted to make sure that it behaves as expected because I recall that ecszap automatically takes regular zap.Error() calls and writes them to error.message and wanted to make sure this did not result in an extra message like error.message.message.

Contributor Author

I will check.

Contributor Author

The two types of error messages:

{"log.level":"debug","@timestamp":"2022-07-28T21:08:54.725+0930","message":"HTTP request","transaction.id":"63GBEPIAVC2HE-1","url.original":"http://127.0.0.1:57587","url.scheme":"http","url.path":"","url.domain":"127.0.0.1","url.port":"57587","url.query":"","http.request.method":"POST","user_agent.original":"Elastic-Filebeat/8.4.0 (darwin; amd64; unknown; 0001-01-01 00:00:00 +0000 UTC)","error.message":[{"error":"failed to read request body: faux"},{"error":"failed to dump request: faux"}],"ecs.version":"1.6.0"}

and

{"log.level":"debug","@timestamp":"2022-07-28T21:12:09.042+0930","message":"HTTP response error","transaction.id":"R3VDV93NVC2HE-1","error":{"message":"unexpected nil response"},"ecs.version":"1.6.0"}

So the noResponse error looks like it is working as intended. I'll fix the other.

New version:
Single

{"log.level":"debug","@timestamp":"2022-07-28T21:24:05.124+0930","message":"HTTP request","transaction.id":"N34CJAOTVG2HE-3","url.original":"http://127.0.0.1:59402","url.scheme":"http","url.path":"","url.domain":"127.0.0.1","url.port":"59402","url.query":"","http.request.method":"GET","user_agent.original":"Elastic-Filebeat/8.4.0 (darwin; amd64; unknown; 0001-01-01 00:00:00 +0000 UTC)","event.original":"GET / HTTP/1.1\r\nHost: 127.0.0.1:59402\r\nUser-Agent: Elastic-Filebeat/8.4.0 (darwin; amd64; unknown; 0001-01-01 00:00:00 +0000 UTC)\r\nAccept: application/json\r\nAccept-Encoding: gzip\r\n\r\n","error.message":"failed to read request body: faux","ecs.version":"1.6.0"}

Double

{"log.level":"debug","@timestamp":"2022-07-28T21:24:02.121+0930","message":"HTTP request","transaction.id":"F31B1AOTVG2HE-2","url.original":"http://127.0.0.1:59398","url.scheme":"http","url.path":"","url.domain":"127.0.0.1","url.port":"59398","url.query":"","http.request.method":"GET","user_agent.original":"Elastic-Filebeat/8.4.0 (darwin; amd64; unknown; 0001-01-01 00:00:00 +0000 UTC)","error.message":["failed to read request body: faux","failed to dump request: faux"],"ecs.version":"1.6.0"}

Round trip

{"log.level":"debug","@timestamp":"2022-07-28T21:24:03.123+0930","message":"HTTP response error","transaction.id":"N34CJAOTVG2HE-2","error":{"message":"unexpected nil response"},"ecs.version":"1.6.0"}

Member

"New version" looks good.


// newID returns an ID derived from the current time.
func newID() string {
var data [8]byte
binary.LittleEndian.PutUint64(data[:], uint64(time.Now().UnixNano()))
return base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(data[:])
}

// copyBody is derived from drainBody in net/http/httputil/dump.go
//
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// copyBody reads all of b to memory and then returns a
// ReadCloser yielding the same bytes, and the bytes themselves.
//
// It returns an error if the initial slurp of all bytes fails.
func copyBody(b io.ReadCloser) (r io.ReadCloser, body []byte, err error) {
if b == nil || b == http.NoBody {
// No copying needed. Preserve the magic sentinel meaning of NoBody.
return http.NoBody, nil, nil
}
var buf bytes.Buffer
if _, err = buf.ReadFrom(b); err != nil {
return nil, buf.Bytes(), err
}
if err = b.Close(); err != nil {
return nil, buf.Bytes(), err
}
return io.NopCloser(&buf), buf.Bytes(), nil
}
4 changes: 0 additions & 4 deletions x-pack/filebeat/input/httpjson/rate_limiter.go
Expand Up @@ -41,10 +41,6 @@ func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLi
func (r *rateLimiter) execute(ctx context.Context, f func() (*http.Response, error)) (*http.Response, error) {
for {
resp, err := f()
- if err != nil {
- return nil, err
- }
-
if err != nil {
return nil, fmt.Errorf("failed to read http.response.body: %w", err)
}
Expand Down