Skip to content

Commit

Permalink
Merge branch 'master' of github.com:elastic/beats into bug/yuri-reported
Browse files Browse the repository at this point in the history
  • Loading branch information
michalpristas committed Jun 29, 2021
2 parents 81f09b4 + 67cf2c6 commit 33b3a90
Show file tree
Hide file tree
Showing 38 changed files with 534 additions and 99 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix ILM alias creation when write alias exists and initial index does not exist {pull}26143[26143]
- Omit full index template from errors that occur while loading the template. {pull}25743[25743]
- In the script processor, the `decode_xml` and `decode_xml_wineventlog` processors are now available as `DecodeXML` and `DecodeXMLWineventlog` respectively.
- Fix encoding errors when using the disk queue on nested data with multi-byte characters {pull}26484[26484]

*Auditbeat*

Expand Down Expand Up @@ -818,6 +819,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update PanOS module to parse HIP Match logs. {issue}24350[24350] {pull}25686[25686]
- Support MongoDB 4.4 in filebeat's MongoDB module. {issue}20501[20501] {pull}24774[24774]
- Enhance GCP module to populate orchestrator.* fields for GKE / K8S logs {pull}25368[25368]
- Add log_group_name_prefix config into aws-cloudwatch input. {pull}26187[26187]
- Move Filebeat azure module to GA. {pull}26114[26114] {pull}26168[26168]
- http_endpoint: Support multiple documents in a single request by POSTing an array or NDJSON format. {pull}25764[25764]
- Make `filestream` input GA. {pull}26127[26127]
Expand All @@ -842,6 +844,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Journalbeat*

- Suppress logging of repeated "bad message" errors for 5 seconds when reading from a corrupted journal. {pull}26224[26224]

*Metricbeat*

Expand Down
1 change: 0 additions & 1 deletion filebeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:beat_default_index_prefix: {beatname_lc}
:beat_kib_app: {kib} Logs
:has_ml_jobs: yes
:has_central_config:
:has_solutions:
:ignores_max_retries:
:has_docker_label_ex:
Expand Down
31 changes: 30 additions & 1 deletion heartbeat/docs/monitors/monitor-browser.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,33 @@ Set this option to `true` to enable the normally disabled chromium sandbox. Defa
[[monitor-browser-synthetics-args]]
==== `synthetics_args`

Extra arguments to pass to the synthetics agent package. Takes a list of strings.
Extra arguments to pass to the synthetics agent package. Takes a list of
strings.

[float]
[[monitor-browser-screenshots]]
==== `screenshots`

Set this option to manage the screenshots captured by the synthetics agent.

Under `screenshots`, specify one of these options:

*`on`*:: capture screenshots for all steps in a journey (default)
*`off`*:: do not capture any screenshots
*`only-on-failure`*:: capture screenshots for all steps when a journey fails
(any failing step marks the whole journey as failed)

Example configuration:

[source,yaml]
-------------------------------------------------------------------------------
- type: browser
id: local-journeys
name: Local journeys
schedule: '@every 1m'
screenshots: "on"
source:
local:
path: "/path/to/synthetics/journeys"
-------------------------------------------------------------------------------

34 changes: 34 additions & 0 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
package input

