This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Kafka Output #147

Merged — 31 commits merged into elastic:main on Nov 8, 2022

Conversation

@robbavey (Member) commented Oct 27, 2022

What does this PR do?

This is a port of the Kafka output from Beats to the shipper. It is mostly a direct port: the configuration is carried over largely as-is, with a couple of deviations due to functionality the shipper does not yet support.

Currently, this port uses the sarama SyncProducer, sending one message at a time rather than batching or using the AsyncProducer, in order to simplify the flow of event state back to the event loop while the rest of the implementation is worked out.
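
For reference, the SyncProducer flow looks roughly like the sketch below (a minimal example; the broker address, topic, and payload are illustrative, not taken from this PR):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by the SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalf("creating producer: %v", err)
	}
	defer producer.Close()

	// One blocking SendMessage call per event; no batching.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder(`{"message":"hello"}`),
	})
	if err != nil {
		log.Printf("send failed: %v", err)
		return
	}
	log.Printf("stored at partition=%d offset=%d", partition, offset)
}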

The code is rough and contains copious amounts of commented-out code related to the previous implementation in Beats.

What is missing:

Functionality needed to maintain parity with the Beats output:

  • Dynamic topic selection from event content is more basic, using a fmtstr.EventFormatString rather than a selector, due to lack of support for selector creation (see the sketch after this list).
  • Codec selection is not currently supported; events are formatted as pretty-printed JSON to help with visual inspection during manual testing.
  • Exposing metrics/stats: the existing Beats output uses an observer to track events sent, failed, etc. This implementation does not have that yet.
  • Retries. This output has retries with backoff integrated with the sarama library, as in the Beats output. There is some disconnect in the Beats output: comments state that retries should be handled by libbeat rather than the library, yet it also sets a retry count and an exponential backoff function in the library. We need to get our story straight here.
  • Using event data "caches" to store topic, partition, and key along with the message, to avoid re-processing data on retry.
  • Batching. The Beats output supplies a configurable batch size of events for the Kafka output to process; right now this output pulls a fixed batch of 1000 events, sends them asynchronously, and waits for completion.
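
For illustration, topic selection with a format string looks roughly like this on the Beats side (a minimal sketch using the libbeat packages; the shipper would first need to build a beat.Event, or an equivalent, from its protobuf event, which is glossed over here):

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/common/fmtstr"
)

func main() {
	// Compile the topic pattern once when the output is created.
	topicFmt, err := fmtstr.CompileEvent("%{[metricset][name]}")
	if err != nil {
		panic(err)
	}

	// Evaluate it per event to pick the Kafka topic.
	topic, err := topicFmt.Run(&beat.Event{
		Fields: common.MapStr{"metricset": common.MapStr{"name": "cpu"}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(topic) // prints "cpu"
}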

Testing

Testing is rudimentary, with only the config having any tests at the moment. Before this can be considered complete, we will need:

  • Integration tests, setting up a Docker container with Kafka to send events to.
    • There are a lot of different permutations on how to run Kafka, including Kerberos, compression, etc. We should figure out how extensive we want to be with these tests.
  • More comprehensive unit tests of the other functionality.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.md or CHANGELOG-developer.md.

How to test this PR locally

This is also lacking automated tests at the moment. To test manually, I have been running a local Kafka broker on port 9092 and using Metricbeat with:

output.shipper:
  server: "localhost:50052"

And the following settings in elastic-agent-shipper.yml:

output.kafka:
  enabled: true
  hosts: ["localhost:9092"]
  topic: '%{[metricset][name]}'
  partition.round_robin:
    reachable_only: true
  key: '%{[metricset][name]}'

And using kcat to pull messages out of the various topics to prove the correctness of delivery.

kcat -b localhost:9092 -G mygroup <TOPIC_NAME>

Related issues

Relates #116

Minimal working output that will accept messages from Beats via the shipper output, create a Kafka client, and send the messages to the given Kafka topic on the given hosts.

Still a lot to do:

  • Serialize the data properly, not the standard JSON-ization of the protobuf
  • Populate the metadata of the event
  • Assign topic based on the content of the message
  • Metrics
  • Partition assignment based on message contents
  • Tests
  • Move config to its final resting place (output.kafka, not kafka)
  • Kafka keys
  • Better understand the backoff
  • Figure out what the data cache was used for, and how to replace it, particularly with regard to partition assignment
  • Use SyncProducer with SendMessage, for now, to simplify control flow back to the consumption loop
mergify bot (Contributor) commented Oct 27, 2022

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @robbavey? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

elasticmachine (Collaborator) commented Oct 27, 2022

💚 Build Succeeded


Build stats

  • Start Time: 2022-11-08T20:10:02.662+0000

  • Duration: 15 min 37 sec

❕ Flaky test report

No test was executed to be analysed.

🤖 GitHub comments


To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

Setting `validate:required` mandated that kafka.topic and kafka.host be set
regardless of whether the Kafka output was in the config...
@robbavey (Member Author) commented:

/test

@roaksoax linked an issue Oct 31, 2022 that may be closed by this pull request
@robbavey changed the title from "[Work in Progress] Kafka Output" to "Kafka Output" on Nov 2, 2022
@robbavey marked this pull request as ready for review November 2, 2022 13:38
@robbavey requested a review from a team as a code owner November 2, 2022 13:38
@robbavey requested reviews from cmacknz, leehinman and faec and removed request for a team November 2, 2022 13:38
@cmacknz (Member) left a comment

Thanks! I went through and tried to clarify most of the open decisions left here. Most of the questions you are asking apply to any of the outputs we are writing; if we can't get answers in the PR itself in a reasonable amount of time, I will set up a sync with @leehinman and @faec to clarify things.

It may be worth setting that up regardless, to make sure we are aligned on how we want to implement retries and acknowledgements across outputs.

Most of the open TODOs can be converted into follow-up issues. For example, we will almost certainly want an integration test that starts Kafka in a container, similar to what is described in #148.

@@ -209,6 +210,9 @@ func outputFromConfig(config output.Config, queue *queue.Queue) (Output, error)
if config.Elasticsearch != nil {
return elasticsearch.NewElasticSearch(config.Elasticsearch, queue), nil
}
if config.Kafka != nil && config.Kafka.Enabled {
Member:

Not necessarily something to fix in this PR, but if a user defines multiple outputs it looks like we will just arbitrarily pick one with an undocumented precedence.

It would be better if we could return an error to the user indicating that multiple outputs are unsupported. Probably the best way to do this is to report the shipper state as FAILED when the agent gives it an expected configuration with multiple outputs.

If someone has a better idea, let me know. Otherwise I'll write up an issue for us to implement this.
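
A minimal sketch of such a guard, assuming the config fields shown in the diff above (the counting approach and error message are illustrative only, and it assumes fmt is imported):

// Count the configured outputs and fail fast instead of silently
// picking one with undocumented precedence.
configured := 0
if config.Elasticsearch != nil {
	configured++
}
if config.Kafka != nil && config.Kafka.Enabled {
	configured++
}
if configured > 1 {
	return nil, fmt.Errorf("multiple outputs configured; the shipper supports exactly one output")
}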

go.mod Outdated
@@ -87,6 +91,14 @@ require (
howett.net/plist v1.0.0 // indirect
)

require (
github.com/Shopify/sarama v1.27.0
Member:

We currently use our own fork of sarama for Beats. Add a replace statement that uses the same version that Beats currently does: https://github.com/elastic/beats/blob/89a1134b6e279e9fa8e33c549d0340e4b0f67430/go.mod#L355

Mostly it just adds better error reporting when connecting to the broker fails: https://github.com/elastic/sarama/commits/beats-fork
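
For reference, such a replace statement in go.mod would look roughly like this (the pseudo-version is a placeholder to be copied from the linked Beats go.mod line):

replace github.com/Shopify/sarama => github.com/elastic/sarama <version-from-beats-go.mod>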

Member Author:

done

wg sync.WaitGroup

//observer outputs.Observer TODO: what to do with observers?
//topic outil.Selector TODO: Work out how to do selectors in the new world.
Member:

Is there a specific problem preventing you from using them here? What problem do we need to resolve?

I think we likely need feature parity with Beats here, but it doesn't necessarily need to be the same implementation so this may not be an issue.

Disclaimer: I'm not very familiar with how selectors work in Beats yet.

Contributor:

This is similar to index selection in the Elasticsearch output which had the same issue. Selectors are part of the output API for the current beats pipeline, so Beats outputs are required to accept them, but they don't have much meaning outside of libbeat. Code-wise they are just opaque callbacks that take a beat.Event and return a string indicating the index or topic. We can't adopt them directly since we aren't using beat.Event but we could use a similar approach with callbacks taking an event protobuf. However, the real blocker here is design work: as I understand it, index / topic selection will come as part of the V2 protocol, so implementation decisions about selector-equivalents depend on that. (This is also why the initial Elasticsearch output uses a hard-coded output index, since there's nowhere to get the real one from yet.)
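
In code, the selector-equivalent described here could be as simple as an opaque callback over the protobuf event (a sketch; the type name is hypothetical, not an agreed API):

// Hypothetical shipper-side analogue of a libbeat selector: an opaque
// callback that maps a protobuf event to a topic (or index) name.
type TopicSelector func(event *messages.Event) (string, error)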

Member Author:

I've created an issue to track topic selection

client *Client
count int32
total int
failed []*messages.Event // TODO: Need to know how to deal with failed events
Member:

We probably want a generic retry and failure configuration we can use in each output.

I wrote up some initial requirements for the ES output, which we can use as a starting point to iterate on: #151

In general we can start simple and just have a configurable backoff algorithm, and a maximum retry count. If you exceed the maximum retry count we log the event at debug level and drop it.
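
A rough sketch of that simple policy (the function name, signature, and logging hook are hypothetical, not an agreed shipper API; assumes imports of "context" and "time", plus the shipper's messages package):

// Retry with a configurable backoff up to maxRetries, then log at debug
// level and drop the event. logDebugf stands in for the real logger.
func publishWithRetry(
	ctx context.Context,
	event *messages.Event,
	send func(*messages.Event) error,
	maxRetries int,
	backoff func(attempt int) time.Duration,
	logDebugf func(format string, args ...interface{}),
) error {
	var err error
	for attempt := 0; ; attempt++ {
		if err = send(event); err == nil {
			return nil
		}
		if attempt >= maxRetries {
			logDebugf("dropping event after %d attempts: %v", attempt+1, err)
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff(attempt)):
		}
	}
}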

@faec and @leehinman curious to get your thoughts here.

Contributor:

Well, my thoughts are that retry and failure behavior in the shipper is already a big standalone question that needs discussion and ideally a design document :-) Right now in Beats the retry / failure options have common elements but also vary between outputs (as one example, in Kafka the default timeout for client-to-broker communication is higher than the default timeout for broker-to-broker).

The Beats Kafka output already breaks some of our error handling abstraction because its error handling is inherently tied to our Kafka library, but the common error handling config fields are inherently tied to the Beats pipeline. We've had to do special fixes for the Kafka output because the two different error handling mechanisms don't talk to each other very well and they used to get into infinite loops because of it.

So it makes sense to have some common config baseline, but we need to pin down what the actual contract is for shipper outputs. The tentative plan as of earlier this year was that outputs would implement retries themselves but that we'd try to provide API support to make it easy (even if it's just standalone helper routines covering "the simple case"). But we need to separate the overall functional requirements (which we have at least a general idea of) from the specific API responsibilities of an output (which as far as I know we haven't started on yet).

Contributor:

+1 on design doc. I think the disk queue adds some extra edge cases. For example, do we want to try and preserve the number of failed attempts in between shutdowns?

Member Author:

As @faec mentions, there are some complications around retries in the existing Kafka output. From what I can tell (despite a comment stating the contrary), the Beats Kafka output sets max retries and the backoff function on the sarama library so that the library performs retries when communicating with the server, but it also calls batch.RetryEvents for events that are not determined to be unretryable (https://github.com/elastic/beats/blob/main/libbeat/outputs/kafka/client.go#L370-L375).

This PR carries over this retry mechanism, setting the number of retries and a backoff function in the library config (and it will return a list of "failed" events from the publish method), but it is unclear at the moment whether that is appropriate or sufficient. I add my +1 on a design doc here to better understand the requirements and the responsibilities of the outputs.
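
For context, the sarama-side retry settings being carried over look roughly like this (a sketch; the values are illustrative, and it assumes the sarama and time packages are imported):

cfg := sarama.NewConfig()
cfg.Producer.Retry.Max = 3 // derived from the output's retry setting in the real code
cfg.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
	// Exponential backoff with a cap, in the spirit of the Beats output.
	d := time.Second << uint(retries)
	if d > time.Minute {
		d = time.Minute
	}
	return d
}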

Member:

I'll write an issue for tracking this as well and see if we can come up with something reasonable quickly.

topic *fmtstr.EventFormatString,
//topic outil.Selector,
headers []Header,
writer codec.Codec, // TODO: Proper codec support
Member:

As mentioned earlier, feature parity with Beats is likely the baseline we need. Beats looks like it supports a configurable JSON codec and a format codec to compose messages from arbitrary fields: https://www.elastic.co/guide/en/beats/filebeat/current/configuration-output-codec.html

At least we need the JSON codec with the same options. I'm not sure how widely used the format codec is though.

You can create an issue to add codec support as a follow up change. If implementing this in the same way as Beats proves to be difficult, or we think it should be different (for example dropping the format codec) we can discuss it in that issue.
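
For reference, the Beats codec configuration being discussed looks like this (shape taken from the linked Beats docs; whether the shipper keeps the same configuration layout is an open question):

output.kafka:
  codec.json:
    pretty: false
    escape_html: false
  # Alternatively, the format codec composes the message from event fields:
  # codec.format:
  #   string: '%{[message]}'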


asyncProducer sarama.AsyncProducer
wg sync.WaitGroup

//observer outputs.Observer TODO: what to do with observers?
Member:

We will need to get metrics for outputs in the shipper. I'll defer to @faec and @leehinman on whether we should try to port over the Beat observer pattern or do something else.

Contributor:

This seems to have high overlap with the metrics work @fearful-symmetry is doing -- in the Elasticsearch output I left the observers commented out similarly in the expectation that they would probably be coming through and replacing the hooks with something shipper-specific later (although we don't have an overarching plan or api that components can use yet).

Contributor:

I think we need a design doc; I made an issue for it: #163

}

switch {
// TODO: Are we still planning on Kerberos to be beta?
Member:

Is this a comment from you, or was this ported over from the Beats output?

Contributor:

(This is a new comment) Kerberos is still beta and staying that way until we get it working with Active Directory (which might happen before the shipper itself leaves beta but it's not on the current roadmap)

Member Author:

The comment is new, but it refers to the configuration warning below - it's an open question that I will create an issue to track.


Member:

Can you reference the issue from this TODO?

Member Author:

Done

//if config.MaxRetries < 0 {
// retry = -1
//}
//return outputs.Success(config.BulkMaxSize, retry, client)
Member:

I suspect this won't work here, as it is part of the Beats output abstraction layer that I think sits one level above the actual output implementation. We don't have that in the shipper, and it is likely something we'll need a design discussion about.

@faec (Contributor) left a comment

Looks great, thanks so much!


@robbavey mentioned this pull request Nov 8, 2022
mergify bot (Contributor) commented Nov 8, 2022

This pull request is now in conflicts. Could you fix it? 🙏
To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b feature/kafka-output upstream/feature/kafka-output
git merge upstream/main
git push upstream feature/kafka-output

@cmacknz (Member) left a comment

Thanks! All the follow-up work is captured in issues for us to come back to; I don't think the expectation was that we would perfect this in one shot.

@robbavey merged commit 598c8e6 into elastic:main on Nov 8, 2022
Development

Successfully merging this pull request may close these issues.

[meta] Implement the kafka output
5 participants