Skip to content

Commit

Permalink
Add output type kafka to agent policies (#2850)
Browse files Browse the repository at this point in the history
* Add output type kafka to agent policies
  • Loading branch information
michel-laterman authored Jul 28, 2023
1 parent 87d618e commit 899ba08
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add kafka output type for agent policies

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent-shipper/issues/116
4 changes: 4 additions & 0 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
const (
OutputTypeElasticsearch = "elasticsearch"
OutputTypeLogstash = "logstash"
OutputTypeKafka = "kafka"
)

var (
Expand Down Expand Up @@ -54,6 +55,9 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B
case OutputTypeLogstash:
zlog.Debug().Msg("preparing logstash output")
zlog.Info().Msg("no actions required for logstash output preparation")
case OutputTypeKafka:
zlog.Debug().Msg("preparing kafka output")
zlog.Info().Msg("no actions required for kafka output preparation")
default:
zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type)
return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type)
Expand Down
31 changes: 31 additions & 0 deletions internal/pkg/policy/policy_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,37 @@ func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) {
bulker.AssertExpectations(t)
}

func TestPolicyKafkaOutputPrepare(t *testing.T) {
logger := testlog.SetLogger(t)
bulker := ftesting.NewMockBulk()
po := Output{
Type: OutputTypeKafka,
Name: "test output",
Role: &RoleT{
Sha2: "fake sha",
Raw: TestPayload,
},
}

err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{})
require.Nil(t, err, "expected prepare to pass")
bulker.AssertExpectations(t)
}
func TestPolicyKafkaOutputPrepareNoRole(t *testing.T) {
logger := testlog.SetLogger(t)
bulker := ftesting.NewMockBulk()
po := Output{
Type: OutputTypeKafka,
Name: "test output",
Role: nil,
}

err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{})
// No permissions are required by kafka currently
require.Nil(t, err, "expected prepare to pass")
bulker.AssertExpectations(t)
}

func TestPolicyESOutputPrepareNoRole(t *testing.T) {
logger := testlog.SetLogger(t)
bulker := ftesting.NewMockBulk()
Expand Down

0 comments on commit 899ba08

Please sign in to comment.