Skip to content

Commit

Permalink
Add back in flow tee (#741)
Browse files Browse the repository at this point in the history
Allows `-tee_flow http://127.0.0.2:8080/chf` to bypass filters and send flow on to another ktranslate instance.
  • Loading branch information
i3149 authored Aug 27, 2024
1 parent 83a2bb7 commit b5579b6
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 11 deletions.
8 changes: 6 additions & 2 deletions cmd/ktranslate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var (
syslog string
httpInput bool
enricher string
teeFlow string
)

func init() {
Expand All @@ -72,12 +73,13 @@ func init() {
flag.IntVar(&threadsInput, "input_threads", 1, "Number of threads to run for input processing")
flag.IntVar(&maxThreads, "max_threads", 1, "Dynamically grow threads up to this number")
flag.StringVar(&format, "format", "flat_json", "Format to convert kflow to: (json|flat_json|avro|netflow|influx|carbon|prometheus|new_relic|new_relic_metric|splunk|elasticsearch|kflow|ddog)")
flag.StringVar(&formatRollup, "format_rollup", "", "Format to convert rollups to: (json|avro|netflow|influx|prometheus|new_relic|new_relic_metric|splunk|elasticsearch|kflow)")
flag.StringVar(&formatMetric, "format_metric", "", "Format to convert metrics to: (json|avro|netflow|influx|prometheus|new_relic|new_relic_metric|splunk|elasticsearch|kflow)")
flag.StringVar(&formatRollup, "format_rollup", "", "Format to convert rollups to: (json|avro|netflow|influx|prometheus|new_relic|new_relic_metric|splunk|elasticsearch|kflow|parquet)")
flag.StringVar(&formatMetric, "format_metric", "", "Format to convert metrics to: (json|avro|netflow|influx|prometheus|new_relic|new_relic_metric|splunk|elasticsearch|kflow|parquet)")
flag.StringVar(&compression, "compression", "none", "compression algo to use (none|gzip|snappy|deflate|null)")
flag.StringVar(&sinks, "sinks", "stdout", "List of sinks to send data to. Options: (kafka|stdout|new_relic|kentik|net|http|splunk|prometheus|file|s3|gcloud|ddog)")
flag.IntVar(&maxFlows, "max_flows_per_message", 10000, "Max number of flows to put in each emitted message")
flag.IntVar(&dumpRollups, "rollup_interval", 0, "Export timer for rollups in seconds")
flag.StringVar(&teeFlow, "tee_flow", "", "If set, tee flow to another ktranslate instance here.")
flag.BoolVar(&rollupAndAlpha, "rollup_and_alpha", false, "Send both rollups and alpha inputs to sinks")
flag.IntVar(&sample, "sample_rate", kt.LookupEnvInt("KENTIK_SAMPLE_RATE", 1), "Sampling rate to use. 1 -> 1:1 sampling, 2 -> 1:2 sampling and so on.")
flag.IntVar(&sampleMin, "max_before_sample", 1, "Only sample when a set of inputs is at least this many")
Expand Down Expand Up @@ -406,6 +408,8 @@ func applyFlags(cfg *ktranslate.Config) error {
cfg.EnableHTTPInput = v
case "http.remote_ip":
cfg.HttpRemoteIp = val
case "tee_flow":
cfg.TeeFlow = val
case "enricher":
if _, err := os.Stat(val); err == nil { // If this is a file on disk, run as a script.
cfg.EnricherScript = val
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ type Config struct {
EnableHTTPInput bool
HttpRemoteIp string
EnricherURL string
TeeFlow string
EnricherSource string
EnricherScript string

Expand Down
5 changes: 5 additions & 0 deletions pkg/cat/kentik.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ func (kc *KTranslate) handleFlow(w http.ResponseWriter, r *http.Request) {

}

// Tee any flows on to another ktrans instance if this is set up.
if kc.tee != nil {
kc.tee.Send(context.Background(), kt.NewOutputWithProviderAndCompanySender(evt[offset:], kt.ProviderKflow, kt.Cid(cid), kt.EventOutput, senderId))
}

// decompress and read (capnproto "packed" representation)
decoder := capn.NewPackedDecoder(bytes.NewBuffer(evt[offset:]))
decoder.MaxMessageSize = kentikDefaultCapnprotoDecodeLimit
Expand Down
19 changes: 19 additions & 0 deletions pkg/cat/kkc.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,17 @@ func NewKTranslate(config *ktranslate.Config, log logger.ContextL, registry go_m
kc.log.Infof("Using sink %s", sink)
}

// Set up a tee if we need to.
if config.TeeFlow != "" {
sink := ss.Sink("kentik")
snk, err := ss.NewSink(sink, log.GetLogger().GetUnderlyingLogger(), registry, kc.tooBig, nil, kc.config)
if err != nil {
return nil, fmt.Errorf("Invalid tee: %s, %v", sink, err)
}
kc.tee = snk
kc.log.Infof("Using ktrans tee at %s", config.TeeFlow)
}

// IP based rules
rule, err := rule.NewRuleSet(config.ApplicationFile, log)
if err != nil {
Expand Down Expand Up @@ -634,6 +645,14 @@ func (kc *KTranslate) Run(ctx context.Context) error {
}
}

// Connect the tee to send.
if kc.tee != nil {
err := kc.tee.Init(ctx, format, compression, kc.format)
if err != nil {
return err
}
}

// If there's a objmgr, init this also.
if kc.objmgr != nil {
err := kc.objmgr.Init(ctx, format, compression, kc.format)
Expand Down
1 change: 1 addition & 0 deletions pkg/cat/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type KTranslate struct {
confMgr config.ConfigManager
shutdown func(string)
objmgr sinks.CloudObjectManager
tee sinks.SinkImpl
}

type CustomMapper struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/kt/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const (
ProviderTrapUnknown Provider = "kentik-trap-device"
ProviderHttpDevice Provider = "kentik-http"
ProviderMerakiCloud Provider = "meraki-cloud-controller"
ProviderKflow Provider = "kentik-kflow"
)

const (
Expand Down
28 changes: 19 additions & 9 deletions pkg/sinks/kentik/kentik.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type KentikSink struct {
isKentik bool
config *ktranslate.Config
sendMaxDuration time.Duration
compression kt.Compression
}

type KentikMetric struct {
Expand All @@ -66,12 +67,16 @@ func NewSink(log logger.Underlying, registry go_metrics.Registry, cfg *ktranslat
}

func (s *KentikSink) Init(ctx context.Context, format formats.Format, compression kt.Compression, fmtr formats.Formatter) error {
if s.config.KentikCreds == nil || len(s.config.KentikCreds) == 0 {
return fmt.Errorf("Kentik requires -kentik_email and KENTIK_API_TOKEN env var to be set")
}
s.KentikUrl = strings.ReplaceAll(s.config.APIBaseURL, "api.", "flow.") + "/chf"
if v := s.config.KentikSink.RelayURL; v != "" { // If this is set, override and go directly here instead.
s.KentikUrl = v
if s.config.TeeFlow == "" {
if s.config.KentikCreds == nil || len(s.config.KentikCreds) == 0 {
return fmt.Errorf("Kentik requires -kentik_email and KENTIK_API_TOKEN env var to be set")
}
s.KentikUrl = strings.ReplaceAll(s.config.APIBaseURL, "api.", "flow.") + "/chf"
if v := s.config.KentikSink.RelayURL; v != "" { // If this is set, override and go directly here instead.
s.KentikUrl = v
}
} else {
s.KentikUrl = s.config.TeeFlow
}

s.isKentik = strings.Contains(strings.ToLower(s.KentikUrl), "kentik.com") // Make sure we can't feed data back into kentik in a loop.
Expand All @@ -80,6 +85,7 @@ func (s *KentikSink) Init(ctx context.Context, format formats.Format, compressio
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
}
s.client = &http.Client{Transport: s.tr}
s.compression = compression

s.Infof("Exporting to Kentik at %s (isKentik=%v)", s.KentikUrl, s.isKentik)

Expand Down Expand Up @@ -120,10 +126,14 @@ func (s *KentikSink) sendKentik(ctx context.Context, payload []byte, cid int, se
return
}

req.Header.Set("X-CH-Auth-Email", s.config.KentikCreds[0].APIEmail)
req.Header.Set("X-CH-Auth-API-Token", s.config.KentikCreds[0].APIToken)
if s.config.KentikCreds != nil && len(s.config.KentikCreds) >= 1 {
req.Header.Set("X-CH-Auth-Email", s.config.KentikCreds[0].APIEmail)
req.Header.Set("X-CH-Auth-API-Token", s.config.KentikCreds[0].APIToken)
}
req.Header.Set("Content-Type", CHF_TYPE)
req.Header.Set("Content-Encoding", "gzip")
if s.compression == kt.CompressionGzip {
req.Header.Set("Content-Encoding", "gzip")
}

resp, err := s.client.Do(req)
if err != nil {
Expand Down

0 comments on commit b5579b6

Please sign in to comment.