diff --git a/Gopkg.lock b/Gopkg.lock index 70ea399e660..281c59d95b4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -479,12 +479,12 @@ version = "v1.0.1" [[projects]] - digest = "1:f14364057165381ea296e49f8870a9ffce2b8a95e34d6ae06c759106aaef428c" + digest = "1:6eb58a12ff2e21abe77271d29981dfc74211ae13500a5f34d34c5a098945d8b7" name = "github.com/hashicorp/go-uuid" packages = ["."] pruneopts = "UT" - revision = "4f571afc59f3043a65f8fe6bf46d887b10a01d43" - version = "v1.0.1" + revision = "6195a4f20692188e0a389def25559731d1e130f9" + version = "v1.0.2" [[projects]] digest = "1:c0d19ab64b32ce9fe5cf4ddceba78d5bc9807f0016db6b1183599da3dcc24d10" @@ -606,7 +606,7 @@ [[projects]] branch = "master" - digest = "1:b0eb641b43b44b196880360efaf95e5916cb2bea747935932694b7003ea8adb3" + digest = "1:7463ef8877138e91bdb6e980dfcb6ce16358dd753f54c14dea23b0261e68f54a" name = "github.com/mmcloughlin/avo" packages = [ "attr", @@ -624,7 +624,7 @@ "x86", ] pruneopts = "UT" - revision = "205fc6a3d76b01529817bc64d037340b558d2c05" + revision = "b0ac74488c88a329ab76bdc649d3118b5ce2fcd3" [[projects]] digest = "1:66b0a65aba488ca6c72f77132d5b8d7e2c5baf07d577dee64502b69a2c90c791" @@ -748,8 +748,8 @@ "model", ] pruneopts = "UT" - revision = "287d3e634a1e550c9e463dd7e5a75a422c614505" - version = "v0.7.0" + revision = "629b6ff9b3aafafba54ab96663903fde9544f037" + version = "v0.8.0" [[projects]] digest = "1:ec0ff4bd619a67065e34d6477711ed0117e335f99059a4c508e0fe21cfe7b304" @@ -900,13 +900,14 @@ version = "v1.2.0" [[projects]] - digest = "1:b2871caffbaef2e8f6d6f42318b985a16297626df8669be365da99e9d4bab10f" + digest = "1:23117f249348b0f4c544a946e408801f2b8c79514620c1d8418ea195d2e16248" name = "github.com/uber/jaeger-client-go" packages = [ ".", "config", "internal/baggage", "internal/baggage/remote", + "internal/reporterstats", "internal/spanlog", "internal/throttler", "internal/throttler/remote", @@ -924,8 +925,8 @@ "utils", ] pruneopts = "UT" - revision = "f2e1f58485aacf2975cdde9c9f5396e6d98c35ba" - version = "v2.21.1" + revision = "56611ffff7c29c5b9aa926b84faadda4a609cc59" + version = "v2.22.0" [[projects]] digest = "1:abbb7762b4200f8b1d82193dcc02a3d8a62efc0a0ffc8ec4f4e9ef8abf797a9c" @@ -1047,7 +1048,7 @@ "pbkdf2", ] pruneopts = "UT" - revision = "5d647ca1575777a812e903a7e98177174d8c295a" + revision = "6d4e4cb37c7d6416dfea8472e751c7b6615267a6" [[projects]] branch = "master" @@ -1062,7 +1063,7 @@ [[projects]] branch = "master" - digest = "1:be532d3e419ae0a9001a05f78754b803473c76c55cfb94af715fc06eaf311b13" + digest = "1:a2a9476b79b62b73139a62e961257141f948a4434047964677220b2a0f01144f" name = "golang.org/x/net" packages = [ "bpf", @@ -1082,7 +1083,7 @@ "trace", ] pruneopts = "UT" - revision = "c0dbc17a35534bf2e581d7a942408dc936316da4" + revision = "6afb5195e5aab057fda82e27171243402346b0ad" [[projects]] branch = "master" @@ -1093,7 +1094,7 @@ "windows", ] pruneopts = "UT" - revision = "548cf772de5052aa878ccb47cdeb7d262b75c8ec" + revision = "86b910548bc16777f40503131aa424ae0a092199" [[projects]] digest = "1:66a2f252a58b4fbbad0e4e180e1d85a83c222b6bce09c3dcdef3dc87c72eda7c" @@ -1123,7 +1124,7 @@ [[projects]] branch = "master" - digest = "1:1025e9e880900028dc2ec8d648c486ba65e33c9bf1ce3498d3cc1984cd3a8a9c" + digest = "1:dbe7880b52645cfd2772470ca142d644847011c922585fe1e03076d0bc0472e8" name = "golang.org/x/tools" packages = [ "go/analysis", @@ -1144,7 +1145,7 @@ "internal/semver", ] pruneopts = "UT" - revision = "89082a3841783366cb82b3dc4ce9258fb3dad1a9" + revision = "fdfa0def04cc58c406724040930652748fdf97ab" [[projects]] branch = "master" @@ -1155,7 +1156,7 @@ "googleapis/rpc/status", ] pruneopts = "UT" - revision = "bd8f9a0ef82f9870cb10caef4f23c348069600cb" + revision = "e1de0a7b01eb2fc11d735e4bfb79d2e53ec9edb3" [[projects]] digest = "1:b2c9ea388537a402c7f618e11eda34ec5358b67c3c6aef9df113c7022b46e0b1" @@ -1240,7 +1241,7 @@ version = "v1.0.1" [[projects]] - digest = "1:dc01a587d07be012625ba63df6d4224ae6d7a83e79bfebde6d987c10538d66dd" + digest = "1:1d0a2dd8403cd5ee29761aadd7fd941be7dcf31e4ba77eeb923ac1d2b49611b6" name = "gopkg.in/jcmturner/gokrb5.v7" packages = [ "asn1tools", @@ -1275,8 +1276,8 @@ "types", ] pruneopts = "UT" - revision = "363118e62befa8a14ff01031c025026077fe5d6d" - version = "v7.3.0" + revision = "8a3a3d700460d6dee4e98b6c06bf26296d2fd2c8" + version = "v7.4.0" [[projects]] digest = "1:0f16d9c577198e3b8d3209f5a89aabe679525b2aba2a7548714e973035c0e232" diff --git a/Gopkg.toml b/Gopkg.toml index 7f53b7c2096..0e66a8e4d69 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -105,7 +105,7 @@ required = [ [[constraint]] name = "github.com/uber/jaeger-client-go" - version = "^2.19.0" + version = "^2.22.0" [[constraint]] name = "github.com/uber/jaeger-lib" diff --git a/Makefile b/Makefile index 5fc99a04da4..fc70a2084e4 100644 --- a/Makefile +++ b/Makefile @@ -203,6 +203,10 @@ docker-hotrod: GOOS=linux $(MAKE) build-examples docker build -t $(DOCKER_NAMESPACE)/example-hotrod:${DOCKER_TAG} ./examples/hotrod +.PHONY: run-all-in-one +run-all-in-one: + go run -tags ui ./cmd/all-in-one --log-level debug + .PHONY: build-ui build-ui: cd jaeger-ui && yarn install --frozen-lockfile && cd packages/jaeger-ui && yarn build diff --git a/cmd/agent/app/processors/thrift_processor.go b/cmd/agent/app/processors/thrift_processor.go index 068b74ffe43..db0032e38f0 100644 --- a/cmd/agent/app/processors/thrift_processor.go +++ b/cmd/agent/app/processors/thrift_processor.go @@ -44,7 +44,10 @@ type ThriftProcessor struct { } } -// AgentProcessor handler used by the processor to process thrift and call the reporter with the deserialized struct +// AgentProcessor handler used by the processor to process thrift and call the reporter +// with the deserialized struct. This interface is implemented directly by Thrift generated +// code, e.g. jaegerThrift.NewAgentProcessor(handler), where handler implements the Agent +// Thrift service interface, which is invoked with the the deserialized struct. type AgentProcessor interface { Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) } diff --git a/cmd/agent/app/reporter/client_metrics.go b/cmd/agent/app/reporter/client_metrics.go new file mode 100644 index 00000000000..fbeafb9409b --- /dev/null +++ b/cmd/agent/app/reporter/client_metrics.go @@ -0,0 +1,229 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import ( + "sync" + "time" + + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" +) + +const ( + defaultExpireFrequency = 15 * time.Minute + defaultExpireTTL = time.Hour +) + +// clientMetrics are maintained only for data submitted in Jaeger Thrift format. +type clientMetrics struct { + BatchesSent metrics.Counter `metric:"batches_sent" help:"Total count of batches sent by clients"` + ConnectedClients metrics.Gauge `metric:"connected_clients" help:"Total count of unique clients sending data to the agent"` + + // NB: The following three metrics all have the same name, but different "cause" tags. + // Only the first one is given a "help" struct tag, because Prometheus client combines + // them into one help entry in the /metrics endpoint, e.g. + // + // # HELP jaeger_agent_client_stats_spans_dropped_total Total count of spans dropped by clients + // # TYPE jaeger_agent_client_stats_spans_dropped_total counter + // jaeger_agent_client_stats_spans_dropped_total{cause="full-queue"} 0 + // jaeger_agent_client_stats_spans_dropped_total{cause="send-failure"} 0 + // jaeger_agent_client_stats_spans_dropped_total{cause="too-large"} 0 + + // Total count of spans dropped by clients because their internal queue were full. + FullQueueDroppedSpans metrics.Counter `metric:"spans_dropped" tags:"cause=full-queue" help:"Total count of spans dropped by clients"` + + // Total count of spans dropped by clients because they were larger than max packet size. + TooLargeDroppedSpans metrics.Counter `metric:"spans_dropped" tags:"cause=too-large"` + + // Total count of spans dropped by clients because they failed Thrift encoding or submission. + FailedToEmitSpans metrics.Counter `metric:"spans_dropped" tags:"cause=send-failure"` +} + +type lastReceivedClientStats struct { + lock sync.Mutex + lastUpdated time.Time + + // Thrift stats are reported as signed i64, so keep the type to avoid multiple conversions back and forth. + batchSeqNo int64 + fullQueueDroppedSpans int64 + tooLargeDroppedSpans int64 + failedToEmitSpans int64 +} + +// ClientMetricsReporter is a decorator that emits data loss metrics on behalf of clients. +// The clients must send a Process.Tag `client-uuid` with a unique string for each client instance. +type ClientMetricsReporter struct { + params ClientMetricsReporterParams + clientMetrics *clientMetrics + shutdown chan struct{} + closed *atomic.Bool + + // map from client-uuid to *lastReceivedClientStats + lastReceivedClientStats sync.Map +} + +// ClientMetricsReporterParams is used as input to WrapWithClientMetrics. +type ClientMetricsReporterParams struct { + Reporter Reporter // required + Logger *zap.Logger // required + MetricsFactory metrics.Factory // required + ExpireFrequency time.Duration + ExpireTTL time.Duration +} + +// WrapWithClientMetrics creates ClientMetricsReporter. +func WrapWithClientMetrics(params ClientMetricsReporterParams) *ClientMetricsReporter { + if params.ExpireFrequency == 0 { + params.ExpireFrequency = defaultExpireFrequency + } + if params.ExpireTTL == 0 { + params.ExpireTTL = defaultExpireTTL + } + cm := new(clientMetrics) + metrics.MustInit(cm, params.MetricsFactory.Namespace(metrics.NSOptions{Name: "client_stats"}), nil) + r := &ClientMetricsReporter{ + params: params, + clientMetrics: cm, + shutdown: make(chan struct{}), + closed: atomic.NewBool(false), + } + go r.expireClientMetricsLoop() + return r +} + +// EmitZipkinBatch delegates to underlying Reporter. +func (r *ClientMetricsReporter) EmitZipkinBatch(spans []*zipkincore.Span) error { + return r.params.Reporter.EmitZipkinBatch(spans) +} + +// EmitBatch processes client data loss metrics and delegates to the underlying reporter. +func (r *ClientMetricsReporter) EmitBatch(batch *jaeger.Batch) error { + r.updateClientMetrics(batch) + return r.params.Reporter.EmitBatch(batch) +} + +// Close stops background gc goroutine for client stats map. +func (r *ClientMetricsReporter) Close() { + if r.closed.CAS(false, true) { + close(r.shutdown) + } +} + +func (r *ClientMetricsReporter) expireClientMetricsLoop() { + ticker := time.NewTicker(r.params.ExpireFrequency) + defer ticker.Stop() + for { + select { + case now := <-ticker.C: + r.expireClientMetrics(now) + case <-r.shutdown: + return + } + } +} + +func (r *ClientMetricsReporter) expireClientMetrics(t time.Time) { + var size int64 + r.lastReceivedClientStats.Range(func(k, v interface{}) bool { + stats := v.(*lastReceivedClientStats) + stats.lock.Lock() + defer stats.lock.Unlock() + + if !stats.lastUpdated.IsZero() && t.Sub(stats.lastUpdated) > r.params.ExpireTTL { + r.lastReceivedClientStats.Delete(k) + r.params.Logger.Debug("have not heard from a client for a while, freeing stats", + zap.Any("client-uuid", k), + zap.Time("last-message", stats.lastUpdated), + ) + } + size++ + return true // keep running through all values in the map + }) + r.clientMetrics.ConnectedClients.Update(size) +} + +func (r *ClientMetricsReporter) updateClientMetrics(batch *jaeger.Batch) { + clientUUID := clientUUID(batch) + if clientUUID == "" { + return + } + if batch.SeqNo == nil { + return + } + entry, found := r.lastReceivedClientStats.Load(clientUUID) + if !found { + ent, loaded := r.lastReceivedClientStats.LoadOrStore(clientUUID, &lastReceivedClientStats{}) + if !loaded { + r.params.Logger.Debug("received batch from a new client, starting to keep stats", + zap.String("client-uuid", clientUUID), + ) + } + entry = ent + } + clientStats := entry.(*lastReceivedClientStats) + clientStats.update(*batch.SeqNo, batch.Stats, r.clientMetrics) +} + +func (s *lastReceivedClientStats) update( + batchSeqNo int64, + stats *jaeger.ClientStats, + metrics *clientMetrics, +) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.batchSeqNo >= batchSeqNo { + // Ignore out of order batches. Once we receive a batch with a larger-than-seen number, + // it will contain new cumulative counts, which we will use to update the metrics. + // That makes the metrics slightly off in time, but accurate in aggregate. + return + } + + metrics.BatchesSent.Inc(batchSeqNo - s.batchSeqNo) + + if stats != nil { + metrics.FailedToEmitSpans.Inc(stats.FailedToEmitSpans - s.failedToEmitSpans) + metrics.TooLargeDroppedSpans.Inc(stats.TooLargeDroppedSpans - s.tooLargeDroppedSpans) + metrics.FullQueueDroppedSpans.Inc(stats.FullQueueDroppedSpans - s.fullQueueDroppedSpans) + + s.failedToEmitSpans = stats.FailedToEmitSpans + s.tooLargeDroppedSpans = stats.TooLargeDroppedSpans + s.fullQueueDroppedSpans = stats.FullQueueDroppedSpans + } + + s.lastUpdated = time.Now() + s.batchSeqNo = batchSeqNo +} + +func clientUUID(batch *jaeger.Batch) string { + if batch.Process == nil { + return "" + } + for _, tag := range batch.Process.Tags { + if tag.Key != "client-uuid" { + continue + } + if tag.VStr == nil { + return "" + } + return *tag.VStr + } + return "" +} diff --git a/cmd/agent/app/reporter/client_metrics_test.go b/cmd/agent/app/reporter/client_metrics_test.go new file mode 100644 index 00000000000..709bb7a7d26 --- /dev/null +++ b/cmd/agent/app/reporter/client_metrics_test.go @@ -0,0 +1,258 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporter + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics/metricstest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + "github.com/jaegertracing/jaeger/cmd/agent/app/testutils" + "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" +) + +type clientMetricsTest struct { + mr *testutils.InMemoryReporter + r *ClientMetricsReporter + logs *observer.ObservedLogs + mb *metricstest.Factory +} + +func (tr *clientMetricsTest) assertLog(t *testing.T, msg, clientUUID string) { + logs := tr.logs.FilterMessageSnippet(msg) + if clientUUID == "" { + assert.Equal(t, 0, logs.Len(), "not expecting log '%s", msg) + } else { + if assert.Equal(t, 1, logs.Len(), "expecting one log '%s'", msg) { + field := logs.All()[0].ContextMap()["client-uuid"] + assert.Equal(t, clientUUID, field, "client-uuid should be logged") + } + } +} + +func testClientMetrics(fn func(tr *clientMetricsTest)) { + testClientMetricsWithParams(ClientMetricsReporterParams{}, fn) +} + +func testClientMetricsWithParams(params ClientMetricsReporterParams, fn func(tr *clientMetricsTest)) { + r1 := testutils.NewInMemoryReporter() + zapCore, logs := observer.New(zap.DebugLevel) + mb := metricstest.NewFactory(time.Hour) + + params.Reporter = r1 + params.Logger = zap.New(zapCore) + params.MetricsFactory = mb + + r := WrapWithClientMetrics(params) + defer r.Close() + + tr := &clientMetricsTest{ + mr: r1, + r: r, + logs: logs, + mb: mb, + } + fn(tr) +} + +func TestClientMetricsReporter_Zipkin(t *testing.T) { + testClientMetrics(func(tr *clientMetricsTest) { + assert.NoError(t, tr.r.EmitZipkinBatch([]*zipkincore.Span{{}})) + assert.Len(t, tr.mr.ZipkinSpans(), 1) + }) +} + +func TestClientMetricsReporter_Jaeger(t *testing.T) { + testClientMetrics(func(tr *clientMetricsTest) { + blank := "" + clientUUID := "foobar" + nPtr := func(v int64) *int64 { return &v } + const prefix = "client_stats." + tag := func(name, value string) map[string]string { return map[string]string{name: value} } + + tests := []struct { + clientUUID *string + seqNo *int64 + stats *jaeger.ClientStats + runExpire bool // invoke expireClientMetrics to update the gauge + expLog string + expCounters []metricstest.ExpectedMetric + expGauges []metricstest.ExpectedMetric + }{ + {}, + {clientUUID: &blank}, + {clientUUID: &clientUUID}, + { + clientUUID: &clientUUID, + seqNo: nPtr(100), + expLog: clientUUID, + runExpire: true, + expCounters: []metricstest.ExpectedMetric{ + {Name: prefix + "batches_sent", Value: 100}, + }, + expGauges: []metricstest.ExpectedMetric{ + {Name: prefix + "connected_clients", Value: 1}, + }, + }, + { + clientUUID: &clientUUID, + seqNo: nPtr(101), + expCounters: []metricstest.ExpectedMetric{ + {Name: prefix + "batches_sent", Value: 101}, + }, + }, + { + clientUUID: &clientUUID, + seqNo: nPtr(90), // out of order batch will be ignored + expCounters: []metricstest.ExpectedMetric{ + {Name: prefix + "batches_sent", Value: 101}, // unchanged! + }, + }, + { + clientUUID: &clientUUID, + seqNo: nPtr(110), + stats: &jaeger.ClientStats{ + FullQueueDroppedSpans: 5, + TooLargeDroppedSpans: 6, + FailedToEmitSpans: 7, + }, expCounters: []metricstest.ExpectedMetric{ + {Name: prefix + "batches_sent", Value: 110}, + {Name: prefix + "spans_dropped", Tags: tag("cause", "full-queue"), Value: 5}, + {Name: prefix + "spans_dropped", Tags: tag("cause", "too-large"), Value: 6}, + {Name: prefix + "spans_dropped", Tags: tag("cause", "send-failure"), Value: 7}, + }, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("iter%d", i), func(t *testing.T) { + tr.logs.TakeAll() + + batch := &jaeger.Batch{ + Spans: []*jaeger.Span{{}}, + Process: &jaeger.Process{ + ServiceName: "blah", + }, + SeqNo: test.seqNo, + Stats: test.stats, + } + if test.clientUUID != nil { + batch.Process.Tags = []*jaeger.Tag{{Key: "client-uuid", VStr: test.clientUUID}} + } + + err := tr.r.EmitBatch(batch) + assert.NoError(t, err) + assert.Len(t, tr.mr.Spans(), i+1) + + tr.assertLog(t, "new client", test.expLog) + + if test.runExpire { + tr.r.expireClientMetrics(time.Now()) + } + + if m := test.expCounters; len(m) > 0 { + tr.mb.AssertCounterMetrics(t, m...) + } + if m := test.expGauges; len(m) > 0 { + tr.mb.AssertGaugeMetrics(t, m...) + } + }) + } + }) +} + +func TestClientMetricsReporter_ClientUUID(t *testing.T) { + id := "my-client-id" + tests := []struct { + process *jaeger.Process + clientUUID string + }{ + {process: nil, clientUUID: ""}, + {process: &jaeger.Process{}, clientUUID: ""}, + {process: &jaeger.Process{Tags: []*jaeger.Tag{}}, clientUUID: ""}, + {process: &jaeger.Process{Tags: []*jaeger.Tag{{Key: "blah"}}}, clientUUID: ""}, + {process: &jaeger.Process{Tags: []*jaeger.Tag{{Key: "client-uuid"}}}, clientUUID: ""}, + {process: &jaeger.Process{Tags: []*jaeger.Tag{{Key: "client-uuid", VStr: &id}}}, clientUUID: id}, + } + for i, test := range tests { + t.Run(fmt.Sprintf("iter%d", i), func(t *testing.T) { + assert.Equal(t, test.clientUUID, clientUUID(&jaeger.Batch{Process: test.process})) + }) + } +} + +func TestClientMetricsReporter_Expire(t *testing.T) { + params := ClientMetricsReporterParams{ + ExpireFrequency: 100 * time.Microsecond, + ExpireTTL: 5 * time.Millisecond, + } + testClientMetricsWithParams(params, func(tr *clientMetricsTest) { + nPtr := func(v int64) *int64 { return &v } + clientUUID := "blah" + batch := &jaeger.Batch{ + Spans: []*jaeger.Span{{}}, + Process: &jaeger.Process{ + ServiceName: "blah", + Tags: []*jaeger.Tag{{Key: "client-uuid", VStr: &clientUUID}}, + }, + SeqNo: nPtr(1), + } + + err := tr.r.EmitBatch(batch) + assert.NoError(t, err) + assert.Len(t, tr.mr.Spans(), 1) + tr.mb.AssertCounterMetrics(t, + metricstest.ExpectedMetric{Name: "client_stats.batches_sent", Value: 1}) + + // here we test that a connected-client gauge is updated to 1 by the auto-scheduled expire loop, + // and then reset to 0 once the client entry expires. + tests := []struct { + expGauge int + expLog string + }{ + {expGauge: 1, expLog: "new client"}, + {expGauge: 0, expLog: "freeing stats"}, + } + start := time.Now() + for i, test := range tests { + t.Run(fmt.Sprintf("iter%d:gauge=%d,log=%s", i, test.expGauge, test.expLog), func(t *testing.T) { + // Expire loop runs every 100us, and removes the client after 5ms. + // We check for condition in each test for up to 5ms (10*500us). + for i := 0; i < 10; i++ { + _, gauges := tr.mb.Snapshot() + if gauges["client_stats.connected_clients"] == int64(test.expGauge) { + break + } + time.Sleep(500 * time.Microsecond) + } + tr.mb.AssertGaugeMetrics(t, + metricstest.ExpectedMetric{Name: "client_stats.connected_clients", Value: test.expGauge}) + tr.assertLog(t, test.expLog, clientUUID) + + // sleep between tests long enough to exceed the 5ms TTL. + if i == 0 { + elapsed := time.Since(start) + time.Sleep(5*time.Millisecond - elapsed) + } + }) + } + }) +} diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index 058dd4b0f1a..c09223a15a3 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -27,7 +27,7 @@ import ( // ProxyBuilder holds objects communicating with collector type ProxyBuilder struct { - reporter aReporter.Reporter + reporter *reporter.ClientMetricsReporter manager configmanager.ClientConfigManager conn *grpc.ClientConn } @@ -39,9 +39,16 @@ func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFacto return nil, err } grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}}) + r1 := NewReporter(conn, agentTags, logger) + r2 := reporter.WrapWithMetrics(r1, grpcMetrics) + r3 := reporter.WrapWithClientMetrics(reporter.ClientMetricsReporterParams{ + Reporter: r2, + Logger: logger, + MetricsFactory: mFactory, + }) return &ProxyBuilder{ conn: conn, - reporter: reporter.WrapWithMetrics(NewReporter(conn, agentTags, logger), grpcMetrics), + reporter: r3, manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics), }, nil } @@ -63,5 +70,6 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { + b.reporter.Close() return b.conn.Close() } diff --git a/cmd/agent/app/reporter/metrics.go b/cmd/agent/app/reporter/metrics.go index c4bf8a34ce7..2a52fd8bb87 100644 --- a/cmd/agent/app/reporter/metrics.go +++ b/cmd/agent/app/reporter/metrics.go @@ -26,7 +26,6 @@ const ( zipkinBatches = "zipkin" ) -// ReporterMetrics holds metrics related to reporter type batchMetrics struct { // Number of successful batch submissions to collector BatchesSubmitted metrics.Counter `metric:"batches.submitted"` @@ -47,6 +46,8 @@ type batchMetrics struct { // MetricsReporter is reporter with metrics integration. type MetricsReporter struct { wrapped Reporter + + // counters grouped by the type of data format (Jaeger or Zipkin). metrics map[string]batchMetrics } @@ -55,7 +56,12 @@ func WrapWithMetrics(reporter Reporter, mFactory metrics.Factory) *MetricsReport batchesMetrics := map[string]batchMetrics{} for _, s := range []string{zipkinBatches, jaegerBatches} { bm := batchMetrics{} - metrics.Init(&bm, mFactory.Namespace(metrics.NSOptions{Name: "reporter", Tags: map[string]string{"format": s}}), nil) + metrics.MustInit(&bm, + mFactory.Namespace(metrics.NSOptions{ + Name: "reporter", + Tags: map[string]string{"format": s}, + }), + nil) batchesMetrics[s] = bm } return &MetricsReporter{wrapped: reporter, metrics: batchesMetrics} @@ -64,7 +70,7 @@ func WrapWithMetrics(reporter Reporter, mFactory metrics.Factory) *MetricsReport // EmitZipkinBatch emits batch to collector. func (r *MetricsReporter) EmitZipkinBatch(spans []*zipkincore.Span) error { err := r.wrapped.EmitZipkinBatch(spans) - withMetrics(r.metrics[zipkinBatches], int64(len(spans)), err) + updateMetrics(r.metrics[zipkinBatches], int64(len(spans)), err) return err } @@ -75,11 +81,11 @@ func (r *MetricsReporter) EmitBatch(batch *jaeger.Batch) error { size = int64(len(batch.GetSpans())) } err := r.wrapped.EmitBatch(batch) - withMetrics(r.metrics[jaegerBatches], size, err) + updateMetrics(r.metrics[jaegerBatches], size, err) return err } -func withMetrics(m batchMetrics, size int64, err error) { +func updateMetrics(m batchMetrics, size int64, err error) { if err != nil { m.BatchesFailures.Inc(1) m.SpansFailures.Inc(size) diff --git a/cmd/agent/app/reporter/metrics_test.go b/cmd/agent/app/reporter/metrics_test.go index b1241042493..de0331b35e9 100644 --- a/cmd/agent/app/reporter/metrics_test.go +++ b/cmd/agent/app/reporter/metrics_test.go @@ -26,24 +26,12 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -type noopReporter struct { - err error -} - -func (r *noopReporter) EmitZipkinBatch(spans []*zipkincore.Span) error { - return r.err -} - -func (r *noopReporter) EmitBatch(batch *jaeger.Batch) error { - return r.err -} - func TestMetricsReporter(t *testing.T) { tests := []struct { expectedCounters []metricstest.ExpectedMetric expectedGauges []metricstest.ExpectedMetric action func(reporter Reporter) - rep *noopReporter + rep *mockReporter }{ {expectedCounters: []metricstest.ExpectedMetric{ {Name: "reporter.batches.submitted", Tags: map[string]string{"format": "jaeger"}, Value: 1}, @@ -55,7 +43,7 @@ func TestMetricsReporter(t *testing.T) { }, action: func(reporter Reporter) { err := reporter.EmitBatch(nil) require.NoError(t, err) - }, rep: &noopReporter{}}, + }, rep: &mockReporter{}}, {expectedCounters: []metricstest.ExpectedMetric{ {Name: "reporter.batches.submitted", Tags: map[string]string{"format": "jaeger"}, Value: 1}, {Name: "reporter.batches.failures", Tags: map[string]string{"format": "jaeger"}, Value: 0}, @@ -66,7 +54,7 @@ func TestMetricsReporter(t *testing.T) { }, action: func(reporter Reporter) { err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}}) require.NoError(t, err) - }, rep: &noopReporter{}}, + }, rep: &mockReporter{}}, {expectedCounters: []metricstest.ExpectedMetric{ {Name: "reporter.batches.submitted", Tags: map[string]string{"format": "zipkin"}, Value: 1}, {Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 0}, @@ -77,7 +65,7 @@ func TestMetricsReporter(t *testing.T) { }, action: func(reporter Reporter) { err := reporter.EmitZipkinBatch(nil) require.NoError(t, err) - }, rep: &noopReporter{}}, + }, rep: &mockReporter{}}, {expectedCounters: []metricstest.ExpectedMetric{ {Name: "reporter.batches.submitted", Tags: map[string]string{"format": "zipkin"}, Value: 1}, {Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 0}, @@ -88,7 +76,7 @@ func TestMetricsReporter(t *testing.T) { }, action: func(reporter Reporter) { err := reporter.EmitZipkinBatch([]*zipkincore.Span{{}}) require.NoError(t, err) - }, rep: &noopReporter{}}, + }, rep: &mockReporter{}}, {expectedCounters: []metricstest.ExpectedMetric{ {Name: "reporter.batches.submitted", Tags: map[string]string{"format": "jaeger"}, Value: 0}, {Name: "reporter.batches.failures", Tags: map[string]string{"format": "jaeger"}, Value: 1}, @@ -99,7 +87,7 @@ func TestMetricsReporter(t *testing.T) { }, action: func(reporter Reporter) { err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}}) require.Error(t, err) - }, rep: &noopReporter{err: errors.New("foo")}}, + }, rep: &mockReporter{err: errors.New("foo")}}, {expectedCounters: []metricstest.ExpectedMetric{ {Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 1}, {Name: "reporter.spans.failures", Tags: map[string]string{"format": "zipkin"}, Value: 2}, @@ -108,7 +96,7 @@ func TestMetricsReporter(t *testing.T) { }, action: func(reporter Reporter) { err := reporter.EmitZipkinBatch([]*zipkincore.Span{{}, {}}) require.Error(t, err) - }, rep: &noopReporter{errors.New("foo")}}, + }, rep: &mockReporter{errors.New("foo")}}, } for _, test := range tests { diff --git a/cmd/agent/app/reporter/reporter_test.go b/cmd/agent/app/reporter/reporter_test.go index 6529a07a6fe..37ba306e23c 100644 --- a/cmd/agent/app/reporter/reporter_test.go +++ b/cmd/agent/app/reporter/reporter_test.go @@ -49,7 +49,7 @@ func TestMultiReporter(t *testing.T) { func TestMultiReporterErrors(t *testing.T) { errMsg := "doh!" err := errors.New(errMsg) - r1, r2 := alwaysFailReporter{err: err}, alwaysFailReporter{err: err} + r1, r2 := mockReporter{err: err}, mockReporter{err: err} r := NewMultiReporter(r1, r2) e1 := r.EmitZipkinBatch([]*zipkincore.Span{ {}, @@ -63,14 +63,14 @@ func TestMultiReporterErrors(t *testing.T) { assert.EqualError(t, e2, fmt.Sprintf("[%s, %s]", errMsg, errMsg)) } -type alwaysFailReporter struct { +type mockReporter struct { err error } -func (r alwaysFailReporter) EmitZipkinBatch(spans []*zipkincore.Span) error { +func (r mockReporter) EmitZipkinBatch(spans []*zipkincore.Span) error { return r.err } -func (r alwaysFailReporter) EmitBatch(batch *jaeger.Batch) error { +func (r mockReporter) EmitBatch(batch *jaeger.Batch) error { return r.err } diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go index 8b044d3558f..b8e3e082cc6 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -25,22 +25,28 @@ import ( // ProxyBuilder holds objects communicating with collector type ProxyBuilder struct { - reporter reporter.Reporter + reporter *reporter.ClientMetricsReporter manager configmanager.ClientConfigManager tchanRep *Reporter } // NewCollectorProxy creates ProxyBuilder func NewCollectorProxy(builder *Builder, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { - r, err := builder.CreateReporter(logger) + tchanRep, err := builder.CreateReporter(logger) if err != nil { return nil, err } - tchannelMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "tchannel"}}) + tchanMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "tchannel"}}) + r1 := reporter.WrapWithMetrics(tchanRep, tchanMetrics) + r2 := reporter.WrapWithClientMetrics(reporter.ClientMetricsReporterParams{ + Reporter: r1, + Logger: logger, + MetricsFactory: mFactory, + }) return &ProxyBuilder{ - tchanRep: r, - reporter: reporter.WrapWithMetrics(r, tchannelMetrics), - manager: configmanager.WrapWithMetrics(tchannel.NewConfigManager(r.CollectorServiceName(), r.Channel()), tchannelMetrics), + tchanRep: tchanRep, + reporter: r2, + manager: configmanager.WrapWithMetrics(tchannel.NewConfigManager(tchanRep.CollectorServiceName(), tchanRep.Channel()), tchanMetrics), }, nil } @@ -57,5 +63,6 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { b.tchanRep.Channel().Close() + b.reporter.Close() return nil } diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go index db1dae4e622..a9bdd762bef 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go @@ -42,7 +42,7 @@ func TestCreate(t *testing.T) { require.NoError(t, err) assert.NotNil(t, b) r, _ := cfg.CreateReporter(logger) - assert.Equal(t, reporter.WrapWithMetrics(r, mFactory), b.GetReporter()) + assert.IsType(t, new(reporter.ClientMetricsReporter), b.GetReporter()) m := tchannel.NewConfigManager(r.CollectorServiceName(), r.Channel()) assert.Equal(t, configmanager.WrapWithMetrics(m, mFactory), b.GetManager()) assert.Nil(t, b.Close()) diff --git a/idl b/idl index e85e7d60535..cfd3d58c9ac 160000 --- a/idl +++ b/idl @@ -1 +1 @@ -Subproject commit e85e7d605353e91e970d9178813c921117ca6974 +Subproject commit cfd3d58c9ac66bb410ff378de1d77ad378142c15 diff --git a/thrift-gen/agent/agent.go b/thrift-gen/agent/agent.go index 7ce6521d51d..874b14723aa 100644 --- a/thrift-gen/agent/agent.go +++ b/thrift-gen/agent/agent.go @@ -6,7 +6,6 @@ package agent import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" diff --git a/thrift-gen/agent/constants.go b/thrift-gen/agent/constants.go index 5c3ebdca92b..3cfe4787f7f 100644 --- a/thrift-gen/agent/constants.go +++ b/thrift-gen/agent/constants.go @@ -6,7 +6,6 @@ package agent import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" diff --git a/thrift-gen/agent/ttypes.go b/thrift-gen/agent/ttypes.go index 5f27c397147..7ad797b0480 100644 --- a/thrift-gen/agent/ttypes.go +++ b/thrift-gen/agent/ttypes.go @@ -6,7 +6,6 @@ package agent import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" diff --git a/thrift-gen/baggage/baggagerestrictionmanager.go b/thrift-gen/baggage/baggagerestrictionmanager.go index 69bc8841b39..1931cd72c48 100644 --- a/thrift-gen/baggage/baggagerestrictionmanager.go +++ b/thrift-gen/baggage/baggagerestrictionmanager.go @@ -6,7 +6,6 @@ package baggage import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/baggage/constants.go b/thrift-gen/baggage/constants.go index 425a621532d..6668424a59c 100644 --- a/thrift-gen/baggage/constants.go +++ b/thrift-gen/baggage/constants.go @@ -6,7 +6,6 @@ package baggage import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/baggage/tchan-baggage.go b/thrift-gen/baggage/tchan-baggage.go index f273df890aa..7ceea4d1531 100644 --- a/thrift-gen/baggage/tchan-baggage.go +++ b/thrift-gen/baggage/tchan-baggage.go @@ -43,10 +43,6 @@ func (c *tchanBaggageRestrictionManagerClient) GetBaggageRestrictions(ctx thrift } success, err := c.client.Call(ctx, c.thriftService, "getBaggageRestrictions", &args, &resp) if err == nil && !success { - switch { - default: - err = fmt.Errorf("received no result or unknown exception for getBaggageRestrictions") - } } return resp.GetSuccess(), err diff --git a/thrift-gen/baggage/ttypes.go b/thrift-gen/baggage/ttypes.go index c0169dd449e..be442fbd91a 100644 --- a/thrift-gen/baggage/ttypes.go +++ b/thrift-gen/baggage/ttypes.go @@ -6,7 +6,6 @@ package baggage import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/jaeger/collector.go b/thrift-gen/jaeger/collector.go index f9bd36f7ce8..d5750bd9bad 100644 --- a/thrift-gen/jaeger/collector.go +++ b/thrift-gen/jaeger/collector.go @@ -6,7 +6,6 @@ package jaeger import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/jaeger/constants.go b/thrift-gen/jaeger/constants.go index 414aa673ba9..250474222a4 100644 --- a/thrift-gen/jaeger/constants.go +++ b/thrift-gen/jaeger/constants.go @@ -6,7 +6,6 @@ package jaeger import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/jaeger/tchan-jaeger.go b/thrift-gen/jaeger/tchan-jaeger.go index 89e1e9cdab1..393b8f10ebf 100644 --- a/thrift-gen/jaeger/tchan-jaeger.go +++ b/thrift-gen/jaeger/tchan-jaeger.go @@ -43,10 +43,6 @@ func (c *tchanCollectorClient) SubmitBatches(ctx thrift.Context, batches []*Batc } success, err := c.client.Call(ctx, c.thriftService, "submitBatches", &args, &resp) if err == nil && !success { - switch { - default: - err = fmt.Errorf("received no result or unknown exception for submitBatches") - } } return resp.GetSuccess(), err diff --git a/thrift-gen/jaeger/ttypes.go b/thrift-gen/jaeger/ttypes.go index 836afc1b577..bb07424c441 100644 --- a/thrift-gen/jaeger/ttypes.go +++ b/thrift-gen/jaeger/ttypes.go @@ -6,7 +6,6 @@ package jaeger import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) @@ -1577,12 +1576,193 @@ func (p *Process) String() string { return fmt.Sprintf("Process(%+v)", *p) } +// Attributes: +// - FullQueueDroppedSpans +// - TooLargeDroppedSpans +// - FailedToEmitSpans +type ClientStats struct { + FullQueueDroppedSpans int64 `thrift:"fullQueueDroppedSpans,1,required" json:"fullQueueDroppedSpans"` + TooLargeDroppedSpans int64 `thrift:"tooLargeDroppedSpans,2,required" json:"tooLargeDroppedSpans"` + FailedToEmitSpans int64 `thrift:"failedToEmitSpans,3,required" json:"failedToEmitSpans"` +} + +func NewClientStats() *ClientStats { + return &ClientStats{} +} + +func (p *ClientStats) GetFullQueueDroppedSpans() int64 { + return p.FullQueueDroppedSpans +} + +func (p *ClientStats) GetTooLargeDroppedSpans() int64 { + return p.TooLargeDroppedSpans +} + +func (p *ClientStats) GetFailedToEmitSpans() int64 { + return p.FailedToEmitSpans +} +func (p *ClientStats) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetFullQueueDroppedSpans bool = false + var issetTooLargeDroppedSpans bool = false + var issetFailedToEmitSpans bool = false + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.readField1(iprot); err != nil { + return err + } + issetFullQueueDroppedSpans = true + case 2: + if err := p.readField2(iprot); err != nil { + return err + } + issetTooLargeDroppedSpans = true + case 3: + if err := p.readField3(iprot); err != nil { + return err + } + issetFailedToEmitSpans = true + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetFullQueueDroppedSpans { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FullQueueDroppedSpans is not set")) + } + if !issetTooLargeDroppedSpans { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field TooLargeDroppedSpans is not set")) + } + if !issetFailedToEmitSpans { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FailedToEmitSpans is not set")) + } + return nil +} + +func (p *ClientStats) readField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.FullQueueDroppedSpans = v + } + return nil +} + +func (p *ClientStats) readField2(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 2: ", err) + } else { + p.TooLargeDroppedSpans = v + } + return nil +} + +func (p *ClientStats) readField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + p.FailedToEmitSpans = v + } + return nil +} + +func (p *ClientStats) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("ClientStats"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := p.writeField3(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *ClientStats) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("fullQueueDroppedSpans", thrift.I64, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:fullQueueDroppedSpans: ", p), err) + } + if err := oprot.WriteI64(int64(p.FullQueueDroppedSpans)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.fullQueueDroppedSpans (1) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:fullQueueDroppedSpans: ", p), err) + } + return err +} + +func (p *ClientStats) writeField2(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("tooLargeDroppedSpans", thrift.I64, 2); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:tooLargeDroppedSpans: ", p), err) + } + if err := oprot.WriteI64(int64(p.TooLargeDroppedSpans)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.tooLargeDroppedSpans (2) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 2:tooLargeDroppedSpans: ", p), err) + } + return err +} + +func (p *ClientStats) writeField3(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("failedToEmitSpans", thrift.I64, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:failedToEmitSpans: ", p), err) + } + if err := oprot.WriteI64(int64(p.FailedToEmitSpans)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.failedToEmitSpans (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:failedToEmitSpans: ", p), err) + } + return err +} + +func (p *ClientStats) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("ClientStats(%+v)", *p) +} + // Attributes: // - Process // - Spans +// - SeqNo +// - Stats type Batch struct { - Process *Process `thrift:"process,1,required" json:"process"` - Spans []*Span `thrift:"spans,2,required" json:"spans"` + Process *Process `thrift:"process,1,required" json:"process"` + Spans []*Span `thrift:"spans,2,required" json:"spans"` + SeqNo *int64 `thrift:"seqNo,3" json:"seqNo,omitempty"` + Stats *ClientStats `thrift:"stats,4" json:"stats,omitempty"` } func NewBatch() *Batch { @@ -1601,10 +1781,36 @@ func (p *Batch) GetProcess() *Process { func (p *Batch) GetSpans() []*Span { return p.Spans } + +var Batch_SeqNo_DEFAULT int64 + +func (p *Batch) GetSeqNo() int64 { + if !p.IsSetSeqNo() { + return Batch_SeqNo_DEFAULT + } + return *p.SeqNo +} + +var Batch_Stats_DEFAULT *ClientStats + +func (p *Batch) GetStats() *ClientStats { + if !p.IsSetStats() { + return Batch_Stats_DEFAULT + } + return p.Stats +} func (p *Batch) IsSetProcess() bool { return p.Process != nil } +func (p *Batch) IsSetSeqNo() bool { + return p.SeqNo != nil +} + +func (p *Batch) IsSetStats() bool { + return p.Stats != nil +} + func (p *Batch) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -1632,6 +1838,14 @@ func (p *Batch) Read(iprot thrift.TProtocol) error { return err } issetSpans = true + case 3: + if err := p.readField3(iprot); err != nil { + return err + } + case 4: + if err := p.readField4(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -1681,6 +1895,23 @@ func (p *Batch) readField2(iprot thrift.TProtocol) error { return nil } +func (p *Batch) readField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + p.SeqNo = &v + } + return nil +} + +func (p *Batch) readField4(iprot thrift.TProtocol) error { + p.Stats = &ClientStats{} + if err := p.Stats.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Stats), err) + } + return nil +} + func (p *Batch) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("Batch"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -1691,6 +1922,12 @@ func (p *Batch) Write(oprot thrift.TProtocol) error { if err := p.writeField2(oprot); err != nil { return err } + if err := p.writeField3(oprot); err != nil { + return err + } + if err := p.writeField4(oprot); err != nil { + return err + } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) } @@ -1734,6 +1971,36 @@ func (p *Batch) writeField2(oprot thrift.TProtocol) (err error) { return err } +func (p *Batch) writeField3(oprot thrift.TProtocol) (err error) { + if p.IsSetSeqNo() { + if err := oprot.WriteFieldBegin("seqNo", thrift.I64, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:seqNo: ", p), err) + } + if err := oprot.WriteI64(int64(*p.SeqNo)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.seqNo (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:seqNo: ", p), err) + } + } + return err +} + +func (p *Batch) writeField4(oprot thrift.TProtocol) (err error) { + if p.IsSetStats() { + if err := oprot.WriteFieldBegin("stats", thrift.STRUCT, 4); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:stats: ", p), err) + } + if err := p.Stats.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Stats), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 4:stats: ", p), err) + } + } + return err +} + func (p *Batch) String() string { if p == nil { return "" diff --git a/thrift-gen/sampling/constants.go b/thrift-gen/sampling/constants.go index 1c6ade8e210..728988b8387 100644 --- a/thrift-gen/sampling/constants.go +++ b/thrift-gen/sampling/constants.go @@ -6,7 +6,6 @@ package sampling import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/sampling/samplingmanager.go b/thrift-gen/sampling/samplingmanager.go index 1c28aaf0e30..563e2b4c9e6 100644 --- a/thrift-gen/sampling/samplingmanager.go +++ b/thrift-gen/sampling/samplingmanager.go @@ -6,7 +6,6 @@ package sampling import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/sampling/tchan-sampling.go b/thrift-gen/sampling/tchan-sampling.go index 203b3049dee..d63cb9f311c 100644 --- a/thrift-gen/sampling/tchan-sampling.go +++ b/thrift-gen/sampling/tchan-sampling.go @@ -43,10 +43,6 @@ func (c *tchanSamplingManagerClient) GetSamplingStrategy(ctx thrift.Context, ser } success, err := c.client.Call(ctx, c.thriftService, "getSamplingStrategy", &args, &resp) if err == nil && !success { - switch { - default: - err = fmt.Errorf("received no result or unknown exception for getSamplingStrategy") - } } return resp.GetSuccess(), err diff --git a/thrift-gen/sampling/ttypes.go b/thrift-gen/sampling/ttypes.go index 490c4b07dcd..3e831af4a65 100644 --- a/thrift-gen/sampling/ttypes.go +++ b/thrift-gen/sampling/ttypes.go @@ -6,7 +6,6 @@ package sampling import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/zipkincore/constants.go b/thrift-gen/zipkincore/constants.go index c84b0fdb309..0f0b6bc03b7 100644 --- a/thrift-gen/zipkincore/constants.go +++ b/thrift-gen/zipkincore/constants.go @@ -6,7 +6,6 @@ package zipkincore import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) diff --git a/thrift-gen/zipkincore/tchan-zipkincore.go b/thrift-gen/zipkincore/tchan-zipkincore.go index 0522ee1e1b4..590fcc27aa3 100644 --- a/thrift-gen/zipkincore/tchan-zipkincore.go +++ b/thrift-gen/zipkincore/tchan-zipkincore.go @@ -43,10 +43,6 @@ func (c *tchanZipkinCollectorClient) SubmitZipkinBatch(ctx thrift.Context, spans } success, err := c.client.Call(ctx, c.thriftService, "submitZipkinBatch", &args, &resp) if err == nil && !success { - switch { - default: - err = fmt.Errorf("received no result or unknown exception for submitZipkinBatch") - } } return resp.GetSuccess(), err diff --git a/thrift-gen/zipkincore/ttypes.go b/thrift-gen/zipkincore/ttypes.go index ed599ae44b9..bccf51f5627 100644 --- a/thrift-gen/zipkincore/ttypes.go +++ b/thrift-gen/zipkincore/ttypes.go @@ -6,7 +6,6 @@ package zipkincore import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" ) @@ -730,7 +729,7 @@ func (p *BinaryAnnotation) String() string { // precise value possible. For example, gettimeofday or syncing nanoTime // against a tick of currentTimeMillis. // -// For compatibilty with instrumentation that precede this field, collectors +// For compatibility with instrumentation that precede this field, collectors // or span stores can derive this via Annotation.timestamp. // For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp. // @@ -742,7 +741,7 @@ func (p *BinaryAnnotation) String() string { // precise measurement decoupled from problems of clocks, such as skew or NTP // updates causing time to move backwards. // -// For compatibilty with instrumentation that precede this field, collectors +// For compatibility with instrumentation that precede this field, collectors // or span stores can derive this by subtracting Annotation.timestamp. // For example, SERVER_SEND.timestamp - SERVER_RECV.timestamp. // diff --git a/thrift-gen/zipkincore/zipkincollector.go b/thrift-gen/zipkincore/zipkincollector.go index ff35f002bf1..bcc54f36efb 100644 --- a/thrift-gen/zipkincore/zipkincollector.go +++ b/thrift-gen/zipkincore/zipkincollector.go @@ -6,7 +6,6 @@ package zipkincore import ( "bytes" "fmt" - "github.com/apache/thrift/lib/go/thrift" )