Commit

Merge branch '7.x' into backport_23496_7.x
michalpristas authored Jan 17, 2021
2 parents 24e39bf + 424b34b commit b7f427c
Showing 65 changed files with 320 additions and 488 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -166,6 +166,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `nested` subfield handling in generated Elasticsearch templates. {issue}23178[23178] {pull}23183[23183]
- Fix CPU usage metrics on VMs with dynamic CPU config {pull}23154[23154]
- Fix panic due to unhandled DeletedFinalStateUnknown in k8s OnDelete {pull}23419[23419]
- Fix error loop with runaway CPU use when the Kafka output encounters some connection errors {pull}23484[23484]


*Auditbeat*
@@ -256,6 +257,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix syslog header parsing in infoblox module. {issue}23272[23272] {pull}23273[23273]
- Fix CredentialsJSON unpacking for `gcp-pubsub` and `httpjson` inputs. {pull}23277[23277]
- Fix various processing errors in the Suricata module. {pull}23236[23236]
- Change `event.created` in Netflow events to the time the event was created by Filebeat,
for consistency with ECS. {pull}23094[23094]

*Heartbeat*

@@ -553,6 +556,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Add mime type detection for http responses. {pull}22976[22976]
- Bundle synthetics deps with heartbeat docker image. {pull}23274[23274]

*Heartbeat*

39 changes: 37 additions & 2 deletions dev-tools/packaging/templates/docker/Dockerfile.tmpl
@@ -8,7 +8,15 @@ FROM {{ .buildFrom }} AS home

COPY beat {{ $beatHome }}

RUN mkdir {{ $beatHome }}/data {{ $beatHome }}/logs && \
{{- if (and (eq .BeatName "heartbeat") (not (contains .from "ubi-minimal"))) }}
RUN mkdir -p {{ $beatHome }}/.node \
{{ $beatHome }}/.npm \
{{ $beatHome }}/.cache \
{{ $beatHome }}/.config \
{{ $beatHome }}/suites
{{- end }}

RUN mkdir -p {{ $beatHome }}/data {{ $beatHome }}/logs && \
chown -R root:root {{ $beatHome }} && \
find {{ $beatHome }} -type d -exec chmod 0750 {} \; && \
find {{ $beatHome }} -type f -exec chmod 0640 {} \; && \
@@ -28,9 +36,19 @@ RUN microdnf -y --setopt=tsflags=nodocs update && \
microdnf install shadow-utils && \
microdnf clean all
{{- else }}
RUN yum -y --setopt=tsflags=nodocs update && yum clean all
RUN yum -y --setopt=tsflags=nodocs update \
{{- if (eq .BeatName "heartbeat") }}
&& yum -y install epel-release \
&& yum -y install atk cups gtk gdk xrandr pango.x86_64 libXcomposite.x86_64 libXcursor.x86_64 libXdamage.x86_64 \
libXext.x86_64 libXi.x86_64 libXtst.x86_64 cups-libs.x86_64 libXScrnSaver.x86_64 libXrandr.x86_64 GConf2.x86_64 \
alsa-lib.x86_64 atk.x86_64 gtk3.x86_64 ipa-gothic-fonts xorg-x11-fonts-100dpi xorg-x11-fonts-75dpi xorg-x11-utils \
xorg-x11-fonts-cyrillic xorg-x11-fonts-Type1 xorg-x11-fonts-misc \
{{- end }}
&& yum clean all && rm -rf /var/cache/yum
# See https://access.redhat.com/discussions/3195102 for why rm is needed
{{- end }}


LABEL \
org.label-schema.build-date="{{ date }}" \
org.label-schema.schema-version="1.0" \
@@ -80,6 +98,23 @@ RUN mkdir /licenses
COPY --from=home {{ $beatHome }}/LICENSE.txt /licenses
COPY --from=home {{ $beatHome }}/NOTICE.txt /licenses

{{- if (and (eq .BeatName "heartbeat") (not (contains .from "ubi-minimal"))) }}
# Setup synthetics env vars
ENV ELASTIC_SYNTHETICS_CAPABLE=true
ENV SUITES_DIR={{ $beatHome }}/suites
ENV NODE_PATH={{ $beatHome }}/.node

# Setup node
RUN cd /usr/share/heartbeat/.node \
&& mkdir node \
&& curl https://nodejs.org/dist/v12.18.4/node-v12.18.4-linux-x64.tar.xz | tar -xJ --strip 1 -C node
ENV PATH="/usr/share/heartbeat/.node/node/bin:$PATH"
# Install the latest version of @elastic/synthetics, forcefully ignoring the previously
# cached node_modules; Heartbeat then calls the global executable to run test suites
RUN npm i -g -f @elastic/synthetics
{{- end }}


{{- if ne .user "root" }}
RUN groupadd --gid 1000 {{ .BeatName }}
RUN useradd -M --uid 1000 --gid 1000 --groups 0 --home {{ $beatHome }} {{ .user }}
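
The template above provisions Node.js and installs @elastic/synthetics globally, with SUITES_DIR pointing at {{ $beatHome }}/suites (that is, /usr/share/heartbeat/suites). As a rough illustration only — the image tag, suite layout, and the idea of bind-mounting a local suite are assumptions, not taken from this commit — a browser-test suite could be mounted into that directory when running the image:

# Hypothetical invocation: mount a local synthetics suite at the path
# that SUITES_DIR points to inside the Heartbeat image.
docker run --rm \
  -v "$PWD/my-suite:/usr/share/heartbeat/suites" \
  docker.elastic.co/beats/heartbeat:7.11.0
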
1 change: 1 addition & 0 deletions go.mod
@@ -57,6 +57,7 @@ require (
github.com/dop251/goja v0.0.0-00010101000000-000000000000
github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.2.0
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.6.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
96 changes: 95 additions & 1 deletion libbeat/outputs/kafka/client.go
@@ -24,8 +24,10 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
"github.com/eapache/go-resiliency/breaker"

"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/common/transport"
@@ -47,6 +49,7 @@ type client struct {
codec codec.Codec
config sarama.Config
mux sync.Mutex
done chan struct{}

producer sarama.AsyncProducer

@@ -85,6 +88,7 @@ func newKafkaClient(
index: strings.ToLower(index),
codec: writer,
config: *cfg,
done: make(chan struct{}),
}
return c, nil
}
@@ -121,6 +125,7 @@ func (c *client) Close() error {
return nil
}

close(c.done)
c.producer.AsyncClose()
c.wg.Wait()
c.producer = nil
@@ -237,12 +242,92 @@ func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) {
}

func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
breakerOpen := false
defer c.wg.Done()
defer c.log.Debug("Stop kafka error handler")

for errMsg := range ch {
msg := errMsg.Msg.Metadata.(*message)
msg.ref.fail(msg, errMsg.Err)

if errMsg.Err == breaker.ErrBreakerOpen {
// ErrBreakerOpen is a very special case in Sarama. It happens only when
// there have been repeated critical (broker / topic-level) errors, and it
// puts Sarama into a state where it immediately rejects all input
// for 10 seconds, ignoring retry / backoff settings.
// With this output's current design (in which Publish passes through to
// Sarama's input channel with no further synchronization), retrying
// these failed values causes an infinite retry loop that degrades
// the entire system.
// "Nice" approaches and why we haven't used them:
// - Use exposed API to navigate this state and its effect on retries.
// * Unfortunately, Sarama's circuit breaker and its errors are
// hard-coded and undocumented. We'd like to address this in the
// future.
// - If a batch fails with a circuit breaker error, delay before
// retrying it.
// * This would fix the most urgent performance issues, but requires
// extra bookkeeping because the Kafka output handles each batch
// independently. It results in potentially many batches / 10s of
// thousands of events being loaded and attempted, even though we
// know there's a fatal error early in the first batch. It also
// makes it hard to know when each batch should be retried.
// - In the Kafka Publish method, add a blocking first-pass intake step
// that can gate on error conditions, rather than handing off data
// to Sarama immediately.
// * This would fix the issue but would require a lot of work and
// testing, and we need a fix for the release now. It's also a
// fairly elaborate workaround for something that might be
// easier to fix in the library itself.
//
// Instead, we have applied the following fix, which is not very "nice"
// but satisfies all other important constraints:
// - When we receive a circuit breaker error, sleep for 10 seconds
// (Sarama's hard-coded timeout) on the _error worker thread_.
//
// This works because connection-level errors that can trigger the
// circuit breaker are on the critical path for input processing, and
// thus blocking on the error channel applies back-pressure to the
// input channel. This means that if there are any more errors while the
// error worker is asleep, any call to Publish will block until we
// start reading again.
//
// Reasons this solution is preferred:
// - It responds immediately to Sarama's global error state, rather than
// trying to detect it independently in each batch or adding more
// cumbersome synchronization to the output
// - It gives the minimal delay that is consistent with Sarama's
// internal behavior
// - It requires only a few lines of code and no design changes
//
// That said, this is still relying on undocumented library internals
// for correct behavior, which isn't ideal, but the error itself is an
// undocumented library internal, so this is de facto necessary for now.
// We'd like to have a more official / permanent fix merged into Sarama
// itself in the future.

// The "breakerOpen" flag keeps us from sleeping the first time we see
// a circuit breaker error, because it might be an old error still
// sitting in the channel from 10 seconds ago. So we only end up
// sleeping every _other_ reported breaker error.
if breakerOpen {
// Immediately log the error that presumably caused this state,
// since the error reporting on this batch will be delayed.
if msg.ref.err != nil {
c.log.Errorf("Kafka (topic=%v): %v", msg.topic, msg.ref.err)
}
select {
case <-time.After(10 * time.Second):
// Sarama's circuit breaker is hard-coded to reject all inputs
// for 10sec.
case <-msg.ref.client.done:
// Allow early bailout if the output itself is closing.
}
breakerOpen = false
} else {
breakerOpen = true
}
}
}
}

@@ -261,9 +346,18 @@ func (r *msgRef) fail(msg *message, err error) {
msg.topic,
len(msg.key)+len(msg.value))

case breaker.ErrBreakerOpen:
// Add this message to the failed list, but don't overwrite r.err since
// all the breaker error means is "there were a lot of other errors".
r.failed = append(r.failed, msg.data)

default:
r.failed = append(r.failed, msg.data)
r.err = err
if r.err == nil {
// Don't overwrite an existing error. This way at the end of the batch
// we report the first error that we saw, rather than the last one.
r.err = err
}
}
r.dec()
}
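
The long comment in errorWorker relies on the behavior of Sarama's internal circuit breaker, which comes from the github.com/eapache/go-resiliency dependency added to go.mod above. As a minimal, self-contained sketch (not part of this commit) of that behavior: once the error threshold trips, Run returns breaker.ErrBreakerOpen immediately, without invoking the work function, until the timeout elapses — the state the errorWorker detects and sleeps through.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/eapache/go-resiliency/breaker"
)

func main() {
	// Three consecutive errors open the breaker; Sarama hard-codes a
	// 10-second timeout for its internal instance.
	b := breaker.New(3, 1, 10*time.Second)
	brokerDown := func() error { return errors.New("broker unreachable") }

	for i := 0; i < 5; i++ {
		err := b.Run(brokerDown)
		if err == breaker.ErrBreakerOpen {
			// The breaker is open: Run rejects the work without calling
			// it. This is the error the Kafka errorWorker matches on.
			fmt.Println("attempt", i, "rejected:", err)
			continue
		}
		fmt.Println("attempt", i, "failed:", err)
	}
}
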
12 changes: 11 additions & 1 deletion metricbeat/Jenkinsfile.yml
@@ -62,7 +62,6 @@ stages:
mage: "mage build unitTest"
platforms: ## override default labels in this specific stage.
- "windows-2019"
#- "windows-2008-r2" https://github.com/elastic/beats/issues/19800
#- "windows-7-32-bit" https://github.com/elastic/beats/issues/19835
when: ## Override the top-level when.
not_changeset_full_match: "^x-pack/.*" ## Disable the stage if ONLY changes for the x-pack
@@ -112,6 +111,17 @@ stages:
- "windows-8"
branches: true ## for all the branches
tags: true ## for all the tags
windows-2008:
mage: "mage build unitTest"
platforms: ## override default labels in this specific stage.
- "windows-2008-r2"
when: ## Override the top-level when.
comments:
- "/test metricbeat for windows-2008"
labels:
- "windows-2008"
branches: true ## for all the branches
tags: true ## for all the tags
windows-7:
mage: "mage build unitTest"
platforms: ## override default labels in this specific stage.
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -30,9 +30,11 @@
- Fix shell wrapper for deb/rpm packaging {pull}23038[23038]
- Fixed parsing of npipe URI {pull}22978[22978]
- Remove artifacts on transient download errors {pull}23235[23235]
- Support for linux/arm64 {pull}23479[23479]
- Skip top-level files when unzipping archive during upgrade {pull}23456[23456]
- Do not take ownership of Endpoint log path {pull}23444[23444]
- Fixed fetching DBus service PID {pull}23496[23496]
- Fix missing log messages from the Filebeat monitor {pull}23514[23514]

==== New features

4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/global_config.go
@@ -5,6 +5,8 @@
package application

import (
"runtime"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
@@ -30,5 +32,7 @@ func agentGlobalConfig() map[string]interface{} {
"home": paths.Home(),
"logs": paths.Logs(),
},
"runtime.os": runtime.GOOS,
"runtime.arch": runtime.GOARCH,
}
}
7 changes: 6 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/monitoring.go
@@ -188,7 +188,9 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
},
"paths": []string{
filepath.Join(paths.Home(), "logs", "elastic-agent-json.log"),
filepath.Join(paths.Home(), "logs", "elastic-agent-json.log*"),
filepath.Join(paths.Home(), "logs", "elastic-agent-watcher-json.log"),
filepath.Join(paths.Home(), "logs", "elastic-agent-watcher-json.log*"),
},
"index": "logs-elastic_agent-default",
"processors": []map[string]interface{}{
@@ -531,7 +533,10 @@ func (o *Operator) getLogFilePaths() map[string][]string {
for _, a := range o.apps {
logPath := a.Monitor().LogPath(a.Spec(), o.pipelineID)
if logPath != "" {
paths[strings.ReplaceAll(a.Name(), "-", "_")] = append(paths[a.Name()], logPath)
paths[strings.ReplaceAll(a.Name(), "-", "_")] = []string{
logPath,
fmt.Sprintf("%s*", logPath),
}
}
}

15 changes: 15 additions & 0 deletions x-pack/elastic-agent/pkg/agent/program/program.go
@@ -69,6 +69,21 @@ func detectPrograms(agentInfo transpiler.AgentInfo, singleConfig *transpiler.AST
programs := make([]Program, 0)
for _, spec := range Supported {
specificAST := singleConfig.Clone()
if len(spec.Constraints) > 0 {
constraints, err := eql.New(spec.Constraints)
if err != nil {
return nil, err
}
ok, err := constraints.Eval(specificAST)
if err != nil {
return nil, err
}

if !ok {
continue
}
}

err := spec.Rules.Apply(agentInfo, specificAST)
if err != nil {
return nil, err
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/program/program_test.go
@@ -423,6 +423,9 @@ func TestConfiguration(t *testing.T) {
"endpoint_unknown_output": {
expected: 0,
},
"endpoint_arm": {
expected: 0,
},
}

for name, test := range testcases {
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/program/spec.go
@@ -39,6 +39,7 @@ type Spec struct {
PostInstallSteps *transpiler.StepList `yaml:"post_install"`
PreUninstallSteps *transpiler.StepList `yaml:"pre_uninstall"`
When string `yaml:"when"`
Constraints string `yaml:"constraints"`
}

// ReadSpecs reads all the specs that match the provided globbing path.
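
Taken together, the new Constraints field, the eql evaluation in program.go above, and the runtime.os / runtime.arch variables injected by global_config.go let a program spec opt out of unsupported platforms (hence the endpoint_arm test case expecting zero programs). A hypothetical spec sketch — the program name and the exact EQL expression are assumptions, not taken from this commit:

# Hypothetical spec sketch: keep this program off arm64 hosts using the
# runtime variables added in global_config.go. The exact EQL expression
# used by real specs is an assumption.
name: Endpoint Security
cmd: endpoint-security
constraints: ${runtime.arch} != 'arm64'
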
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/agent/program/spec_test.go
@@ -54,7 +54,8 @@ func TestSerialization(t *testing.T) {
PreUninstallSteps: transpiler.NewStepList(
transpiler.ExecFile(30, "app", "uninstall", "--force"),
),
When: "1 == 1",
When: "1 == 1",
Constraints: "2 == 2",
}
yml := `name: hello
cmd: hellocmd
@@ -118,6 +119,7 @@ pre_uninstall:
- --force
timeout: 30
when: 1 == 1
constraints: 2 == 2
`
t.Run("serialization", func(t *testing.T) {
b, err := yaml.Marshal(spec)