Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions telemetry/flow-enricher/cmd/flow-enricher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
"log/slog"
Expand All @@ -12,6 +13,7 @@ import (
"syscall"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/malbeclabs/doublezero/config"
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
enricher "github.com/malbeclabs/doublezero/telemetry/flow-enricher/internal/flow-enricher"
Expand Down Expand Up @@ -54,30 +56,48 @@ func main() {

reg := prometheus.WrapRegistererWithPrefix("enricher_", prometheus.DefaultRegisterer)

// setup output writer (ClickHouse or stdout)
var chWriter enricher.Clicker
// setup output writer (ClickHouse or stdout) and optional ClickHouse reader for annotators
var chWriter enricher.FlowWriter
var chReader enricher.ClickhouseReader // Only set when using ClickHouse
if *stdoutOutput {
chWriter = enricher.NewStdoutWriter()
} else {
chOpts := []enricher.ClickhouseOption{}
if os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true" {
chOpts = append(chOpts, enricher.WithTLSDisabled(true))
}

chOpts = append(chOpts, enricher.WithClickhouseAddr(os.Getenv("CLICKHOUSE_ADDR")),
enricher.WithClickhouseDB(getEnvOrDefault("CLICKHOUSE_DB", "default")),
chAddr := os.Getenv("CLICKHOUSE_ADDR")
chDB := getEnvOrDefault("CLICKHOUSE_DB", "default")
chUser := os.Getenv("CLICKHOUSE_USER")
chPass := os.Getenv("CLICKHOUSE_PASS")
tlsDisabled := os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true"

chOpts := []enricher.ClickhouseOption{
enricher.WithClickhouseAddr(chAddr),
enricher.WithClickhouseDB(chDB),
enricher.WithClickhouseTable(getEnvOrDefault("CLICKHOUSE_TABLE", "flows")),
enricher.WithClickhouseUser(os.Getenv("CLICKHOUSE_USER")),
enricher.WithClickhousePassword(os.Getenv("CLICKHOUSE_PASS")),
enricher.WithClickhouseUser(chUser),
enricher.WithClickhousePassword(chPass),
enricher.WithTLSDisabled(tlsDisabled),
enricher.WithClickhouseLogger(logger),
enricher.WithClickhouseMetrics(enricher.NewClickhouseMetrics(reg)),
)
}
var err error
chWriter, err = enricher.NewClickhouseWriter(chOpts...)
if err != nil {
logger.Error("error creating clickhouse writer", "error", err)
os.Exit(1)
}

// Create a *sql.DB for annotators that need to query ClickHouse
connOpts := &clickhouse.Options{
Addr: []string{chAddr},
Auth: clickhouse.Auth{
Database: chDB,
Username: chUser,
Password: chPass,
},
}
if !tlsDisabled {
connOpts.TLS = &tls.Config{}
}
chReader = clickhouse.OpenDB(connOpts)
}

// setup input consumer (Kafka or pcap)
Expand Down Expand Up @@ -134,6 +154,11 @@ func main() {

e.AddAnnotator(enricher.NewServiceabilityAnnotator(e.ServiceabilityData))

// Add IfName annotator when using ClickHouse
if chReader != nil {
e.AddAnnotator(enricher.NewIfNameAnnotator(chReader, logger))
}

// start prometheus
go func() {
mux := http.NewServeMux()
Expand Down
12 changes: 6 additions & 6 deletions telemetry/flow-enricher/internal/flow-enricher/enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type FlowConsumer interface {
Close() error
}

// Clicker defines the minimal interface the Enricher needs to interact with ClickHouse.
type Clicker interface {
// FlowWriter defines the minimal interface the Enricher needs to write flows.
type FlowWriter interface {
BatchInsert(context.Context, []FlowSample) error
}

Expand All @@ -39,8 +39,8 @@ type ServiceabilityFetcher interface {

type EnricherOption func(*Enricher)

// WithClickhouseWriter injects a Clicker implementation into the Enricher.
func WithClickhouseWriter(writer Clicker) EnricherOption {
// WithClickhouseWriter injects a FlowWriter implementation into the Enricher.
func WithClickhouseWriter(writer FlowWriter) EnricherOption {
return func(e *Enricher) {
e.chWriter = writer
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func WithServiceabilityFetchInterval(interval time.Duration) EnricherOption {
}

type Enricher struct {
chWriter Clicker
chWriter FlowWriter
flowConsumer FlowConsumer
serviceability ServiceabilityFetcher
annotators []Annotator
Expand Down Expand Up @@ -171,7 +171,7 @@ func (e *Enricher) Run(ctx context.Context) error {
}

if err := e.chWriter.BatchInsert(ctx, samples); err != nil {
e.logger.Error("error inserting batch via Clicker", "error", err)
e.logger.Error("error inserting batch via FlowWriter", "error", err)
e.metrics.ClickhouseInsertErrors.Inc()
continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,17 @@ func TestFlowEnrichment(t *testing.T) {
)
enricher.AddAnnotator(NewServiceabilityAnnotator(enricher.ServiceabilityData))

// Create a *sql.DB for the IfNameAnnotator using the same connection params
chDB := ch.OpenDB(&ch.Options{
Addr: []string{chConn},
Auth: ch.Auth{
Database: chDbname,
Username: chUser,
Password: chPassword,
},
})
enricher.AddAnnotator(NewIfNameAnnotator(chDB, logger))

go func() {
if err := enricher.Run(ctx); err != nil {
logger.Error("error during enrichment", "error", err)
Expand Down Expand Up @@ -288,26 +299,29 @@ func TestFlowEnrichment(t *testing.T) {
})

type flowRow struct {
SrcAddr string `db:"src_addr"`
DstAddr string `db:"dst_addr"`
SrcPort uint16 `db:"src_port"`
DstPort uint16 `db:"dst_port"`
Proto string `db:"proto"`
EType string `db:"etype"`
SrcDeviceCode string `db:"src_device_code"`
DstDeviceCode string `db:"dst_device_code"`
SrcLocation string `db:"src_location"`
DstLocation string `db:"dst_location"`
SrcExchange string `db:"src_exchange"`
DstExchange string `db:"dst_exchange"`
SrcAddr string `db:"src_addr"`
DstAddr string `db:"dst_addr"`
SrcPort uint16 `db:"src_port"`
DstPort uint16 `db:"dst_port"`
Proto string `db:"proto"`
EType string `db:"etype"`
SrcDeviceCode string `db:"src_device_code"`
DstDeviceCode string `db:"dst_device_code"`
SrcLocation string `db:"src_location"`
DstLocation string `db:"dst_location"`
SrcExchange string `db:"src_exchange"`
DstExchange string `db:"dst_exchange"`
InputInterface string `db:"in_ifname"`
OutputInterface string `db:"out_ifname"`
}

var rows []flowRow
require.Eventually(t, func() bool {
rows = nil // Reset at the start of each attempt
dbRows, err := conn.Query(fmt.Sprintf(`
SELECT src_addr, dst_addr, src_port, dst_port, proto, etype,
src_device_code, dst_device_code, src_location, dst_location, src_exchange, dst_exchange
src_device_code, dst_device_code, src_location, dst_location, src_exchange, dst_exchange,
in_ifname, out_ifname
FROM %s.%s
`, chDbname, chTable))
if err != nil {
Expand All @@ -321,6 +335,7 @@ func TestFlowEnrichment(t *testing.T) {
if err := dbRows.Scan(
&row.SrcAddr, &row.DstAddr, &row.SrcPort, &row.DstPort, &row.Proto, &row.EType,
&row.SrcDeviceCode, &row.DstDeviceCode, &row.SrcLocation, &row.DstLocation, &row.SrcExchange, &row.DstExchange,
&row.InputInterface, &row.OutputInterface,
); err != nil {
t.Logf("error scanning row: %v", err)
return false
Expand Down Expand Up @@ -351,5 +366,10 @@ func TestFlowEnrichment(t *testing.T) {
require.Equal(t, "", row.DstDeviceCode, "unexpected dst_device_code")
require.Equal(t, "", row.DstLocation, "unexpected dst_location")
require.Equal(t, "", row.DstExchange, "unexpected dst_exchange")

// Validate enriched interface names from device_ifindex table
// Sampler 137.174.145.144 with ifindex 8001063 -> eth0, ifindex 8001134 -> eth1
require.Equal(t, "eth0", row.InputInterface, "unexpected in_ifname")
require.Equal(t, "eth1", row.OutputInterface, "unexpected out_ifname")
}
}
18 changes: 9 additions & 9 deletions telemetry/flow-enricher/internal/flow-enricher/enricher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ func (m *MockFlowConsumer) Close() error {
return nil
}

type MockClicker struct {
type MockClickhouseWriter struct {
mu sync.Mutex
ReceivedSamples []FlowSample
InsertError error
}

func (m *MockClicker) BatchInsert(ctx context.Context, samples []FlowSample) error {
func (m *MockClickhouseWriter) BatchInsert(ctx context.Context, samples []FlowSample) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.InsertError != nil {
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestEnricher(t *testing.T) {
mockConsumer := &MockFlowConsumer{
SamplesToReturn: [][]FlowSample{expectedSamples},
}
mockWriter := &MockClicker{}
mockWriter := &MockClickhouseWriter{}
mockServiceability := &MockServiceabilityFetcher{
programData: &serviceability.ProgramData{},
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestEnricherMetrics(t *testing.T) {
tests := []struct {
name string
mockConsumer *MockFlowConsumer
mockWriter *MockClicker
mockWriter *MockClickhouseWriter
expectedFlowsProcessed float64
expectedClickhouseErrs float64
expectedKafkaCommitErrs float64
Expand All @@ -182,15 +182,15 @@ func TestEnricherMetrics(t *testing.T) {
mockConsumer: &MockFlowConsumer{
SamplesToReturn: [][]FlowSample{{{SrcAddress: net.IP("1.1.1.1")}, {SrcAddress: net.IP("2.2.2.2")}}},
},
mockWriter: &MockClicker{},
mockWriter: &MockClickhouseWriter{},
expectedFlowsProcessed: 2,
},
{
name: "ClickHouse insert error increments ClickhouseInsertErrors",
mockConsumer: &MockFlowConsumer{
SamplesToReturn: [][]FlowSample{{{SrcAddress: net.IP("1.1.1.1")}, {SrcAddress: net.IP("2.2.2.2")}}},
},
mockWriter: &MockClicker{
mockWriter: &MockClickhouseWriter{
InsertError: errors.New("clickhouse failed"),
},
expectedClickhouseErrs: 1,
Expand All @@ -201,15 +201,15 @@ func TestEnricherMetrics(t *testing.T) {
SamplesToReturn: [][]FlowSample{{{SrcAddress: net.IP("1.1.1.1")}, {SrcAddress: net.IP("2.2.2.2")}}},
CommitError: errors.New("kafka commit failed"),
},
mockWriter: &MockClicker{},
mockWriter: &MockClickhouseWriter{},
expectedKafkaCommitErrs: 1,
},
{
name: "No samples processed does not increment metrics",
mockConsumer: &MockFlowConsumer{
SamplesToReturn: [][]FlowSample{},
},
mockWriter: &MockClicker{},
mockWriter: &MockClickhouseWriter{},
},
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func TestEnricherServiceabilityFetching(t *testing.T) {

enricher := NewEnricher(
WithFlowConsumer(&MockFlowConsumer{}),
WithClickhouseWriter(&MockClicker{}),
WithClickhouseWriter(&MockClickhouseWriter{}),
WithServiceabilityFetcher(mockFetcher),
WithEnricherMetrics(metrics),
WithLogger(logger),
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
INSERT INTO default.device_ifindex (*) Values ('aaa', 8001013, '204.16.241.241', 'Switch1/1/1', now()), ('aaa', 8001014, '204.16.241.241', 'Switch1/1/2', now());
INSERT INTO default.device_ifindex (*) VALUES ('aaa', 8001013, '204.16.241.241', 'Switch1/1/1', now()), ('aaa', 8001014, '204.16.241.241', 'Switch1/1/2', now()), ('test-device', 8001063, '137.174.145.144', 'eth0', now()), ('test-device', 8001134, '137.174.145.144', 'eth1', now());
Loading
Loading