diff --git a/pkg/logs/tailers/socket/tailer.go b/pkg/logs/tailers/socket/tailer.go index ad28d251058d1..459f92a2fc15d 100644 --- a/pkg/logs/tailers/socket/tailer.go +++ b/pkg/logs/tailers/socket/tailer.go @@ -98,8 +98,7 @@ func (t *Tailer) readForever() { log.Warnf("Couldn't read message from connection: %v", err) return } - copiedTags := make([]string, len(t.source.Config.Tags)) - copy(copiedTags, t.source.Config.Tags) + msg := decoder.NewInput(data) if ipAddress != "" && pkgconfigsetup.Datadog().GetBool("logs_config.use_sourcehost_tag") { lastColonIndex := strings.LastIndex(ipAddress, ":") var ipAddressWithoutPort string @@ -109,10 +108,8 @@ func (t *Tailer) readForever() { ipAddressWithoutPort = ipAddress } sourceHostTag := fmt.Sprintf("source_host:%s", ipAddressWithoutPort) - copiedTags = append(copiedTags, sourceHostTag) + msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, sourceHostTag) } - msg := decoder.NewInput(data) - msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, copiedTags...) t.decoder.InputChan <- msg } } diff --git a/pkg/logs/tailers/socket/tailer_test.go b/pkg/logs/tailers/socket/tailer_test.go index 10a168b9788e8..128896663ce14 100644 --- a/pkg/logs/tailers/socket/tailer_test.go +++ b/pkg/logs/tailers/socket/tailer_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/DataDog/datadog-agent/comp/logs/agent/config" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/logs/sources" ) @@ -58,6 +59,49 @@ func TestReadShouldFailWithError(t *testing.T) { tailer.Stop() } +func TestSourceHostTag(t *testing.T) { + msgChan := make(chan *message.Message) + r, w := net.Pipe() + logsConfig := &config.LogsConfig{ + Tags: []string{"test:tag"}, + } + + logSource := sources.NewLogSource("test-source", logsConfig) + tailer := NewTailer(logSource, r, msgChan, readWithIP) + tailer.Start() + + var msg *message.Message + w.Write([]byte("foo\n")) + msg = <-msgChan + assert.Equal(t, []string{"source_host:192.168.1.100", "test:tag"}, msg.Tags()) + tailer.Stop() +} + +func TestSourceHostTagFlagDisabled(t *testing.T) { + // Set the config flag for source_host tag to false + pkgconfigsetup.Datadog().BindEnvAndSetDefault("logs_config.use_sourcehost_tag", false) + + // Set up test components + msgChan := make(chan *message.Message) + r, w := net.Pipe() + logsConfig := &config.LogsConfig{ + Tags: []string{"test:tag"}, + } + + logSource := sources.NewLogSource("test-source", logsConfig) + tailer := NewTailer(logSource, r, msgChan, readWithIP) + tailer.Start() + + var msg *message.Message + w.Write([]byte("foo\n")) + msg = <-msgChan + + // Assert that only the original tag is present (source_host tag should not be added) + assert.Equal(t, []string{"test:tag"}, msg.Tags(), "source_host tag should not be added when flag is disabled") + + tailer.Stop() +} + func read(tailer *Tailer) ([]byte, string, error) { inBuf := make([]byte, 4096) n, err := tailer.Conn.Read(inBuf) @@ -66,3 +110,13 @@ func read(tailer *Tailer) ([]byte, string, error) { } return inBuf[:n], "", nil } + +func readWithIP(tailer *Tailer) ([]byte, string, error) { + inBuf := make([]byte, 4096) + n, err := tailer.Conn.Read(inBuf) + if err != nil { + return nil, "", err + } + mockIPAddress := "192.168.1.100:8080" + return inBuf[:n], mockIPAddress, nil +} diff --git a/releasenotes/notes/fix-duplicate-tags-e97e8eeb6492235f.yaml b/releasenotes/notes/fix-duplicate-tags-e97e8eeb6492235f.yaml new file mode 100644 index 0000000000000..8612a09bd2115 --- /dev/null +++ b/releasenotes/notes/fix-duplicate-tags-e97e8eeb6492235f.yaml @@ -0,0 +1,12 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +fixes: + - | + Fix duplicate tags in UDP/TCP logs. + diff --git a/test/new-e2e/tests/agent-metrics-logs/log-agent/utils/file_tailing_utils.go b/test/new-e2e/tests/agent-metrics-logs/log-agent/utils/file_tailing_utils.go index 7f9ab472bb80f..63e8398c33541 100644 --- a/test/new-e2e/tests/agent-metrics-logs/log-agent/utils/file_tailing_utils.go +++ b/test/new-e2e/tests/agent-metrics-logs/log-agent/utils/file_tailing_utils.go @@ -155,7 +155,7 @@ func FetchAndFilterLogs(fakeIntake *components.FakeIntake, service, content stri return logs, nil } -// CheckLogsExpected verifies the presence of expected logs. +// CheckLogsExpected verifies the presence of expected logs, and verifies that there are no duplicate tags. func CheckLogsExpected(t *testing.T, fakeIntake *components.FakeIntake, service, content string, expectedTags ddtags) { t.Helper() @@ -167,6 +167,14 @@ func CheckLogsExpected(t *testing.T, fakeIntake *components.FakeIntake, service, if assert.NotEmpty(c, logs, "Expected logs with content: '%s' not found. Instead, found: %s", content, intakeLog) { t.Logf("Logs from service: '%s' with content: '%s' collected", service, content) log := logs[0] + // Use a map to check for duplicate tags + seenTags := make(map[string]struct{}) + for _, tag := range log.Tags { + if _, exists := seenTags[tag]; exists { + t.Errorf("Duplicate tag found: %s", tag) + } + seenTags[tag] = struct{}{} // Mark the tag as seen + } for _, expectedTag := range expectedTags { assert.Contains(t, log.Tags, expectedTag) }