From 678b2db94b76c8132e19827d46d19d24acde9abf Mon Sep 17 00:00:00 2001
From: Mark Henderson
Date: Wed, 2 Dec 2015 18:12:19 -0500
Subject: [PATCH] cmd/scollector: Added ExtraHop collector that will collect L7 bytes and packet counts

---
 .../github.com/kylebrandt/gohop/gohop.go | 342 ++++++++++++++++++
 cmd/scollector/collectors/extrahop.go | 172 +++++++++
 cmd/scollector/conf/conf.go | 8 +
 cmd/scollector/doc.go | 21 ++
 cmd/scollector/main.go | 5 +
 5 files changed, 548 insertions(+)
 create mode 100644 _third_party/github.com/kylebrandt/gohop/gohop.go
 create mode 100644 cmd/scollector/collectors/extrahop.go

diff --git a/_third_party/github.com/kylebrandt/gohop/gohop.go b/_third_party/github.com/kylebrandt/gohop/gohop.go
new file mode 100644
index 0000000000..0aef823e62
--- /dev/null
+++ b/_third_party/github.com/kylebrandt/gohop/gohop.go
@@ -0,0 +1,342 @@
+// Package gohop is a small client for the ExtraHop REST API v1.
+package gohop
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/url"
+	"strconv"
+	"strings"
+
+	"bosun.org/opentsdb"
+)
+
+// Client holds what every request needs: the API key sent in the
+// Authorization header and the base URL the "/api/v1/" paths are appended to.
+type Client struct {
+	APIKey  string
+	APIUrl  *url.URL
+	APIHost string // Host part of APIUrl, filled in by NewClient.
+}
+
+// NewClient creates an instance of an ExtraHop REST API v1 client.
+// A malformed APIUrl is fatal: the url.Parse error is handed to log.Fatal,
+// which terminates the process, so validate the URL before calling.
+func NewClient(APIUrl, APIKey string) *Client {
+	u, err := url.Parse(APIUrl)
+
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	return &Client{
+		APIKey:  APIKey,
+		APIUrl:  u,
+		APIHost: u.Host,
+	}
+}
+
+// request performs one HTTP call against <APIUrl>/api/v1/<path>.
+//
+// data, when non-nil, is JSON-encoded into the request body. dst, when
+// non-nil, receives the JSON-decoded response body. Any status other than
+// 200 is returned as an error carrying the status code and response body.
+func (c *Client) request(path, method string, data interface{}, dst interface{}) error {
+	// Named endpoint rather than url so the net/url package is not shadowed.
+	endpoint := fmt.Sprintf("%s/api/v1/%s", c.APIUrl, path)
+	var d []byte
+	var err error
+	if data != nil {
+		d, err = json.Marshal(data)
+		if err != nil {
+			return err
+		}
+	}
+	req, err := http.NewRequest(method, endpoint, bytes.NewReader(d))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("Authorization", fmt.Sprintf("ExtraHop apikey=%s", c.APIKey))
+	req.Header.Add("Content-Type", "application/json")
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	// Check the status before looking at dst, so a failed call is reported
+	// even when the caller does not want the response decoded.
+	if resp.StatusCode != http.StatusOK {
+		b, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("Response Code %v: %s", resp.StatusCode, b)
+	}
+	if dst == nil {
+		return nil
+	}
+	return json.NewDecoder(resp.Body).Decode(dst)
+}
+
+// post issues a POST request; see request for the meaning of data and dst.
+func (c *Client) post(path string, data interface{}, dst interface{}) error {
+	return c.request(path, "POST", data, dst)
+}
+
+// get issues a GET request; see request for the meaning of data and dst.
+func (c *Client) get(path string, data interface{}, dst interface{}) error {
+	return c.request(path, "GET", data, dst)
+}
+
+// Possible values for the Cycle parameter of a metric query.
+var (
+	CycleAuto  = "auto"
+	Cycle30Sec = "30sec"
+	Cycle5Min  = "5min"
+	Cycle1Hr   = "1hr"
+	Cycle24Hr  = "24hr"
+)
+
+// MetricQuery is the JSON body posted to the "metrics" endpoint.
+type MetricQuery struct {
+	// Can be "auto", "30sec", "5min", "1hr", "24hr"
+	Cycle string `json:"cycle"`
+	From  int64  `json:"from"`
+	// Currently these seem secret, can be net or app though
+	Category string `json:"metric_category"`
+	// Stats.Values in the response grows in length when more than one MetricSpec is requested
+	Specs []MetricSpec `json:"metric_specs"`
+	// OID becomes
+	ObjectIds []int64 `json:"object_ids"`
+	//Can be "network", "device", "application", 
"vlan", "device_group", and "activity_group".
+	Type  string `json:"object_type"`
+	Until int64  `json:"until"`
+}
+
+// KeyPair carries the optional key filters of a MetricSpec. The two regex
+// fields are sent to ExtraHop as "key1"/"key2"; the OpenTSDB fields are
+// excluded from the JSON and only name the tags used when converting a
+// keyed response into data points.
+type KeyPair struct {
+	Key1Regex        string `json:"key1,omitempty"`
+	Key2Regex        string `json:"key2,omitempty"` // I can't find an example using 2 keys at the moment
+	OpenTSDBKey1     string `json:"-"`
+	Key2OpenTSDBKey2 string `json:"-"`
+}
+
+// MetricSpec names one metric to request inside a MetricQuery.
+type MetricSpec struct {
+	Name     string `json:"name"`
+	CalcType string `json:"calc_type"`
+	// The type of Stats.Values changes when there are keys added. It goes from []ints to an [][]structs, so the tag can be included
+	KeyPair
+	Percentiles []int64 `json:"percentiles,omitempty"`
+	// The following are not part of the extrahop API
+	OpenTSDBMetric string `json:"-"`
+}
+
+// MetricStat holds the fields shared by simple and keyed stats entries.
+type MetricStat struct {
+	Duration int64 `json:"duration"`
+	Oid      int64 `json:"oid"`
+	Time     int64 `json:"time"`
+}
+
+// MetricStatSimple is a stats entry whose values are plain numbers.
+type MetricStatSimple struct {
+	MetricStat
+	Values []float64 `json:"values"`
+}
+
+// MetricStatKeyed is a stats entry whose values are lists of key/value
+// records, one list per requested MetricSpec.
+type MetricStatKeyed struct {
+	MetricStat
+	Values [][]struct {
+		Key struct {
+			KeyType string `json:"key_type"`
+			Str     string `json:"str"`
+		} `json:"key"`
+		Value int64  `json:"value"`
+		Vtype string `json:"vtype"`
+	} `json:"values"`
+}
+
+// MetricResponseBase holds the fields shared by both response shapes.
+type MetricResponseBase struct {
+	Cycle  string `json:"cycle"`
+	From   int64  `json:"from"`
+	NodeID int64  `json:"node_id"`
+	Until  int64  `json:"until"`
+}
+
+// MetricResponseSimple is the response to a query without keys.
+type MetricResponseSimple struct {
+	MetricResponseBase
+	Stats []MetricStatSimple `json:"stats"`
+}
+
+// MetricResponseKeyed is the response to a query with keys.
+type MetricResponseKeyed struct {
+	MetricResponseBase
+	Stats []MetricStatKeyed `json:"stats"`
+}
+
+// OpenTSDBDataPoints converts the response into OpenTSDB data points: one
+// point per value of every stat, named by the metricNames entry at the same
+// index and stamped with the stat's Time divided by 1000.
+func (mr *MetricResponseSimple) OpenTSDBDataPoints(metricNames []string, objectKey string, objectIdToName map[int64]string) (opentsdb.MultiDataPoint, error) {
+	// Each position in Values should correspond to the order of
+	// of Specs object. So len of Values == len(mq.Specs) I think. 
+	// Each item in Stats has a UID, which will map to the
+	// requested object IDs
+	var md opentsdb.MultiDataPoint
+	for _, s := range mr.Stats {
+		name, ok := objectIdToName[s.Oid]
+		if !ok {
+			// Oid is an int64, so it needs %d; %s printed "%!s(int64=...)".
+			return md, fmt.Errorf("no name found for oid %d", s.Oid)
+		}
+		// The object tag is only added when both its key and value are known.
+		tagSet := opentsdb.TagSet{}
+		if objectKey != "" && name != "" {
+			tagSet[objectKey] = name
+		}
+		ts := s.Time
+		if ts < 1 {
+			return md, fmt.Errorf("encountered a time less than 1")
+		}
+		for i, v := range s.Values {
+			// i == len(metricNames) is already out of range, so the guard
+			// must be >=; the previous "len < i" let that index through.
+			if i >= len(metricNames) {
+				return md, fmt.Errorf("no corresponding metric name at index %v", i)
+			}
+			md = append(md, &opentsdb.DataPoint{
+				Metric:    metricNames[i],
+				Timestamp: ts / 1000,
+				Tags:      tagSet,
+				Value:     v,
+			})
+		}
+	}
+	return md, nil
+}
+
+// SimpleMetricQuery is for when you are making a query that doesn't
+// have any facets ("Keys"). It posts the query to the "metrics" endpoint
+// and returns the decoded response together with any request error.
+func (c *Client) SimpleMetricQuery(cycle, category, objectType string, fromMS, untilMS int64, metrics []MetricSpec, objectIds []int64) (MetricResponseSimple, error) {
+	mq := MetricQuery{
+		Cycle:     cycle,
+		Category:  category,
+		ObjectIds: objectIds,
+		Type:      objectType,
+		From:      fromMS,
+		Until:     untilMS,
+	}
+	// Copy the specs so the query does not alias the caller's slice.
+	mq.Specs = append(mq.Specs, metrics...)
+	m := MetricResponseSimple{}
+	err := c.post("metrics", &mq, &m)
+	return m, err
+}
+
+// Keyed Metric query is for when you are making a query that has facets ("Keys"). 
For example bytes "By L7 Protocol"
+//
+// OpenTSDBDataPoints converts the keyed response into OpenTSDB data points.
+// The i-th value list of every stat belongs to metrics[i]; each record in it
+// becomes one point tagged with objectKey=<object name> and
+// <metrics[i].OpenTSDBKey1>=<record key>.
+func (mr *MetricResponseKeyed) OpenTSDBDataPoints(metrics []MetricSpec, objectKey string, objectIdToName map[int64]string) (opentsdb.MultiDataPoint, error) {
+	// Only tested against one key, didn't find example with 2 keys yet
+	var md opentsdb.MultiDataPoint
+	for _, s := range mr.Stats {
+		name, ok := objectIdToName[s.Oid]
+		if !ok {
+			// Oid is an int64, so it needs %d; %s printed "%!s(int64=...)".
+			return md, fmt.Errorf("no name found for oid %d", s.Oid)
+		}
+		ts := s.Time
+		if ts < 1 {
+			return md, fmt.Errorf("encountered a time less than 1")
+		}
+		for i, values := range s.Values {
+			// i == len(metrics) is already out of range, so the guard must
+			// be >=; the previous "len < i" let that index through.
+			if i >= len(metrics) {
+				return md, fmt.Errorf("no corresponding metric name at index %v", i)
+			}
+			metricName := metrics[i].OpenTSDBMetric
+			key1 := metrics[i].OpenTSDBKey1
+			for _, v := range values {
+				md = append(md, &opentsdb.DataPoint{
+					Metric:    metricName,
+					Timestamp: ts / 1000,
+					Tags:      opentsdb.TagSet{objectKey: name, key1: v.Key.Str},
+					Value:     v.Value,
+				})
+			}
+		}
+	}
+	return md, nil
+}
+
+// KeyedMetricQuery posts a query that has facets ("Keys") to the "metrics"
+// endpoint and returns the decoded response together with any request error.
+func (c *Client) KeyedMetricQuery(cycle, category, objectType string, fromMS, untilMS int64, metrics []MetricSpec,
+	objectIds []int64) (MetricResponseKeyed, error) {
+	mq := MetricQuery{
+		Cycle:     cycle,
+		Category:  category,
+		ObjectIds: objectIds,
+		Type:      objectType,
+		From:      fromMS,
+		Until:     untilMS,
+	}
+	// Copy the specs so the query does not alias the caller's slice.
+	mq.Specs = append(mq.Specs, metrics...)
+	m := MetricResponseKeyed{}
+	err := c.post("metrics", &mq, &m)
+	return m, err
+}
+
+// NetworkList is the decoded reply of the "networks" endpoint.
+type NetworkList []struct {
+	Id          int64  `json:"id"`
+	NodeId      int64  `json:"node_id"`
+	Description string `json:"description"`
+	Name        string `json:"name"`
+	Idle        bool   `json:"idle"`
+	Vlans       VlanList //This is not part of the JSON returned by ExtraHop's /network endpoint, so this gets populated by a 2nd step. 
+}
+
+// VlanList is the decoded reply of the "networks/<id>/vlans" endpoint.
+type VlanList []struct {
+	Id          int64  `json:"id"`
+	NetworkId   int64  `json:"network_id"`
+	VlanId      int64  `json:"vlanid"`
+	Name        string `json:"name"`
+	Description string `json:"description"`
+}
+
+// GetNetworkList returns the networks reported by the "networks" endpoint.
+// When FetchVlans is true, each network's Vlans field is filled in by one
+// extra request per network. A network whose VLAN lookup fails is returned
+// without VLANs instead of failing the whole call.
+func (c *Client) GetNetworkList(FetchVlans bool) (NetworkList, error) {
+	l := NetworkList{}
+	// nil payload: passing "" made the GET carry a JSON body of `""`.
+	if err := c.get("networks", nil, &l); err != nil {
+		return nil, err
+	}
+	if FetchVlans {
+		for k, dp := range l {
+			// Best effort: only store the entry back when its VLANs loaded.
+			if err := c.GetVlanList(dp.Id, &dp.Vlans); err == nil {
+				l[k] = dp
+			}
+		}
+	}
+	return l, nil
+}
+
+// GetVlanList decodes the VLANs of the network with the given id into l.
+func (c *Client) GetVlanList(NetworkId int64, l *VlanList) error {
+	path := fmt.Sprintf("networks/%d/vlans", NetworkId)
+	// l is already a pointer, so it is passed as is; &l made the decoder
+	// work on a pointer to the local copy of that pointer.
+	return c.get(path, nil, l)
+}
+
+// ExtraHopMetric is the parsed form of a dotted metric string; see StoEHMetric.
+type ExtraHopMetric struct {
+	ObjectType         string
+	ObjectId           int64
+	MetricCategory     string
+	MetricSpecName     string
+	MetricSpecCalcType string
+}
+
+// StoEHMetric parses "objecttype.objectid.category.specname[.calctype]".
+// The calc type is optional and left empty when only four parts are given.
+// An error is returned for any other number of parts or a non-numeric id.
+func StoEHMetric(i string) (ExtraHopMetric, error) {
+	v := strings.Split(i, ".")
+
+	if len(v) < 4 || len(v) > 5 {
+		return ExtraHopMetric{}, errors.New(fmt.Sprintf("Provided metric (%s) had %d parts. Metric must have either 4 or 5 parts", i, len(v)))
+	}
+
+	if len(v) == 4 {
+		v = append(v, "")
+	}
+
+	// ParseInt with bitSize 64 matches the int64 field; Atoi is limited to
+	// the platform's int and would reject large ids on 32-bit builds.
+	oid, err := strconv.ParseInt(v[1], 10, 64)
+
+	if err != nil {
+		return ExtraHopMetric{}, errors.New(fmt.Sprintf("Provided metric (%s) does not have a number as its second part (%s) ", i, v[1]))
+	}
+
+	return ExtraHopMetric{
+		ObjectType:         v[0],
+		ObjectId:           oid,
+		MetricCategory:     v[2],
+		MetricSpecName:     v[3],
+		MetricSpecCalcType: v[4],
+	}, nil
+
+}
diff --git a/cmd/scollector/collectors/extrahop.go b/cmd/scollector/collectors/extrahop.go
new file mode 100644
index 0000000000..39fe61109d
--- /dev/null
+++ b/cmd/scollector/collectors/extrahop.go
@@ -0,0 +1,172 @@
+package collectors
+
+import (
+	"fmt"
+	"net/url"
+	"strings"
+	"time"
+
+	"bosun.org/_third_party/github.com/kylebrandt/gohop"
+	"bosun.org/metadata"
+	"bosun.org/opentsdb"
+)
+
+const extraHopIntervalSeconds int = 30
+
+var extraHopFilterProtoBy string //What to filter the traffic by. 
Valid values are "namedprotocols", "toppercent" or "none"
+var extraHopTopProtoPerc int //Only log the top % of protocols by volume
+var extraHopOtherProtoName string //What name to log the "other" data under.
+var extraHopL7Description string //What to append to the end of the L7 description metadata to explain what is and isn't filtered out
+
+// ExtraHop registers a collector for one ExtraHop host.
+//
+// filterby selects how protocols are filtered ("toppercent",
+// "namedprotocols" or "none") and filterpercent is only read for
+// "toppercent", where it must lie between 1 and 99. The filter settings are
+// stored in package-level variables. An error is returned for an empty host
+// or API key, an unknown filterby value, or an out-of-range filterpercent.
+func ExtraHop(host, apikey, filterby string, filterpercent int) error {
+	if host == "" || apikey == "" {
+		return fmt.Errorf("Empty host or API key for ExtraHop.")
+	}
+	extraHopFilterProtoBy = filterby
+	switch filterby { //Set up options
+	case "toppercent":
+		if filterpercent <= 0 || filterpercent >= 100 {
+			return fmt.Errorf("Invalid ExtraHop FilterPercent value (%d). Number should be between 1 and 99.", filterpercent)
+		}
+		// Store the percentage before formatting the description; formatting
+		// first reported the previous value (zero on first registration).
+		extraHopTopProtoPerc = filterpercent
+		extraHopL7Description = fmt.Sprintf("Only the top %d percent of traffic has its protocols logged, the remainder is tagged as proto=otherprotos", extraHopTopProtoPerc)
+		extraHopOtherProtoName = "otherprotos"
+	case "namedprotocols":
+		extraHopL7Description = "Only named protocols are logged. Any unnamed protocol (A protocol name starting with tcp, udp or ssl) is tagged as proto=unnamed"
+		extraHopOtherProtoName = "unnamed"
+	case "none":
+		// Nothing to set up: the options keep their defaults. The case must
+		// still be listed, otherwise "none" falls into default and is rejected.
+	default:
+		return fmt.Errorf("Invalid ExtraHop FilterBy option (%s). 
Valid options are namedprotocols, toppercent or none.", filterby)
+
+	}
+	//Add the metadata for the L7 types, as now we have enough information to know what they're going to be
+	for l7type, l7s := range l7types {
+		xhMetricName := fmt.Sprintf("extrahop.l7.%s", l7type)
+		metadata.AddMeta(xhMetricName, nil, "rate", l7s.Rate, false)
+		metadata.AddMeta(xhMetricName, nil, "unit", l7s.Unit, false)
+		metadata.AddMeta(xhMetricName, nil, "desc", fmt.Sprintf("%s %s", l7s.Description, extraHopL7Description), false)
+	}
+	// The URL is parsed here only to name the collector after its host part.
+	u, err := url.Parse(host)
+	if err != nil {
+		return err
+	}
+	collectors = append(collectors, &IntervalCollector{
+		F: func() (opentsdb.MultiDataPoint, error) {
+			return c_extrahop(host, apikey)
+		},
+		name:     fmt.Sprintf("extrahop-%s", u.Host),
+		Interval: time.Second * time.Duration(extraHopIntervalSeconds),
+	})
+	return nil
+
+}
+
+// c_extrahop is the collector function: it creates a gohop client for the
+// given host and API key and returns the data points gathered by
+// extraHopNetworks, or that function's error.
+func c_extrahop(host, apikey string) (opentsdb.MultiDataPoint, error) {
+	c := gohop.NewClient(host, apikey)
+	var md opentsdb.MultiDataPoint
+	if err := extraHopNetworks(c, &md); err != nil {
+		return nil, err
+	}
+	return md, nil
+}
+
+/*
+	This grabs the complex metrics of the L7 traffic from ExtraHop. It is a complex type because the data is not just a simple time series,
+	the data needs to be tagged with vlan, protocol, etc. We can do the network and vlan tagging ourselves, but the protocol tagging comes
+	from ExtraHop itself. 
+*/
+// Results are appended to *md. The first request or conversion error aborts the run.
+// NOTE(review): the query window is from = -extraHopIntervalSeconds*1000 and until = 0; presumably milliseconds relative to now - confirm against the ExtraHop API.
+// NOTE(review): the metric query passes vlan.VlanId (JSON "vlanid") as the object id rather than vlan.Id - confirm this is the id the API expects.
+func extraHopNetworks(c *gohop.Client, md *opentsdb.MultiDataPoint) error {
+	nl, err := c.GetNetworkList(true) //Fetch the network list from ExtraHop, and include VLAN information
+	if err != nil {
+		return err
+	}
+	for _, net := range nl { //All found networks
+		for _, vlan := range net.Vlans { //All vlans inside this network
+			for l7type := range l7types { //All the types of data we want to retrieve for the vlan
+				xhMetricName := fmt.Sprintf("extrahop.l7.%s", l7type)
+				metricsDropped, metricsKept := 0, 0 //Counters for debugging purposes
+				otherValues := make(map[int64]int64) //Container to put any extra time series data that we need to add, for consolidating unnamed or dropped protocols, etc.
+				ms := []gohop.MetricSpec{ //Build a metric spec to tell ExtraHop what we want to grab from ExtraHop
+					{Name: l7type, KeyPair: gohop.KeyPair{Key1Regex: "", Key2Regex: "", OpenTSDBKey1: "proto", Key2OpenTSDBKey2: ""}, OpenTSDBMetric: xhMetricName}, //ExtraHop breaks this by L7 protocol on its own, but we need to tell TSDB what tag to add, which is in this case "proto"
+				}
+				mrk, err := c.KeyedMetricQuery(gohop.Cycle30Sec, "app", "vlan", int64(extraHopIntervalSeconds)*-1000, 0, ms, []int64{vlan.VlanId}) //Get the data from ExtraHop
+				if err != nil {
+					return err
+				}
+				md2, err := mrk.OpenTSDBDataPoints(ms, "vlan", map[int64]string{vlan.VlanId: fmt.Sprintf("%d", vlan.VlanId)}) //Get the OpenTSDBDataPoints from the ExtraHop data
+				if err != nil {
+					return err
+				}
+				valueCutoff := calculateDataCutoff(mrk) //Calculate what the cutoff value will be (used later on when we decide whether or not to consolidate the data)
+				for _, dp := range md2 { //We need to manually process the TSDB datapoints that we've got
+					dp.Tags["host"] = c.APIHost
+					dp.Tags["network"] = net.Name
+					switch extraHopFilterProtoBy { //These are our filter options from the the configuration file. 
Filter by %, named, or none
+					case "toppercent": //Only include protocols that make up a certain % of the traffic
+						if dp.Value.(int64) >= valueCutoff[dp.Timestamp] { //It's in the top percent so log it as-is
+							*md = append(*md, dp)
+							metricsKept++
+						} else {
+							otherValues[dp.Timestamp] += dp.Value.(int64)
+							metricsDropped++
+						}
+					case "namedprotocols": //Only include protocols that have an actual name (SSL443 excepted)
+						proto := dp.Tags["proto"]
+						// A protocol counts as unnamed when its name starts with tcp, udp or SSL; SSL443 is the one SSL name that is kept.
+						unnamed := strings.HasPrefix(proto, "tcp") || strings.HasPrefix(proto, "udp") || (strings.HasPrefix(proto, "SSL") && proto != "SSL443")
+						if !unnamed {
+							*md = append(*md, dp)
+							metricsKept++
+						} else {
+							otherValues[dp.Timestamp] += dp.Value.(int64)
+							metricsDropped++
+						}
+					case "none": //Log everything. Is OK for viewing short timespans, but calculating, 2,000+ protocols over a multi-day window is bad for Bosun's performance
+						*md = append(*md, dp)
+						metricsKept++
+					}
+
+				}
+				//Take the consolidated values and add them now too
+				for k, v := range otherValues {
+					*md = append(*md, &opentsdb.DataPoint{
+						Metric:    xhMetricName,
+						Timestamp: k,
+						Tags:      opentsdb.TagSet{"vlan": fmt.Sprintf("%d", vlan.VlanId), "proto": extraHopOtherProtoName, "host": c.APIHost, "network": net.Name},
+						Value:     v,
+					})
+				}
+			}
+		}
+	}
+	return nil
+}
+
+//These are used when looping through which L7 traffic to get. We want byte counts and packet counts, and this is the metadata that goes with them.
+// NOTE(review): "pkts" uses metadata.Counter as its Unit - confirm a packet unit was not intended.
+var l7types = map[string]L7Stats{
+	"bytes": {Rate: metadata.Gauge, Unit: metadata.Bytes, Description: "The number of bytes transmitted on this network. You can drill down by server, network, vlan and protocol for further investigations."},
+	"pkts":  {Rate: metadata.Gauge, Unit: metadata.Counter, Description: "The number of packets transmitted on this network. 
You can drill down by server, network, vlan and protocol for further investigations."},
+}
+
+// L7Stats is the metadata registered for one L7 metric type: its rate type,
+// its unit and the leading part of its description.
+type L7Stats struct {
+	Rate        metadata.RateType
+	Unit        metadata.Unit
+	Description string
+}
+
+//Given the % value in the configuration file, calculate what the actual minimum value is for each of the time points returned by ExtraHop
+// The result maps each timestamp (stat Time divided by 1000) to the sum of all
+// values at that timestamp multiplied by (1 - extraHopTopProtoPerc/100),
+// truncated to an int64.
+func calculateDataCutoff(k gohop.MetricResponseKeyed) map[int64]int64 {
+	sums := make(map[int64]int64)
+	rets := make(map[int64]int64)
+	for _, dp := range k.Stats {
+		for _, dv := range dp.Values {
+			for _, dw := range dv {
+				sums[dp.Time/1000] += dw.Value
+			}
+
+		}
+	}
+	// k is reused here as the timestamp key and shadows the parameter.
+	for k, v := range sums {
+		rets[k] = int64(float64(v) * (1 - float64(extraHopTopProtoPerc)/100))
+	}
+	return rets
+}
diff --git a/cmd/scollector/conf/conf.go b/cmd/scollector/conf/conf.go
index 12c108ac4e..9927ace763 100644
--- a/cmd/scollector/conf/conf.go
+++ b/cmd/scollector/conf/conf.go
@@ -60,6 +60,7 @@ type Conf struct {
 	GoogleAnalytics []GoogleAnalytics
 	Cadvisor []Cadvisor
 	RedisCounters []RedisCounters
+	ExtraHop []ExtraHop
 }
 
 type HAProxy struct {
@@ -174,3 +175,10 @@ type RedisCounters struct {
 	Server string
 	Database int
 }
+
+type ExtraHop struct {
+	Host string
+	APIKey string
+	FilterBy string
+	FilterPercent int
+}
diff --git a/cmd/scollector/doc.go b/cmd/scollector/doc.go
index ef1a561bbb..5ab1f7b1f3 100644
--- a/cmd/scollector/doc.go
+++ b/cmd/scollector/doc.go
@@ -247,6 +247,27 @@ RedisCounters: Reads a hash of metric/counters from a redis database.
 
 Expects data populated via bosun's udp listener in the "scollectorCounters" hash.
 
+ExtraHop (array of table): ExtraHop hosts to poll. The two filter options specify how
+scollector should filter out traffic from being submitted. The valid options are:
+
+	- namedprotocols (Only protocols that have an explicit name are submitted. The rest of the
+	  traffic will be pushed into proto=unnamed. So any protocol that begins with
+	  "tcp", "udp" or "SSL" will not be submitted (with the exception of SSL443). 
+	- toppercent (The top n% of traffic by volume will be submitted. The rest of the traffic
+	  will be pushed into proto=otherprotos)
+	- none (All protocols of any size will be submitted)
+
+FilterPercent applies when the FilterBy option is set to "toppercent". Only protocols that account
+for this much traffic will be logged. For example, if this is set to 90, then if the protocol
+accounts for less than 10% of the traffic, it will be dropped. This is OK if your traffic is
+heavily dominated by a small set of protocols, but if you have a fairly even spread of protocols
+then this filtering loses its usefulness.
+
+	[[ExtraHop]]
+	Host = "extrahop01"
+	APIKey = "abcdef1234567890"
+	FilterBy = "toppercent"
+	FilterPercent = 75
 
 Windows
 
diff --git a/cmd/scollector/main.go b/cmd/scollector/main.go
index c9015a5f15..613c9f5968 100644
--- a/cmd/scollector/main.go
+++ b/cmd/scollector/main.go
@@ -137,6 +137,11 @@ func main() {
 	for _, r := range conf.Riak {
 		check(collectors.Riak(r.URL))
 	}
+
+	for _, x := range conf.ExtraHop {
+		check(collectors.ExtraHop(x.Host, x.APIKey, x.FilterBy, x.FilterPercent))
+	}
+
 	if err != nil {
 		slog.Fatal(err)
 	}