Skip to content

Commit

Permalink
Fix duplicate tags in TCP/UDP logs (#29780)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewqian2001datadog authored Oct 4, 2024
1 parent 2d845f9 commit 7991059
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 6 deletions.
7 changes: 2 additions & 5 deletions pkg/logs/tailers/socket/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ func (t *Tailer) readForever() {
log.Warnf("Couldn't read message from connection: %v", err)
return
}
copiedTags := make([]string, len(t.source.Config.Tags))
copy(copiedTags, t.source.Config.Tags)
msg := decoder.NewInput(data)
if ipAddress != "" && pkgconfigsetup.Datadog().GetBool("logs_config.use_sourcehost_tag") {
lastColonIndex := strings.LastIndex(ipAddress, ":")
var ipAddressWithoutPort string
Expand All @@ -109,10 +108,8 @@ func (t *Tailer) readForever() {
ipAddressWithoutPort = ipAddress
}
sourceHostTag := fmt.Sprintf("source_host:%s", ipAddressWithoutPort)
copiedTags = append(copiedTags, sourceHostTag)
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, sourceHostTag)
}
msg := decoder.NewInput(data)
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, copiedTags...)
t.decoder.InputChan <- msg
}
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/logs/tailers/socket/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
)
Expand Down Expand Up @@ -58,6 +59,49 @@ func TestReadShouldFailWithError(t *testing.T) {
tailer.Stop()
}

func TestSourceHostTag(t *testing.T) {
msgChan := make(chan *message.Message)
r, w := net.Pipe()
logsConfig := &config.LogsConfig{
Tags: []string{"test:tag"},
}

logSource := sources.NewLogSource("test-source", logsConfig)
tailer := NewTailer(logSource, r, msgChan, readWithIP)
tailer.Start()

var msg *message.Message
w.Write([]byte("foo\n"))
msg = <-msgChan
assert.Equal(t, []string{"source_host:192.168.1.100", "test:tag"}, msg.Tags())
tailer.Stop()
}

func TestSourceHostTagFlagDisabled(t *testing.T) {
// Set the config flag for source_host tag to false
pkgconfigsetup.Datadog().BindEnvAndSetDefault("logs_config.use_sourcehost_tag", false)

// Set up test components
msgChan := make(chan *message.Message)
r, w := net.Pipe()
logsConfig := &config.LogsConfig{
Tags: []string{"test:tag"},
}

logSource := sources.NewLogSource("test-source", logsConfig)
tailer := NewTailer(logSource, r, msgChan, readWithIP)
tailer.Start()

var msg *message.Message
w.Write([]byte("foo\n"))
msg = <-msgChan

// Assert that only the original tag is present (source_host tag should not be added)
assert.Equal(t, []string{"test:tag"}, msg.Tags(), "source_host tag should not be added when flag is disabled")

tailer.Stop()
}

func read(tailer *Tailer) ([]byte, string, error) {
inBuf := make([]byte, 4096)
n, err := tailer.Conn.Read(inBuf)
Expand All @@ -66,3 +110,13 @@ func read(tailer *Tailer) ([]byte, string, error) {
}
return inBuf[:n], "", nil
}

func readWithIP(tailer *Tailer) ([]byte, string, error) {
inBuf := make([]byte, 4096)
n, err := tailer.Conn.Read(inBuf)
if err != nil {
return nil, "", err
}
mockIPAddress := "192.168.1.100:8080"
return inBuf[:n], mockIPAddress, nil
}
12 changes: 12 additions & 0 deletions releasenotes/notes/fix-duplicate-tags-e97e8eeb6492235f.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
fixes:
- |
Fix duplicate tags in UDP/TCP logs.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func FetchAndFilterLogs(fakeIntake *components.FakeIntake, service, content stri
return logs, nil
}

// CheckLogsExpected verifies the presence of expected logs.
// CheckLogsExpected verifies the presence of expected logs, and verifies that there are no duplicate tags.
func CheckLogsExpected(t *testing.T, fakeIntake *components.FakeIntake, service, content string, expectedTags ddtags) {
t.Helper()

Expand All @@ -167,6 +167,14 @@ func CheckLogsExpected(t *testing.T, fakeIntake *components.FakeIntake, service,
if assert.NotEmpty(c, logs, "Expected logs with content: '%s' not found. Instead, found: %s", content, intakeLog) {
t.Logf("Logs from service: '%s' with content: '%s' collected", service, content)
log := logs[0]
// Use a map to check for duplicate tags
seenTags := make(map[string]struct{})
for _, tag := range log.Tags {
if _, exists := seenTags[tag]; exists {
t.Errorf("Duplicate tag found: %s", tag)
}
seenTags[tag] = struct{}{} // Mark the tag as seen
}
for _, expectedTag := range expectedTags {
assert.Contains(t, log.Tags, expectedTag)
}
Expand Down

0 comments on commit 7991059

Please sign in to comment.