From 3b9cf8b14523dbd0b4f44fb9024a08c49035d804 Mon Sep 17 00:00:00 2001 From: Mariana Dima Date: Fri, 15 Jan 2021 10:30:47 +0000 Subject: [PATCH 1/5] Fix flaky tests for Metricbeat on Windows 2008 R2 (#23490) (#23508) * enable 2008 * [CI] Optional stage instead the mandatory one windows stage is mandatory, but optional stages are not mandatory for PRs Co-authored-by: Victor Martinez (cherry picked from commit 9283a6fde602ca0f1a948af11ce713f458897c92) --- metricbeat/Jenkinsfile.yml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/metricbeat/Jenkinsfile.yml b/metricbeat/Jenkinsfile.yml index 7708cdda4c3..dca45cdae66 100644 --- a/metricbeat/Jenkinsfile.yml +++ b/metricbeat/Jenkinsfile.yml @@ -62,7 +62,6 @@ stages: mage: "mage build unitTest" platforms: ## override default labels in this specific stage. - "windows-2019" - #- "windows-2008-r2" https://github.com/elastic/beats/issues/19800 #- "windows-7-32-bit" https://github.com/elastic/beats/issues/19835 when: ## Override the top-level when. not_changeset_full_match: "^x-pack/.*" ## Disable the stage if ONLY changes for the x-pack @@ -112,6 +111,17 @@ stages: - "windows-8" branches: true ## for all the branches tags: true ## for all the tags + windows-2008: + mage: "mage build unitTest" + platforms: ## override default labels in this specific stage. + - "windows-2008-r2" + when: ## Override the top-level when. + comments: + - "/test metricbeat for windows-2008" + labels: + - "windows-2008" + branches: true ## for all the branches + tags: true ## for all the tags windows-7: mage: "mage build unitTest" platforms: ## override default labels in this specific stage. 
From a4bfc8dc51679a313c984b1192b9c527ef4fd053 Mon Sep 17 00:00:00 2001 From: Brandon Morelli Date: Fri, 15 Jan 2021 03:29:37 -0800 Subject: [PATCH 2/5] docs: Prepare Changelog for 7.10.2 (#23416) (#23519) * docs: Close changelog for 7.10.2 * Apply suggestions from code review Co-authored-by: Brandon Morelli * Remove empty sections Co-authored-by: Andres Rodriguez Co-authored-by: Brandon Morelli Co-authored-by: Andres Rodriguez Co-authored-by: Elastic Machine Co-authored-by: Andres Rodriguez Co-authored-by: Andres Rodriguez --- CHANGELOG.asciidoc | 16 ++++++++++++++++ libbeat/docs/release.asciidoc | 1 + 2 files changed, 17 insertions(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index cb98ddbb781..9da8d021506 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -3,6 +3,22 @@ :issue: https://github.com/elastic/beats/issues/ :pull: https://github.com/elastic/beats/pull/ +[[release-notes-7.10.2]] +=== Beats version 7.10.2 +https://github.com/elastic/beats/compare/v7.10.1\...v7.10.2[View commits] + +==== Bugfixes + +*Filebeat* + +- Add JSON body check for SQS message. {pull}21727[21727] +- Fix cisco umbrella module config by adding input variable. {pull}22892[22892] +- Fix CredentialsJSON unpacking for `gcp-pubsub` and `httpjson` inputs. {pull}23277[23277] + +*Metricbeat* + +- Change `vsphere.datastore.capacity.used.pct` value to between 0 and 1. {pull}23148[23148] + [[release-notes-7.10.1]] === Beats version 7.10.1 https://github.com/elastic/beats/compare/v7.10.0\...v7.10.1[View commits] diff --git a/libbeat/docs/release.asciidoc b/libbeat/docs/release.asciidoc index ba3fee9eebe..b870aabf077 100644 --- a/libbeat/docs/release.asciidoc +++ b/libbeat/docs/release.asciidoc @@ -8,6 +8,7 @@ This section summarizes the changes in each release. Also read <> for more detail about changes that affect upgrade. 
+* <> * <> * <> * <> From 699db1eaeac367c749b74fe30fc8359b466ffaf9 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 15 Jan 2021 14:42:19 +0100 Subject: [PATCH 3/5] Cherry-pick #22076 to 7.x: Cherry-pick to 7.10: packaging backports (#23142) (#23396) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backports the following commits to 7.x: * feat: add a new step to run the e2e tests for certain parts of Beats (#21100) * [E2E Tests] fix: set versions ony for PRs (#21608) * [CI: Packaging] fix: push ubi8 images too (#21621) * fix: remove extra curly brace in script (#21692) * fix: update fleet test suite name (#21738) * chore: create CI artifacts for DEV usage (#21645) * chore: simplify triggering the E2E tests for Beats (#21790) * chore: delegate variant pushes to the right method (#21861) * feat: package aliases for snapshots (#21960) * fix: use proper param name for e2e tests (#22836) (cherry picked from commit 1006bd9746ef67b28a3f1c8d0a19a437b76fb5e3) Co-authored-by: Victor Martinez Co-authored-by: Manuel de la Peña --- .ci/packaging.groovy | 179 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 144 insertions(+), 35 deletions(-) diff --git a/.ci/packaging.groovy b/.ci/packaging.groovy index 5fc40df8b4b..5b265cc9410 100644 --- a/.ci/packaging.groovy +++ b/.ci/packaging.groovy @@ -2,15 +2,24 @@ @Library('apm@current') _ +import groovy.transform.Field + +/** + This is required to store the test suites we will use to trigger the E2E tests. 
+*/ +@Field def e2eTestSuites = [] + pipeline { agent none environment { - BASE_DIR = 'src/github.com/elastic/beats' + REPO = 'beats' + BASE_DIR = "src/github.com/elastic/${env.REPO}" JOB_GCS_BUCKET = 'beats-ci-artifacts' JOB_GCS_BUCKET_STASH = 'beats-ci-temp' JOB_GCS_CREDENTIALS = 'beats-ci-gcs-plugin' DOCKERELASTIC_SECRET = 'secret/observability-team/ci/docker-registry/prod' DOCKER_REGISTRY = 'docker.elastic.co' + GITHUB_CHECK_E2E_TESTS_NAME = 'E2E Tests' SNAPSHOT = "true" PIPELINE_LOG_LEVEL = "INFO" } @@ -34,7 +43,7 @@ pipeline { } stages { stage('Filter build') { - agent { label 'ubuntu && immutable' } + agent { label 'ubuntu-18 && immutable' } when { beforeAgent true anyOf { @@ -103,14 +112,14 @@ pipeline { 'x-pack/heartbeat', // 'x-pack/journalbeat', 'x-pack/metricbeat', - 'x-pack/packetbeat', + // 'x-pack/packetbeat', 'x-pack/winlogbeat' ) } } stages { stage('Package Linux'){ - agent { label 'ubuntu && immutable' } + agent { label 'ubuntu-18 && immutable' } options { skipDefaultCheckout() } when { beforeAgent true @@ -142,6 +151,7 @@ pipeline { release() pushCIDockerImages() } + prepareE2ETestForPackage("${BEATS_FOLDER}") } } stage('Package Mac OS'){ @@ -172,6 +182,13 @@ pipeline { } } } + stage('Run E2E Tests for Packages'){ + agent { label 'ubuntu-18 && immutable' } + options { skipDefaultCheckout() } + steps { + runE2ETests() + } + } } post { success { @@ -190,38 +207,32 @@ pipeline { def pushCIDockerImages(){ catchError(buildResult: 'UNSTABLE', message: 'Unable to push Docker images', stageResult: 'FAILURE') { - if ("${env.BEATS_FOLDER}" == "auditbeat"){ - tagAndPush('auditbeat-oss') - } else if ("${env.BEATS_FOLDER}" == "filebeat") { - tagAndPush('filebeat-oss') - } else if ("${env.BEATS_FOLDER}" == "heartbeat"){ - tagAndPush('heartbeat-oss') + if (env?.BEATS_FOLDER?.endsWith('auditbeat')) { + tagAndPush('auditbeat') + } else if (env?.BEATS_FOLDER?.endsWith('filebeat')) { + tagAndPush('filebeat') + } else if (env?.BEATS_FOLDER?.endsWith('heartbeat')) { 
+ tagAndPush('heartbeat') } else if ("${env.BEATS_FOLDER}" == "journalbeat"){ tagAndPush('journalbeat') - tagAndPush('journalbeat-oss') - } else if ("${env.BEATS_FOLDER}" == "metricbeat"){ - tagAndPush('metricbeat-oss') + } else if (env?.BEATS_FOLDER?.endsWith('metricbeat')) { + tagAndPush('metricbeat') } else if ("${env.BEATS_FOLDER}" == "packetbeat"){ tagAndPush('packetbeat') - tagAndPush('packetbeat-oss') - } else if ("${env.BEATS_FOLDER}" == "x-pack/auditbeat"){ - tagAndPush('auditbeat') } else if ("${env.BEATS_FOLDER}" == "x-pack/elastic-agent") { tagAndPush('elastic-agent') - } else if ("${env.BEATS_FOLDER}" == "x-pack/filebeat"){ - tagAndPush('filebeat') - } else if ("${env.BEATS_FOLDER}" == "x-pack/heartbeat"){ - tagAndPush('heartbeat') - } else if ("${env.BEATS_FOLDER}" == "x-pack/metricbeat"){ - tagAndPush('metricbeat') } } } -def tagAndPush(name){ +def tagAndPush(beatName){ def libbetaVer = env.BEAT_VERSION + def aliasVersion = "" if("${env.SNAPSHOT}" == "true"){ + aliasVersion = libbetaVer.substring(0, libbetaVer.lastIndexOf(".")) // remove third number in version + libbetaVer += "-SNAPSHOT" + aliasVersion += "-SNAPSHOT" } def tagName = "${libbetaVer}" @@ -229,29 +240,127 @@ def tagAndPush(name){ tagName = "pr-${env.CHANGE_ID}" } - def oldName = "${DOCKER_REGISTRY}/beats/${name}:${libbetaVer}" - def newName = "${DOCKER_REGISTRY}/observability-ci/${name}:${tagName}" - def commitName = "${DOCKER_REGISTRY}/observability-ci/${name}:${env.GIT_BASE_COMMIT}" dockerLogin(secret: "${DOCKERELASTIC_SECRET}", registry: "${DOCKER_REGISTRY}") - retry(3){ - sh(label:'Change tag and push', script: """ - docker tag ${oldName} ${newName} - docker push ${newName} - docker tag ${oldName} ${commitName} - docker push ${commitName} - """) + + // supported image flavours + def variants = ["", "-oss", "-ubi8"] + variants.each { variant -> + doTagAndPush(beatName, variant, libbetaVer, tagName) + doTagAndPush(beatName, variant, libbetaVer, "${env.GIT_BASE_COMMIT}") + + if 
(!isPR() && aliasVersion != "") { + doTagAndPush(beatName, variant, libbetaVer, aliasVersion) + } + } +} + +/** +* @param beatName name of the Beat +* @param variant name of the variant used to build the docker image name +* @param sourceTag tag to be used as source for the docker tag command, usually under the 'beats' namespace +* @param targetTag tag to be used as target for the docker tag command, usually under the 'observability-ci' namespace +*/ +def doTagAndPush(beatName, variant, sourceTag, targetTag) { + def sourceName = "${DOCKER_REGISTRY}/beats/${beatName}${variant}:${sourceTag}" + def targetName = "${DOCKER_REGISTRY}/observability-ci/${beatName}${variant}:${targetTag}" + + def iterations = 0 + retryWithSleep(retries: 3, seconds: 5, backoff: true) { + iterations++ + def status = sh(label: "Change tag and push ${targetName}", script: """ + docker tag ${sourceName} ${targetName} + docker push ${targetName} + """, returnStatus: true) + + if ( status > 0 && iterations < 3) { + error("tag and push failed for ${beatName}, retry") + } else if ( status > 0 ) { + log(level: 'WARN', text: "${beatName} doesn't have ${variant} docker images. 
See https://github.com/elastic/beats/pull/21621") + } + } +} + +def prepareE2ETestForPackage(String beat){ + if ("${beat}" == "filebeat" || "${beat}" == "x-pack/filebeat") { + e2eTestSuites.push('fleet') + e2eTestSuites.push('helm') + } else if ("${beat}" == "metricbeat" || "${beat}" == "x-pack/metricbeat") { + e2eTestSuites.push('ALL') + echo("${beat} adds all test suites to the E2E tests job.") + } else if ("${beat}" == "x-pack/elastic-agent") { + e2eTestSuites.push('fleet') + } else { + echo("${beat} does not add any test suite to the E2E tests job.") + return } } def release(){ withBeatsEnv(){ - dir("${env.BEATS_FOLDER}") { - sh(label: "Release ${env.BEATS_FOLDER} ${env.PLATFORMS}", script: 'mage package') + withEnv([ + "DEV=true" + ]) { + dir("${env.BEATS_FOLDER}") { + sh(label: "Release ${env.BEATS_FOLDER} ${env.PLATFORMS}", script: 'mage package') + } } publishPackages("${env.BEATS_FOLDER}") } } +def runE2ETests(){ + if (e2eTestSuites.size() == 0) { + echo("Not triggering E2E tests for PR-${env.CHANGE_ID} because the changes does not affect the E2E.") + return + } + + def suites = '' // empty value represents all suites in the E2E tests + + catchError(buildResult: 'UNSTABLE', message: 'Unable to run e2e tests', stageResult: 'FAILURE') { + def suitesSet = e2eTestSuites.toSet() + + if (!suitesSet.contains('ALL')) { + suitesSet.each { suite -> + suites += "${suite}," + }; + } + + triggerE2ETests(suites) + } +} + +def triggerE2ETests(String suite) { + echo("Triggering E2E tests for PR-${env.CHANGE_ID}. Test suites: ${suite}.") + + def branchName = isPR() ? 
"${env.CHANGE_TARGET}" : "${env.JOB_BASE_NAME}.x" + def e2eTestsPipeline = "e2e-tests/e2e-testing-mbp/${branchName}" + + def parameters = [ + booleanParam(name: 'forceSkipGitChecks', value: true), + booleanParam(name: 'forceSkipPresubmit', value: true), + booleanParam(name: 'notifyOnGreenBuilds', value: !isPR()), + string(name: 'runTestsSuites', value: suite), + string(name: 'GITHUB_CHECK_NAME', value: env.GITHUB_CHECK_E2E_TESTS_NAME), + string(name: 'GITHUB_CHECK_REPO', value: env.REPO), + string(name: 'GITHUB_CHECK_SHA1', value: env.GIT_BASE_COMMIT), + ] + if (isPR()) { + def version = "pr-${env.CHANGE_ID}" + parameters.push(booleanParam(name: 'ELASTIC_AGENT_USE_CI_SNAPSHOTS', value: true)) + parameters.push(string(name: 'ELASTIC_AGENT_VERSION', value: "${version}")) + parameters.push(string(name: 'METRICBEAT_VERSION', value: "${version}")) + } + + build(job: "${e2eTestsPipeline}", + parameters: parameters, + propagate: false, + wait: false + ) + + def notifyContext = "${env.GITHUB_CHECK_E2E_TESTS_NAME}" + githubNotify(context: "${notifyContext}", description: "${notifyContext} ...", status: 'PENDING', targetUrl: "${env.JENKINS_URL}search/?q=${e2eTestsPipeline.replaceAll('/','+')}") +} + def withMacOSEnv(Closure body){ withEnvMask( vars: [ [var: "KEYCHAIN_PASS", password: getVaultSecret(secret: "secret/jenkins-ci/macos-codesign-keychain").data.password], From c0784d7dccacd8b8474abe9e73998006b9e4f6c3 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 15 Jan 2021 11:00:42 -0500 Subject: [PATCH 4/5] [Elastic Agent] Capture the rollover files for logging (#23514) (#23516) * Capture the rollover files for logging. * Add changelog. 
(cherry picked from commit 72eb9691f9dc42b84d33b2a349b12838feb77561) --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + x-pack/elastic-agent/pkg/agent/operation/monitoring.go | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 75478112e8c..fc7246dc107 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -32,6 +32,7 @@ - Remove artifacts on transient download errors {pull}23235[23235] - Skip top level files when unziping archive during upgrade {pull}23456[23456] - Do not take ownership of Endpoint log path {pull}23444[23444] +- Fix issue of missing log messages from filebeat monitor {pull}23514[23514] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index a7b835503be..c6a5492133f 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -188,7 +188,9 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i }, "paths": []string{ filepath.Join(paths.Home(), "logs", "elastic-agent-json.log"), + filepath.Join(paths.Home(), "logs", "elastic-agent-json.log*"), filepath.Join(paths.Home(), "logs", "elastic-agent-watcher-json.log"), + filepath.Join(paths.Home(), "logs", "elastic-agent-watcher-json.log*"), }, "index": "logs-elastic_agent-default", "processors": []map[string]interface{}{ @@ -531,7 +533,10 @@ func (o *Operator) getLogFilePaths() map[string][]string { for _, a := range o.apps { logPath := a.Monitor().LogPath(a.Spec(), o.pipelineID) if logPath != "" { - paths[strings.ReplaceAll(a.Name(), "-", "_")] = append(paths[a.Name()], logPath) + paths[strings.ReplaceAll(a.Name(), "-", "_")] = []string{ + logPath, + fmt.Sprintf("%s*", logPath), + } } } From 274d07827f19c2375d2d661bfd2d81968c48b638 Mon Sep 
17 00:00:00 2001 From: Fae Charlton Date: Fri, 15 Jan 2021 12:53:17 -0500 Subject: [PATCH 5/5] [libbeat] Fix Kafka output "circuit breaker is open" errors (#23484) (#23527) (cherry picked from commit c11d12cf6e80eee6561e931292c7048864106d13) --- CHANGELOG.next.asciidoc | 1 + go.mod | 1 + libbeat/outputs/kafka/client.go | 96 ++++++++++++++++++++++++++++++++- 3 files changed, 97 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e16860fa00d..49fc91b02ce 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -164,6 +164,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix reporting of cgroup metrics when running under Docker {pull}22879[22879] - Fix typo in config docs {pull}23185[23185] - Fix panic due to unhandled DeletedFinalStateUnknown in k8s OnDelete {pull}23419[23419] +- Fix error loop with runaway CPU use when the Kafka output encounters some connection errors {pull}23484[23484] *Auditbeat* diff --git a/go.mod b/go.mod index fb2c671fc17..6725f85a6fd 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/dop251/goja v0.0.0-00010101000000-000000000000 github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 github.com/dustin/go-humanize v1.0.0 + github.com/eapache/go-resiliency v1.2.0 github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 github.com/elastic/ecs v1.6.0 github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 6080783192e..b01d4ab21df 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -24,8 +24,10 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/Shopify/sarama" + "github.com/eapache/go-resiliency/breaker" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/common/transport" @@ -47,6 +49,7 @@ type client struct { codec 
codec.Codec config sarama.Config mux sync.Mutex + done chan struct{} producer sarama.AsyncProducer @@ -85,6 +88,7 @@ func newKafkaClient( index: strings.ToLower(index), codec: writer, config: *cfg, + done: make(chan struct{}), } return c, nil } @@ -121,6 +125,7 @@ func (c *client) Close() error { return nil } + close(c.done) c.producer.AsyncClose() c.wg.Wait() c.producer = nil @@ -237,12 +242,92 @@ func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) { } func (c *client) errorWorker(ch <-chan *sarama.ProducerError) { + breakerOpen := false defer c.wg.Done() defer c.log.Debug("Stop kafka error handler") for errMsg := range ch { msg := errMsg.Msg.Metadata.(*message) msg.ref.fail(msg, errMsg.Err) + + if errMsg.Err == breaker.ErrBreakerOpen { + // ErrBreakerOpen is a very special case in Sarama. It happens only when + // there have been repeated critical (broker / topic-level) errors, and it + // puts Sarama into a state where it immediately rejects all input + // for 10 seconds, ignoring retry / backoff settings. + // With this output's current design (in which Publish passes through to + // Sarama's input channel with no further synchronization), retrying + // these failed values causes an infinite retry loop that degrades + // the entire system. + // "Nice" approaches and why we haven't used them: + // - Use exposed API to navigate this state and its effect on retries. + // * Unfortunately, Sarama's circuit breaker and its errors are + // hard-coded and undocumented. We'd like to address this in the + // future. + // - If a batch fails with a circuit breaker error, delay before + // retrying it. + // * This would fix the most urgent performance issues, but requires + // extra bookkeeping because the Kafka output handles each batch + // independently. It results in potentially many batches / 10s of + // thousands of events being loaded and attempted, even though we + // know there's a fatal error early in the first batch. 
It also + // makes it hard to know when each batch should be retried. + // - In the Kafka Publish method, add a blocking first-pass intake step + // that can gate on error conditions, rather than handing off data + // to Sarama immediately. + // * This would fix the issue but would require a lot of work and + // testing, and we need a fix for the release now. It's also a + // fairly elaborate workaround for something that might be + // easier to fix in the library itself. + // + // Instead, we have applied the following fix, which is not very "nice" + // but satisfies all other important constraints: + // - When we receive a circuit breaker error, sleep for 10 seconds + // (Sarama's hard-coded timeout) on the _error worker thread_. + // + // This works because connection-level errors that can trigger the + // circuit breaker are on the critical path for input processing, and + // thus blocking on the error channel applies back-pressure to the + // input channel. This means that if there are any more errors while the + // error worker is asleep, any call to Publish will block until we + // start reading again. + // + // Reasons this solution is preferred: + // - It responds immediately to Sarama's global error state, rather than + // trying to detect it independently in each batch or adding more + // cumbersome synchronization to the output + // - It gives the minimal delay that is consistent with Sarama's + // internal behavior + // - It requires only a few lines of code and no design changes + // + // That said, this is still relying on undocumented library internals + // for correct behavior, which isn't ideal, but the error itself is an + // undocumented library internal, so this is de facto necessary for now. + // We'd like to have a more official / permanent fix merged into Sarama + // itself in the future. 
+ + // The "breakerOpen" flag keeps us from sleeping the first time we see + // a circuit breaker error, because it might be an old error still + // sitting in the channel from 10 seconds ago. So we only end up + // sleeping every _other_ reported breaker error. + if breakerOpen { + // Immediately log the error that presumably caused this state, + // since the error reporting on this batch will be delayed. + if msg.ref.err != nil { + c.log.Errorf("Kafka (topic=%v): %v", msg.topic, msg.ref.err) + } + select { + case <-time.After(10 * time.Second): + // Sarama's circuit breaker is hard-coded to reject all inputs + // for 10sec. + case <-msg.ref.client.done: + // Allow early bailout if the output itself is closing. + } + breakerOpen = false + } else { + breakerOpen = true + } + } } } @@ -261,9 +346,18 @@ func (r *msgRef) fail(msg *message, err error) { msg.topic, len(msg.key)+len(msg.value)) + case breaker.ErrBreakerOpen: + // Add this message to the failed list, but don't overwrite r.err since + // all the breaker error means is "there were a lot of other errors". + r.failed = append(r.failed, msg.data) + default: r.failed = append(r.failed, msg.data) - r.err = err + if r.err == nil { + // Don't overwrite an existing error. This way at the end of the batch + // we report the first error that we saw, rather than the last one. + r.err = err + } } r.dec() }