diff --git a/config.go b/config.go index 46d663916..7b997acb1 100644 --- a/config.go +++ b/config.go @@ -113,5 +113,7 @@ type Config struct { } `yaml:"veneur_metrics_scopes"` XrayAddress string `yaml:"xray_address"` XrayAnnotationTags []string `yaml:"xray_annotation_tags"` + XrayFaultTag string `yaml:"xray_fault_tag"` XraySamplePercentage int `yaml:"xray_sample_percentage"` + XrayThrottleTag string `yaml:"xray_throttle_tag"` } diff --git a/example.yaml b/example.yaml index ac832a99c..b17cd4c1c 100644 --- a/example.yaml +++ b/example.yaml @@ -375,6 +375,16 @@ xray_sample_percentage: 100 xray_annotation_tags: - "" +# If not `""` then any spans with the specified tag key (values are ignored) +# will be marked as `throttle` when sent to X-Ray. Note that this tag will +# not be included in the span's metadata. +xray_throttle_tag: "" + +# If not `""` then any spans with the specified tag key (values are ignored) +# will be marked as `fault` when sent to X-Ray. Note that this tag will +# not be included in the span's metadata. +xray_fault_tag: "" + # == LightStep == # LightStep can be a sink for trace spans. 
diff --git a/server.go b/server.go index 6426057ff..93de27023 100644 --- a/server.go +++ b/server.go @@ -534,7 +534,7 @@ func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error) { annotationTags = append(annotationTags, strings.Split(tag, ":")[0]) } - xraySink, err := xray.NewXRaySpanSink(conf.XrayAddress, conf.XraySamplePercentage, ret.TagsAsMap, annotationTags, log) + xraySink, err := xray.NewXRaySpanSink(conf.XrayAddress, conf.XraySamplePercentage, ret.TagsAsMap, annotationTags, conf.XrayThrottleTag, conf.XrayFaultTag, log) if err != nil { return ret, err } diff --git a/sinks/xray/testdata/xray_segment.json b/sinks/xray/testdata/xray_segment.json index da918554c..8a77816ce 100644 --- a/sinks/xray/testdata/xray_segment.json +++ b/sinks/xray/testdata/xray_segment.json @@ -1,2 +1,2 @@ {"format": "json", "version": 1} -{"name":"farts-srv","id":"0000000000000002","trace_id":"1-5a7f1b99-000000003fdd0f60394d200c","parent_id":"0000000000000001","start_time":1518279577,"end_time":1518279579,"namespace":"remote","error":false,"annotations":{"baz":"qux","indicator":"false","mind":"crystal"},"metadata":{"baz":"qux","feelings":"magenta","foo":"bar","indicator":"false","mind":"crystal"},"http":{"request":{"url":"farts-srv:farting farty farts"},"response":{}}} +{"name":"farts-srv","id":"0000000000000002","trace_id":"1-5a7f1b99-000000003fdd0f60394d200c","parent_id":"0000000000000001","start_time":1518279577,"end_time":1518279579,"namespace":"remote","error":false,"throttle":false,"fault":false,"annotations":{"baz":"qux","indicator":"false","mind":"crystal"},"metadata":{"baz":"qux","feelings":"magenta","foo":"bar","indicator":"false","mind":"crystal"},"http":{"request":{"url":"farts-srv:farting farty farts"},"response":{}}} diff --git a/sinks/xray/testdata/xray_segment_fault.json b/sinks/xray/testdata/xray_segment_fault.json new file mode 100644 index 000000000..d2096fefa --- /dev/null +++ b/sinks/xray/testdata/xray_segment_fault.json @@ -0,0 +1,2 @@ +{"format": 
"json", "version": 1} +{"name":"farts-srv","id":"0000000000000002","trace_id":"1-5a7f1b99-000000003fdd0f60394d200c","parent_id":"0000000000000001","start_time":1518279577,"end_time":1518279579,"namespace":"remote","error":false,"throttle":false,"fault":true,"annotations":{"baz":"qux","indicator":"false","mind":"crystal"},"metadata":{"baz":"qux","feelings":"magenta","foo":"bar","indicator":"false","mind":"crystal"},"http":{"request":{"url":"farts-srv:farting farty farts"},"response":{}}} diff --git a/sinks/xray/testdata/xray_segment_throttle.json b/sinks/xray/testdata/xray_segment_throttle.json new file mode 100644 index 000000000..ba1e98f2b --- /dev/null +++ b/sinks/xray/testdata/xray_segment_throttle.json @@ -0,0 +1,2 @@ +{"format": "json", "version": 1} +{"name":"farts-srv","id":"0000000000000002","trace_id":"1-5a7f1b99-000000003fdd0f60394d200c","parent_id":"0000000000000001","start_time":1518279577,"end_time":1518279579,"namespace":"remote","error":false,"throttle":true,"fault":false,"annotations":{"baz":"qux","indicator":"false","mind":"crystal"},"metadata":{"baz":"qux","feelings":"magenta","foo":"bar","indicator":"false","mind":"crystal"},"http":{"request":{"url":"farts-srv:farting farty farts"},"response":{}}} diff --git a/sinks/xray/xray.go b/sinks/xray/xray.go index 9be75c365..172c16863 100644 --- a/sinks/xray/xray.go +++ b/sinks/xray/xray.go @@ -55,6 +55,8 @@ type XRaySegment struct { EndTime float64 `json:"end_time"` Namespace string `json:"namespace"` Error bool `json:"error"` + Throttle bool `json:"throttle"` + Fault bool `json:"fault"` Annotations map[string]string `json:"annotations,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` HTTP XRaySegmentHTTP `json:"http,omitempty"` @@ -68,6 +70,8 @@ type XRaySpanSink struct { sampleThreshold uint32 commonTags map[string]string annotationTags map[string]struct{} + throttleTag string + faultTag string log *logrus.Logger spansDropped int64 spansHandled int64 @@ -77,7 +81,7 @@ type XRaySpanSink 
struct { var _ sinks.SpanSink = &XRaySpanSink{} // NewXRaySpanSink creates a new instance of a XRaySpanSink. -func NewXRaySpanSink(daemonAddr string, sampleRatePercentage int, commonTags map[string]string, annotationTags []string, log *logrus.Logger) (*XRaySpanSink, error) { +func NewXRaySpanSink(daemonAddr string, sampleRatePercentage int, commonTags map[string]string, annotationTags []string, throttleTag string, faultTag string, log *logrus.Logger) (*XRaySpanSink, error) { log.WithFields(logrus.Fields{ "Address": daemonAddr, @@ -116,6 +120,8 @@ func NewXRaySpanSink(daemonAddr string, sampleRatePercentage int, commonTags map commonTags: commonTags, log: log, nameRegex: reg, + throttleTag: throttleTag, + faultTag: faultTag, annotationTags: annotationTagsMap, }, nil } @@ -156,12 +162,24 @@ func (x *XRaySpanSink) Ingest(ssfSpan *ssf.SSFSpan) error { return nil } + // Set these so we can review them during the loop of tags + fault := false + throttle := false + metadata := map[string]string{} annotations := map[string]string{} for k, v := range x.commonTags { metadata[k] = v } for k, v := range ssfSpan.Tags { + if x.faultTag != "" && k == x.faultTag { + fault = true + continue // don't add it to anything + } + if x.throttleTag != "" && k == x.throttleTag { + throttle = true + continue // don't add it to anything + } if k != XRayTagNameClientIP { metadata[k] = v } @@ -200,6 +218,8 @@ func (x *XRaySpanSink) Ingest(ssfSpan *ssf.SSFSpan) error { Metadata: metadata, Namespace: "remote", Error: ssfSpan.Error, + Throttle: throttle, + Fault: fault, // Because X-Ray doesn't offer another way to get this data in, we pretend // it's HTTP for now. It's likely that as X-Ray and/or Veneur develop this // will change. 
diff --git a/sinks/xray/xray_test.go b/sinks/xray/xray_test.go index 5e191f9bd..782f2d384 100644 --- a/sinks/xray/xray_test.go +++ b/sinks/xray/xray_test.go @@ -18,11 +18,13 @@ import ( func TestConstructor(t *testing.T) { logger := logrus.StandardLogger() - sink, err := NewXRaySpanSink("127.0.0.1:2000", 100, map[string]string{"foo": "bar"}, nil, logger) + sink, err := NewXRaySpanSink("127.0.0.1:2000", 100, map[string]string{"foo": "bar"}, nil, "xray_throttle", "xray_fault", logger) assert.NoError(t, err) assert.Equal(t, "xray", sink.Name()) assert.Equal(t, "bar", sink.commonTags["foo"]) assert.Equal(t, "127.0.0.1:2000", sink.daemonAddr) + assert.Equal(t, "xray_throttle", sink.throttleTag) + assert.Equal(t, "xray_fault", sink.faultTag) } func TestIngestSpans(t *testing.T) { @@ -55,7 +57,7 @@ func TestIngestSpans(t *testing.T) { } }() - sink, err := NewXRaySpanSink(fmt.Sprintf("127.0.0.1:%d", port), 100, map[string]string{"foo": "bar"}, []string{"baz", "mind"}, logrus.New()) + sink, err := NewXRaySpanSink(fmt.Sprintf("127.0.0.1:%d", port), 100, map[string]string{"foo": "bar"}, []string{"baz", "mind"}, "xray_throttle", "xray_fault", logrus.New()) assert.NoError(t, err) err = sink.Start(nil) assert.NoError(t, err) @@ -126,7 +128,7 @@ func TestSampleSpans(t *testing.T) { } }() - sink, err := NewXRaySpanSink(fmt.Sprintf("127.0.0.1:%d", port), 50, map[string]string{"foo": "bar"}, []string{"baz", "mind"}, logrus.New()) + sink, err := NewXRaySpanSink(fmt.Sprintf("127.0.0.1:%d", port), 50, map[string]string{"foo": "bar"}, []string{"baz", "mind"}, "", "", logrus.New()) assert.NoError(t, err) err = sink.Start(nil) assert.NoError(t, err) @@ -197,3 +199,145 @@ func TestSampleSpans(t *testing.T) { sink.Flush() assert.Equal(t, int64(0), sink.spansHandled) } + +func TestThrottleSpan(t *testing.T) { + + // Load up a fixture to compare the output to what we get over UDP + reader, err := os.Open(filepath.Join("testdata", "xray_segment_throttle.json")) + assert.NoError(t, err) + defer 
reader.Close() + fixtureSegment, err := ioutil.ReadAll(reader) + assert.NoError(t, err) + + // Don't use a port so we get one auto-assigned + udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + assert.NoError(t, err) + sock, _ := net.ListenUDP("udp", udpAddr) + defer sock.Close() + // Grab the port we got assigned so we can use it. + port := sock.LocalAddr().(*net.UDPAddr).Port + + segments := make(chan string) + + buf := make([]byte, 1024) + go func() { + for { + n, _, serr := sock.ReadFromUDP(buf) + segments <- string(buf[0:n]) + if serr != nil { + assert.NoError(t, serr) + } + } + }() + + sink, err := NewXRaySpanSink(fmt.Sprintf("127.0.0.1:%d", port), 100, map[string]string{"foo": "bar"}, []string{"baz", "mind"}, "xray_throttle", "", logrus.New()) + assert.NoError(t, err) + err = sink.Start(nil) + assert.NoError(t, err) + + // Because xray uses the timestamp as part of the trace id, this must remain + // fixed for the fixture comparison to work! + start := time.Unix(1518279577, 0) + end := start.Add(2 * time.Second) + + testSpan := &ssf.SSFSpan{ + TraceId: 4601851300195147788, // This one will be sampled! 
+ ParentId: 1, + Id: 2, + StartTimestamp: int64(start.UnixNano()), + EndTimestamp: int64(end.UnixNano()), + Error: false, + Service: "farts-srv", + Tags: map[string]string{ + "baz": "qux", + "mind": "crystal", + "feelings": "magenta", + "xray_throttle": "true", + }, + Indicator: false, + Name: "farting farty farts", + } + assert.NoError(t, sink.Ingest(testSpan)) + + select { + case seg := <-segments: + assert.Equal(t, strings.TrimSpace(string(fixtureSegment)), seg) + case <-time.After(1 * time.Second): + assert.Fail(t, "Did not receive segment from xray ingest") + } + + assert.Equal(t, int64(1), sink.spansHandled) + sink.Flush() + assert.Equal(t, int64(0), sink.spansHandled) +} + +func TestFaultSpan(t *testing.T) { + + // Load up a fixture to compare the output to what we get over UDP + reader, err := os.Open(filepath.Join("testdata", "xray_segment_fault.json")) + assert.NoError(t, err) + defer reader.Close() + fixtureSegment, err := ioutil.ReadAll(reader) + assert.NoError(t, err) + + // Don't use a port so we get one auto-assigned + udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + assert.NoError(t, err) + sock, _ := net.ListenUDP("udp", udpAddr) + defer sock.Close() + // Grab the port we got assigned so we can use it. + port := sock.LocalAddr().(*net.UDPAddr).Port + + segments := make(chan string) + + buf := make([]byte, 1024) + go func() { + for { + n, _, serr := sock.ReadFromUDP(buf) + segments <- string(buf[0:n]) + if serr != nil { + assert.NoError(t, serr) + } + } + }() + + sink, err := NewXRaySpanSink(fmt.Sprintf("127.0.0.1:%d", port), 100, map[string]string{"foo": "bar"}, []string{"baz", "mind"}, "", "xray_fault", logrus.New()) + assert.NoError(t, err) + err = sink.Start(nil) + assert.NoError(t, err) + + // Because xray uses the timestamp as part of the trace id, this must remain + // fixed for the fixture comparison to work! 
+ start := time.Unix(1518279577, 0) + end := start.Add(2 * time.Second) + + testSpan := &ssf.SSFSpan{ + TraceId: 4601851300195147788, // This one will be sampled! + ParentId: 1, + Id: 2, + StartTimestamp: int64(start.UnixNano()), + EndTimestamp: int64(end.UnixNano()), + Error: false, + Service: "farts-srv", + Tags: map[string]string{ + "baz": "qux", + "mind": "crystal", + "feelings": "magenta", + "xray_fault": "true", + }, + Indicator: false, + Name: "farting farty farts", + } + assert.NoError(t, sink.Ingest(testSpan)) + + select { + case seg := <-segments: + assert.Equal(t, strings.TrimSpace(string(fixtureSegment)), seg) + case <-time.After(1 * time.Second): + assert.Fail(t, "Did not receive segment from xray ingest") + } + + assert.Equal(t, int64(1), sink.spansHandled) + sink.Flush() + assert.Equal(t, int64(0), sink.spansHandled) +}