diff --git a/telemetry/flow-enricher/cmd/flow-enricher/main.go b/telemetry/flow-enricher/cmd/flow-enricher/main.go index ab3911fcc..71319938a 100644 --- a/telemetry/flow-enricher/cmd/flow-enricher/main.go +++ b/telemetry/flow-enricher/cmd/flow-enricher/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/tls" "flag" "fmt" "log/slog" @@ -12,6 +13,7 @@ import ( "syscall" "time" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/malbeclabs/doublezero/config" "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" enricher "github.com/malbeclabs/doublezero/telemetry/flow-enricher/internal/flow-enricher" @@ -54,30 +56,48 @@ func main() { reg := prometheus.WrapRegistererWithPrefix("enricher_", prometheus.DefaultRegisterer) - // setup output writer (ClickHouse or stdout) - var chWriter enricher.Clicker + // setup output writer (ClickHouse or stdout) and optional ClickHouse reader for annotators + var chWriter enricher.FlowWriter + var chReader enricher.ClickhouseReader // Only set when using ClickHouse if *stdoutOutput { chWriter = enricher.NewStdoutWriter() } else { - chOpts := []enricher.ClickhouseOption{} - if os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true" { - chOpts = append(chOpts, enricher.WithTLSDisabled(true)) - } - - chOpts = append(chOpts, enricher.WithClickhouseAddr(os.Getenv("CLICKHOUSE_ADDR")), - enricher.WithClickhouseDB(getEnvOrDefault("CLICKHOUSE_DB", "default")), + chAddr := os.Getenv("CLICKHOUSE_ADDR") + chDB := getEnvOrDefault("CLICKHOUSE_DB", "default") + chUser := os.Getenv("CLICKHOUSE_USER") + chPass := os.Getenv("CLICKHOUSE_PASS") + tlsDisabled := os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true" + + chOpts := []enricher.ClickhouseOption{ + enricher.WithClickhouseAddr(chAddr), + enricher.WithClickhouseDB(chDB), enricher.WithClickhouseTable(getEnvOrDefault("CLICKHOUSE_TABLE", "flows")), - enricher.WithClickhouseUser(os.Getenv("CLICKHOUSE_USER")), - enricher.WithClickhousePassword(os.Getenv("CLICKHOUSE_PASS")), + enricher.WithClickhouseUser(chUser), + enricher.WithClickhousePassword(chPass), + enricher.WithTLSDisabled(tlsDisabled), enricher.WithClickhouseLogger(logger), enricher.WithClickhouseMetrics(enricher.NewClickhouseMetrics(reg)), - ) + } var err error chWriter, err = enricher.NewClickhouseWriter(chOpts...) if err != nil { logger.Error("error creating clickhouse writer", "error", err) os.Exit(1) } + + // Create a *sql.DB for annotators that need to query ClickHouse + connOpts := &clickhouse.Options{ + Addr: []string{chAddr}, + Auth: clickhouse.Auth{ + Database: chDB, + Username: chUser, + Password: chPass, + }, + } + if !tlsDisabled { + connOpts.TLS = &tls.Config{} + } + chReader = clickhouse.OpenDB(connOpts) } // setup input consumer (Kafka or pcap) @@ -134,6 +154,11 @@ func main() { e.AddAnnotator(enricher.NewServiceabilityAnnotator(e.ServiceabilityData)) + // Add IfName annotator when using ClickHouse + if chReader != nil { + e.AddAnnotator(enricher.NewIfNameAnnotator(chReader, logger)) + } + // start prometheus go func() { mux := http.NewServeMux() diff --git a/telemetry/flow-enricher/internal/flow-enricher/enricher.go b/telemetry/flow-enricher/internal/flow-enricher/enricher.go index f3f8d1656..c3d31ac63 100644 --- a/telemetry/flow-enricher/internal/flow-enricher/enricher.go +++ b/telemetry/flow-enricher/internal/flow-enricher/enricher.go @@ -28,8 +28,8 @@ type FlowConsumer interface { Close() error } -// Clicker defines the minimal interface the Enricher needs to interact with ClickHouse. -type Clicker interface { +// FlowWriter defines the minimal interface the Enricher needs to write flows. +type FlowWriter interface { BatchInsert(context.Context, []FlowSample) error } @@ -39,8 +39,8 @@ type ServiceabilityFetcher interface { type EnricherOption func(*Enricher) -// WithClickhouseWriter injects a Clicker implementation into the Enricher. -func WithClickhouseWriter(writer Clicker) EnricherOption { +// WithClickhouseWriter injects a FlowWriter implementation into the Enricher. +func WithClickhouseWriter(writer FlowWriter) EnricherOption { return func(e *Enricher) { e.chWriter = writer } @@ -79,7 +79,7 @@ func WithServiceabilityFetchInterval(interval time.Duration) EnricherOption { } type Enricher struct { - chWriter Clicker + chWriter FlowWriter flowConsumer FlowConsumer serviceability ServiceabilityFetcher annotators []Annotator @@ -171,7 +171,7 @@ func (e *Enricher) Run(ctx context.Context) error { } if err := e.chWriter.BatchInsert(ctx, samples); err != nil { - e.logger.Error("error inserting batch via Clicker", "error", err) + e.logger.Error("error inserting batch via FlowWriter", "error", err) e.metrics.ClickhouseInsertErrors.Inc() continue } diff --git a/telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go b/telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go index 27d744a0e..a93e82409 100644 --- a/telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go +++ b/telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go @@ -248,6 +248,17 @@ func TestFlowEnrichment(t *testing.T) { ) enricher.AddAnnotator(NewServiceabilityAnnotator(enricher.ServiceabilityData)) + // Create a *sql.DB for the IfNameAnnotator using the same connection params + chDB := ch.OpenDB(&ch.Options{ + Addr: []string{chConn}, + Auth: ch.Auth{ + Database: chDbname, + Username: chUser, + Password: chPassword, + }, + }) + enricher.AddAnnotator(NewIfNameAnnotator(chDB, logger)) + go func() { if err := enricher.Run(ctx); err != nil { logger.Error("error during enrichment", "error", err) @@ -288,18 +299,20 @@ func TestFlowEnrichment(t *testing.T) { }) type flowRow struct { - SrcAddr string `db:"src_addr"` - DstAddr string `db:"dst_addr"` - SrcPort uint16 `db:"src_port"` - DstPort uint16 `db:"dst_port"` - Proto string `db:"proto"` - EType string `db:"etype"` - SrcDeviceCode string `db:"src_device_code"` - DstDeviceCode string `db:"dst_device_code"` - SrcLocation string `db:"src_location"` - DstLocation string `db:"dst_location"` - SrcExchange string `db:"src_exchange"` - DstExchange string `db:"dst_exchange"` + SrcAddr string `db:"src_addr"` + DstAddr string `db:"dst_addr"` + SrcPort uint16 `db:"src_port"` + DstPort uint16 `db:"dst_port"` + Proto string `db:"proto"` + EType string `db:"etype"` + SrcDeviceCode string `db:"src_device_code"` + DstDeviceCode string `db:"dst_device_code"` + SrcLocation string `db:"src_location"` + DstLocation string `db:"dst_location"` + SrcExchange string `db:"src_exchange"` + DstExchange string `db:"dst_exchange"` + InputInterface string `db:"in_ifname"` + OutputInterface string `db:"out_ifname"` } var rows []flowRow @@ -307,7 +320,8 @@ func TestFlowEnrichment(t *testing.T) { rows = nil // Reset at the start of each attempt dbRows, err := conn.Query(fmt.Sprintf(` SELECT src_addr, dst_addr, src_port, dst_port, proto, etype, - src_device_code, dst_device_code, src_location, dst_location, src_exchange, dst_exchange + src_device_code, dst_device_code, src_location, dst_location, src_exchange, dst_exchange, + in_ifname, out_ifname FROM %s.%s `, chDbname, chTable)) if err != nil { @@ -321,6 +335,7 @@ func TestFlowEnrichment(t *testing.T) { if err := dbRows.Scan( &row.SrcAddr, &row.DstAddr, &row.SrcPort, &row.DstPort, &row.Proto, &row.EType, &row.SrcDeviceCode, &row.DstDeviceCode, &row.SrcLocation, &row.DstLocation, &row.SrcExchange, &row.DstExchange, + &row.InputInterface, &row.OutputInterface, ); err != nil { t.Logf("error scanning row: %v", err) return false @@ -351,5 +366,10 @@ func TestFlowEnrichment(t *testing.T) { require.Equal(t, "", row.DstDeviceCode, "unexpected dst_device_code") require.Equal(t, "", row.DstLocation, "unexpected dst_location") require.Equal(t, "", row.DstExchange, "unexpected dst_exchange") + + // Validate enriched interface names from device_ifindex table + // Sampler 137.174.145.144 with ifindex 8001063 -> eth0, ifindex 8001134 -> eth1 + require.Equal(t, "eth0", row.InputInterface, "unexpected in_ifname") + require.Equal(t, "eth1", row.OutputInterface, "unexpected out_ifname") } } diff --git a/telemetry/flow-enricher/internal/flow-enricher/enricher_test.go b/telemetry/flow-enricher/internal/flow-enricher/enricher_test.go index 32b6bcefe..1f439f455 100644 --- a/telemetry/flow-enricher/internal/flow-enricher/enricher_test.go +++ b/telemetry/flow-enricher/internal/flow-enricher/enricher_test.go @@ -58,13 +58,13 @@ func (m *MockFlowConsumer) Close() error { return nil } -type MockClicker struct { +type MockClickhouseWriter struct { mu sync.Mutex ReceivedSamples []FlowSample InsertError error } -func (m *MockClicker) BatchInsert(ctx context.Context, samples []FlowSample) error { +func (m *MockClickhouseWriter) BatchInsert(ctx context.Context, samples []FlowSample) error { m.mu.Lock() defer m.mu.Unlock() if m.InsertError != nil { @@ -123,7 +123,7 @@ func TestEnricher(t *testing.T) { mockConsumer := &MockFlowConsumer{ SamplesToReturn: [][]FlowSample{expectedSamples}, } - mockWriter := &MockClicker{} + mockWriter := &MockClickhouseWriter{} mockServiceability := &MockServiceabilityFetcher{ programData: &serviceability.ProgramData{}, } @@ -172,7 +172,7 @@ func TestEnricherMetrics(t *testing.T) { tests := []struct { name string mockConsumer *MockFlowConsumer - mockWriter *MockClicker + mockWriter *MockClickhouseWriter expectedFlowsProcessed float64 expectedClickhouseErrs float64 expectedKafkaCommitErrs float64 @@ -182,7 +182,7 @@ func TestEnricherMetrics(t *testing.T) { mockConsumer: &MockFlowConsumer{ SamplesToReturn: [][]FlowSample{{{SrcAddress: net.IP("1.1.1.1")}, {SrcAddress: net.IP("2.2.2.2")}}}, }, - mockWriter: &MockClicker{}, + mockWriter: &MockClickhouseWriter{}, expectedFlowsProcessed: 2, }, { @@ -190,7 +190,7 @@ func TestEnricherMetrics(t *testing.T) { mockConsumer: &MockFlowConsumer{ SamplesToReturn: [][]FlowSample{{{SrcAddress: net.IP("1.1.1.1")}, {SrcAddress: net.IP("2.2.2.2")}}}, }, - mockWriter: &MockClicker{ + mockWriter: &MockClickhouseWriter{ InsertError: errors.New("clickhouse failed"), }, expectedClickhouseErrs: 1, @@ -201,7 +201,7 @@ func TestEnricherMetrics(t *testing.T) { SamplesToReturn: [][]FlowSample{{{SrcAddress: net.IP("1.1.1.1")}, {SrcAddress: net.IP("2.2.2.2")}}}, CommitError: errors.New("kafka commit failed"), }, - mockWriter: &MockClicker{}, + mockWriter: &MockClickhouseWriter{}, expectedKafkaCommitErrs: 1, }, { @@ -209,7 +209,7 @@ func TestEnricherMetrics(t *testing.T) { mockConsumer: &MockFlowConsumer{ SamplesToReturn: [][]FlowSample{}, }, - mockWriter: &MockClicker{}, + mockWriter: &MockClickhouseWriter{}, }, } @@ -255,7 +255,7 @@ func TestEnricherServiceabilityFetching(t *testing.T) { enricher := NewEnricher( WithFlowConsumer(&MockFlowConsumer{}), - WithClickhouseWriter(&MockClicker{}), + WithClickhouseWriter(&MockClickhouseWriter{}), WithServiceabilityFetcher(mockFetcher), WithEnricherMetrics(metrics), WithLogger(logger), diff --git a/telemetry/flow-enricher/internal/flow-enricher/fixtures/insert_device_ifindex.sql b/telemetry/flow-enricher/internal/flow-enricher/fixtures/insert_device_ifindex.sql index a264692c7..84043ac58 100644 --- a/telemetry/flow-enricher/internal/flow-enricher/fixtures/insert_device_ifindex.sql +++ b/telemetry/flow-enricher/internal/flow-enricher/fixtures/insert_device_ifindex.sql @@ -1 +1 @@ -INSERT INTO default.device_ifindex (*) Values ('aaa', 8001013, '204.16.241.241', 'Switch1/1/1', now()), ('aaa', 8001014, '204.16.241.241', 'Switch1/1/2', now()); +INSERT INTO default.device_ifindex (*) VALUES ('aaa', 8001013, '204.16.241.241', 'Switch1/1/1', now()), ('aaa', 8001014, '204.16.241.241', 'Switch1/1/2', now()), ('test-device', 8001063, '137.174.145.144', 'eth0', now()), ('test-device', 8001134, '137.174.145.144', 'eth1', now()); diff --git a/telemetry/flow-enricher/internal/flow-enricher/ifindex.go b/telemetry/flow-enricher/internal/flow-enricher/ifindex.go index 25658c72e..8a6560dc5 100644 --- a/telemetry/flow-enricher/internal/flow-enricher/ifindex.go +++ b/telemetry/flow-enricher/internal/flow-enricher/ifindex.go @@ -4,12 +4,18 @@ import ( "context" "database/sql" "fmt" - "log" + "log/slog" "net" "sync" "time" ) +// ClickhouseReader is an interface for reading from ClickHouse. +// It's satisfied by *sql.DB and can be easily mocked for testing. +type ClickhouseReader interface { + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) +} + type IfIndexRecord struct { Pubkey string `ch:"pubkey"` IfIndex uint64 `ch:"ifindex"` @@ -19,31 +25,40 @@ type IfIndexRecord struct { } type IfNameAnnotator struct { - name string - chConn *sql.DB - cache map[string]string // key is a composite key of exporterIp:ifindex; val is ifname - mu sync.RWMutex + name string + querier ClickhouseReader + logger *slog.Logger + cache map[string]string // key is a composite key of exporterIp:ifindex; val is ifname + mu sync.RWMutex } -func NewIfNameAnnotator() *IfNameAnnotator { +// NewIfNameAnnotator creates a new IfNameAnnotator with the given ClickhouseReader. +// The querier is used to query the device_ifindex table for interface name mappings. +// Typically, pass a *sql.DB connected to ClickHouse. +func NewIfNameAnnotator(querier ClickhouseReader, logger *slog.Logger) *IfNameAnnotator { return &IfNameAnnotator{ - name: "ifname annotator", - cache: make(map[string]string), + name: "ifname annotator", + querier: querier, + logger: logger, + cache: make(map[string]string), } } func (i *IfNameAnnotator) populateCache(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - row, err := i.chConn.QueryContext(ctx, "SELECT * from device_ifindex FINAL;") + rows, err := i.querier.QueryContext(ctx, "SELECT * from device_ifindex FINAL;") if err != nil { return fmt.Errorf("error querying clickhouse: %v", err) } + defer rows.Close() + var cache = make(map[string]string) - for row.Next() { + for rows.Next() { record := &IfIndexRecord{} - if err := row.Scan(&record.Pubkey, &record.IfIndex, &record.IPv4Address, &record.IfName, &record.Timestamp); err != nil { - log.Printf("An error while reading the data: %s\n", err) + if err := rows.Scan(&record.Pubkey, &record.IfIndex, &record.IPv4Address, &record.IfName, &record.Timestamp); err != nil { + i.logger.Warn("error scanning ifindex record", "error", err) + continue } if record.IPv4Address == nil || record.IfIndex == 0 || record.IfName == "" { continue @@ -63,8 +78,11 @@ func (i *IfNameAnnotator) populateCache(ctx context.Context) error { } // Init populates a local cache of per-device ifindexes to interface names. -func (i *IfNameAnnotator) Init(ctx context.Context, sql *sql.DB) error { - i.chConn = sql +// It starts a background goroutine that refreshes the cache every minute. +func (i *IfNameAnnotator) Init(ctx context.Context) error { + if i.querier == nil { + return fmt.Errorf("querier is required for ifname annotator") + } if err := i.populateCache(ctx); err != nil { return fmt.Errorf("error populating initial ifname cache: %v", err) } @@ -73,12 +91,12 @@ func (i *IfNameAnnotator) Init(ctx context.Context, sql *sql.DB) error { for { select { case <-ctx.Done(): - log.Println("ifname annotator closing due to signal") + i.logger.Info("ifname annotator closing due to signal") return case <-ticker.C: if err := i.populateCache(ctx); err != nil { // TODO: add metric - log.Printf("error populating ifname cache: %v", err) + i.logger.Warn("error populating ifname cache", "error", err) continue } } @@ -96,6 +114,8 @@ func (i *IfNameAnnotator) Annotate(flow *FlowSample) error { annotate := func(samplerAddr string, ifindex int) string { key := fmt.Sprintf("%s:%d", samplerAddr, ifindex) + i.mu.RLock() + defer i.mu.RUnlock() if val, ok := i.cache[key]; ok { return val } diff --git a/telemetry/flow-enricher/internal/flow-enricher/ifindex_test.go b/telemetry/flow-enricher/internal/flow-enricher/ifindex_test.go new file mode 100644 index 000000000..4d4d523f3 --- /dev/null +++ b/telemetry/flow-enricher/internal/flow-enricher/ifindex_test.go @@ -0,0 +1,253 @@ +package enricher + +import ( + "context" + "log/slog" + "net" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIfNameAnnotator_Annotate(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + + // Create annotator and manually populate its cache (bypassing Init/DB) + annotator := &IfNameAnnotator{ + name: "ifname annotator", + logger: logger, + cache: map[string]string{ + "10.0.0.1:100": "eth0", + "10.0.0.1:200": "eth1", + "10.0.0.2:300": "ge-0/0/0", + }, + } + + tests := []struct { + name string + flow *FlowSample + expectedInputInterface string + expectedOutputInterface string + expectError bool + }{ + { + name: "annotates both input and output interfaces", + flow: &FlowSample{ + SamplerAddress: net.ParseIP("10.0.0.1"), + InputIfIndex: 100, + OutputIfIndex: 200, + }, + expectedInputInterface: "eth0", + expectedOutputInterface: "eth1", + expectError: false, + }, + { + name: "annotates only input interface when output not found", + flow: &FlowSample{ + SamplerAddress: net.ParseIP("10.0.0.1"), + InputIfIndex: 100, + OutputIfIndex: 999, // Not in cache + }, + expectedInputInterface: "eth0", + expectedOutputInterface: "", + expectError: false, + }, + { + name: "annotates using different sampler address", + flow: &FlowSample{ + SamplerAddress: net.ParseIP("10.0.0.2"), + InputIfIndex: 300, + OutputIfIndex: 0, + }, + expectedInputInterface: "ge-0/0/0", + expectedOutputInterface: "", + expectError: false, + }, + { + name: "returns empty strings when sampler address not found", + flow: &FlowSample{ + SamplerAddress: net.ParseIP("10.0.0.99"), + InputIfIndex: 100, + OutputIfIndex: 200, + }, + expectedInputInterface: "", + expectedOutputInterface: "", + expectError: false, + }, + { + name: "returns error when sampler address is nil", + flow: &FlowSample{ + SamplerAddress: nil, + InputIfIndex: 100, + OutputIfIndex: 200, + }, + expectError: true, + }, + { + name: "skips annotation when ifindex is zero", + flow: &FlowSample{ + SamplerAddress: net.ParseIP("10.0.0.1"), + InputIfIndex: 0, + OutputIfIndex: 0, + }, + expectedInputInterface: "", + expectedOutputInterface: "", + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := annotator.Annotate(tt.flow) + + if tt.expectError { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.expectedInputInterface, tt.flow.InputInterface) + assert.Equal(t, tt.expectedOutputInterface, tt.flow.OutputInterface) + }) + } +} + +func TestIfNameAnnotator_Init_NilQuerier(t *testing.T) { + ctx := context.Background() + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + + annotator := NewIfNameAnnotator(nil, logger) + err := annotator.Init(ctx) + require.Error(t, err) + assert.Contains(t, err.Error(), "querier is required") +} + +func TestIfNameAnnotator_String(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + annotator := NewIfNameAnnotator(nil, logger) + assert.Equal(t, "ifname annotator", annotator.String()) +} + +func TestIfNameAnnotator_ConcurrentAccess(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + + // Create annotator with pre-populated cache + annotator := &IfNameAnnotator{ + name: "ifname annotator", + logger: logger, + cache: map[string]string{ + "10.0.0.1:100": "eth0", + }, + } + + // Run concurrent Annotate calls + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + flow := &FlowSample{ + SamplerAddress: net.ParseIP("10.0.0.1"), + InputIfIndex: 100, + OutputIfIndex: 100, + } + err := annotator.Annotate(flow) + assert.NoError(t, err) + assert.Equal(t, "eth0", flow.InputInterface) + } + }() + } + + // Wait with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for concurrent access test") + } +} + +func TestIfNameAnnotator_ConcurrentReadWrite(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + + // Create annotator with pre-populated cache + annotator := &IfNameAnnotator{ + name: "ifname annotator", + logger: logger, + cache: map[string]string{ + "10.0.0.1:100": "eth0", + }, + } + + // Simulate concurrent reads and cache updates + var wg sync.WaitGroup + + // Readers + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + flow := &FlowSample{ + SamplerAddress: net.ParseIP("10.0.0.1"), + InputIfIndex: 100, + OutputIfIndex: 0, + } + _ = annotator.Annotate(flow) + } + }() + } + + // Writers (simulating cache updates) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + annotator.mu.Lock() + annotator.cache = map[string]string{ + "10.0.0.1:100": "eth0", + "10.0.0.1:200": "eth1", + } + annotator.mu.Unlock() + } + }() + } + + // Wait with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success - no race condition + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for concurrent read/write test") + } +} + +func TestNewIfNameAnnotator(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + + annotator := NewIfNameAnnotator(nil, logger) + + assert.NotNil(t, annotator) + assert.Equal(t, "ifname annotator", annotator.name) + assert.NotNil(t, annotator.cache) + assert.Equal(t, logger, annotator.logger) + assert.Nil(t, annotator.querier) +} diff --git a/telemetry/flow-enricher/internal/flow-enricher/stdout_writer.go b/telemetry/flow-enricher/internal/flow-enricher/stdout_writer.go index 9d45b4816..b88cff4bb 100644 --- a/telemetry/flow-enricher/internal/flow-enricher/stdout_writer.go +++ b/telemetry/flow-enricher/internal/flow-enricher/stdout_writer.go @@ -7,7 +7,7 @@ import ( "os" ) -// StdoutWriter implements Clicker for writing FlowSamples as JSON to an io.Writer. +// StdoutWriter implements FlowWriter for writing FlowSamples as JSON to an io.Writer. type StdoutWriter struct { writer io.Writer encoder *json.Encoder