import (
"context"
"fmt"
"strings"
"sync"
"syscall"
"time"

"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"

"github.com/elastic/beats/v7/journalbeat/checkpoint"
Expand Down Expand Up @@ -158,6 +164,10 @@ func (i *Input) publishAll() {
go func() {
defer wg.Done()

suppressed := atomic.NewBool(false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
select {
case <-i.done:
Expand All @@ -168,6 +178,10 @@ func (i *Input) publishAll() {
event, err := r.Next()
if event == nil {
if err != nil {
if i.isErrSuppressed(ctx, err, suppressed) {
i.logger.Debugf("Error message suppressed: EBADMSG")
continue
}
i.logger.Errorf("Error while reading event: %v", err)
}
continue
Expand All @@ -191,6 +205,26 @@ func (i *Input) publishAll() {
}
}

// isErrSuppressed reports whether err should be suppressed from the log.
// Reading a corrupted journal floods the log with EBADMSG errors, so only
// the first such error is logged; subsequent ones are suppressed for a
// 5-second window, after which logging resumes.
func (i *Input) isErrSuppressed(ctx context.Context, err error, suppressed *atomic.Bool) bool {
	if !strings.Contains(err.Error(), syscall.EBADMSG.Error()) {
		// Not a corrupt-journal error: never suppress.
		return false
	}

	if suppressed.Load() {
		// Still inside the suppression window.
		return true
	}

	// First EBADMSG in a while: allow it to be logged, then open a
	// suppression window that closes after 5 seconds (or stays closed
	// permanently if ctx is cancelled first).
	suppressed.Store(true)
	go func() {
		if waitErr := timed.Wait(ctx, 5*time.Second); waitErr == nil {
			suppressed.Store(false)
		}
	}()

	return false
}

// Stop stops all readers of the input.
func (i *Input) Stop() {
for _, r := range i.readers {
Expand Down
57 changes: 57 additions & 0 deletions libbeat/common/transport/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,63 @@ func TestTLSDialer(
}), nil
}

// DialerH2 is a dialer that establishes TLS connections using a
// caller-supplied tls.Config, as required for HTTP/2 (h2) transports
// where ALPN settings (NextProtos) come from the http2 layer.
type DialerH2 interface {
	Dial(network, address string, cfg *tls.Config) (net.Conn, error)
}

// DialerFuncH2 adapts a plain function to the DialerH2 interface.
type DialerFuncH2 func(network, address string, cfg *tls.Config) (net.Conn, error)

// Dial implements DialerH2 by calling the wrapped function.
func (d DialerFuncH2) Dial(network, address string, cfg *tls.Config) (net.Conn, error) {
	return d(network, address, cfg)
}

// TLSDialerH2 creates a TLS dialer for h2 connections using the null test
// driver (no connection diagnostics are reported).
func TLSDialerH2(forward Dialer, config *tlscommon.TLSConfig, timeout time.Duration) (DialerH2, error) {
	return TestTLSDialerH2(testing.NullDriver, forward, config, timeout)
}

// TestTLSDialerH2 creates a TLS dialer for h2 (HTTP/2) connections that
// reports connection details to the given test driver d. The TLS client
// configuration is built once per (network, address) pair and cached, so
// repeated dials to the same endpoint do not rebuild it.
func TestTLSDialerH2(
	d testing.Driver,
	forward Dialer,
	config *tlscommon.TLSConfig,
	timeout time.Duration,
) (DialerH2, error) {
	// Cache of the last-built tls.Config, keyed by (network, address).
	var lastTLSConfig *tls.Config
	var lastNetwork string
	var lastAddress string
	var m sync.Mutex

	return DialerFuncH2(func(network, address string, cfg *tls.Config) (net.Conn, error) {
		switch network {
		case "tcp", "tcp4", "tcp6":
		default:
			return nil, fmt.Errorf("unsupported network type %v", network)
		}

		host, _, err := net.SplitHostPort(address)
		if err != nil {
			return nil, err
		}

		var tlsConfig *tls.Config
		m.Lock()
		if network == lastNetwork && address == lastAddress {
			tlsConfig = lastTLSConfig
		}
		if tlsConfig == nil {
			tlsConfig = config.BuildModuleClientConfig(host)
			lastNetwork = network
			lastAddress = address
			lastTLSConfig = tlsConfig
		}
		m.Unlock()

		// Clone before mutating: the cached config is shared across
		// concurrent dials (and possibly in-flight handshakes), so
		// writing NextProtos on the shared instance would be a data race.
		tlsConfig = tlsConfig.Clone()

		// NextProtos must be carried over from the passed h2 connection
		// config or ALPN negotiation will fail.
		if cfg != nil {
			tlsConfig.NextProtos = cfg.NextProtos
		}

		return tlsDialWith(d, forward, network, address, timeout, tlsConfig, config)
	}), nil
}

func tlsDialWith(
d testing.Driver,
dialer Dialer,
Expand Down
14 changes: 0 additions & 14 deletions libbeat/docs/security/users.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ ifdef::apm-server[]
|Set up ingest pipelines
endif::apm-server[]

ifdef::has_central_config[]
|`beats_admin`
|Enroll and manage configurations in Beats central management
endif::has_central_config[]
|====
+
Omit any roles that aren't relevant in your environment.
Expand Down Expand Up @@ -307,10 +303,6 @@ endif::apm-server[]
{kib} users typically need to view dashboards and visualizations that contain
{beatname_uc} data. These users might also need to create and edit dashboards
and visualizations.
ifdef::has_central_config[]
If you're using Beats central management, some of these users might need to
create and manage configurations.
endif::has_central_config[]

To grant users the required privileges:

Expand Down Expand Up @@ -347,12 +339,6 @@ users who need to read {beatname_uc} data:
| `monitoring_user`
| Allow users to monitor the health of {beatname_uc} itself. Only assign this role to users who manage {beatname_uc}.

ifdef::has_central_config[]
|`beats_admin`
|Create and manage configurations in Beats central management. Only assign this
role to users who need to use Beats central management.
+
endif::[]
|====
endif::apm-server[]

Expand Down
2 changes: 1 addition & 1 deletion libbeat/docs/shared-deduplication.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ input {
beats {
port => 5044
}
}}
}
output {
if [@metadata][_id] {
Expand Down
5 changes: 5 additions & 0 deletions libbeat/docs/upgrading.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ we've provided a migration tool to help you migrate your configurations from
version 6.6 to 6.7 or later. For more information, see the
https://www.elastic.co/blog/beats-6-7-0-released[Beats 6.7.0 release blog].

NOTE: {beats} central management has been removed starting in version 7.14.0.
Looking for a replacement? Refer to the
{fleet-guide}/index.html[Fleet User Guide] to learn how to deploy and centrally
manage a single {agent} to monitor and secure each host.

==== Upgrade {beats} binaries to 7.0

Before upgrading:
Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/queue/diskqueue/core_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to one segment (no segments should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
segments: []writerLoopResponseSegment{
segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
},
})
Expand All @@ -250,7 +250,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to two segments (the first one should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
segments: []writerLoopResponseSegment{
segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
{bytesWritten: 100},
},
Expand All @@ -270,7 +270,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {

// Write to three segments (the first two should be moved to reading list)
dq.handleWriterLoopResponse(writerLoopResponse{
segments: []writerLoopResponseSegment{
segments: []writerLoopSegmentResponse{
{bytesWritten: 100},
{bytesWritten: 100},
{bytesWritten: 500},
Expand Down
26 changes: 13 additions & 13 deletions libbeat/publisher/queue/diskqueue/reader_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package diskqueue
import (
"encoding/binary"
"fmt"
"io"
"os"
)

Expand Down Expand Up @@ -100,13 +101,12 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon

// Open the file and seek to the starting position.
handle, err := request.segment.getReader(rl.settings)
rl.decoder.useJSON = request.segment.shouldUseJSON()
if err != nil {
return readerLoopResponse{err: err}
}
defer handle.Close()
// getReader positions us at the start of the data region, so we use
// a relative seek to advance to the request position.
_, err = handle.Seek(int64(request.startPosition), os.SEEK_CUR)
_, err = handle.Seek(int64(request.startPosition), io.SeekStart)
if err != nil {
return readerLoopResponse{err: err}
}
Expand Down Expand Up @@ -179,58 +179,58 @@ func (rl *readerLoop) nextFrame(
// Ensure we are allowed to read the frame header.
if maxLength < frameHeaderSize {
return nil, fmt.Errorf(
"Can't read next frame: remaining length %d is too low", maxLength)
"can't read next frame: remaining length %d is too low", maxLength)
}
// Wrap the handle to retry non-fatal errors and always return the full
// requested data length if possible.
reader := autoRetryReader{handle}
var frameLength uint32
err := binary.Read(reader, binary.LittleEndian, &frameLength)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame header: %w", err)
return nil, fmt.Errorf("couldn't read data frame header: %w", err)
}

// If the frame extends past the area we were told to read, return an error.
// This should never happen unless the segment file is corrupted.
if maxLength < uint64(frameLength) {
return nil, fmt.Errorf(
"Can't read next frame: frame size is %d but remaining data is only %d",
"can't read next frame: frame size is %d but remaining data is only %d",
frameLength, maxLength)
}
if frameLength <= frameMetadataSize {
// Valid enqueued data must have positive length
return nil, fmt.Errorf(
"Data frame with no data (length %d)", frameLength)
"data frame with no data (length %d)", frameLength)
}

// Read the actual frame data
dataLength := frameLength - frameMetadataSize
bytes := rl.decoder.Buffer(int(dataLength))
_, err = reader.Read(bytes)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame content: %w", err)
return nil, fmt.Errorf("couldn't read data frame content: %w", err)
}

// Read the footer (checksum + duplicate length)
var checksum uint32
err = binary.Read(reader, binary.LittleEndian, &checksum)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame checksum: %w", err)
return nil, fmt.Errorf("couldn't read data frame checksum: %w", err)
}
expected := computeChecksum(bytes)
if checksum != expected {
return nil, fmt.Errorf(
"Data frame checksum mismatch (%x != %x)", checksum, expected)
"data frame checksum mismatch (%x != %x)", checksum, expected)
}

var duplicateLength uint32
err = binary.Read(reader, binary.LittleEndian, &duplicateLength)
if err != nil {
return nil, fmt.Errorf("Couldn't read data frame footer: %w", err)
return nil, fmt.Errorf("couldn't read data frame footer: %w", err)
}
if duplicateLength != frameLength {
return nil, fmt.Errorf(
"Inconsistent data frame length (%d vs %d)",
"inconsistent data frame length (%d vs %d)",
frameLength, duplicateLength)
}

Expand All @@ -242,7 +242,7 @@ func (rl *readerLoop) nextFrame(
// TODO: Rather than pass this error back to the read request, which
// discards the rest of the segment, we should just log the error and
// advance to the next frame, which is likely still valid.
return nil, fmt.Errorf("Couldn't decode data frame: %w", err)
return nil, fmt.Errorf("couldn't decode data frame: %w", err)
}

frame := &readFrame{
Expand Down
Loading

0 comments on commit 33b3a90

Please sign in to comment.