diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 285d9364f8e..6c947809dc6 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -29,8 +29,6 @@ import ( "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" - "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" - "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/agent/app/servers" "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" "github.com/jaegertracing/jaeger/ports" @@ -214,28 +212,24 @@ func defaultInt(value int, defaultVal int) int { return value } +// ProxyBuilderOptions holds config for CollectorProxyBuilder +type ProxyBuilderOptions struct { + reporter.Options + Logger *zap.Logger + Metrics metrics.Factory +} + +// CollectorProxyBuilder is a func which builds CollectorProxy. +type CollectorProxyBuilder func(ProxyBuilderOptions) (CollectorProxy, error) + // CreateCollectorProxy creates collector proxy func CreateCollectorProxy( - opts *reporter.Options, - tchanBuilder *tchannel.Builder, - grpcBuilder *grpc.ConnBuilder, - logger *zap.Logger, - mFactory metrics.Factory, + opts ProxyBuilderOptions, + builders map[reporter.Type]CollectorProxyBuilder, ) (CollectorProxy, error) { - // GRPC type is set as default in viper, but we check for legacy flags - // to keep backward compatibility - if opts.ReporterType == reporter.GRPC && - len(tchanBuilder.CollectorHostPorts) > 0 && - len(grpcBuilder.CollectorHostPorts) == 0 { - logger.Warn("Using deprecated configuration", zap.String("option", "--collector-host.port")) - return tchannel.NewCollectorProxy(tchanBuilder, mFactory, logger) - } - switch opts.ReporterType { - case reporter.GRPC: - return grpc.NewCollectorProxy(grpcBuilder, opts.AgentTags, mFactory, logger) - case reporter.TCHANNEL: - return tchannel.NewCollectorProxy(tchanBuilder, mFactory, logger) - default: + builder, ok := builders[opts.ReporterType] + if !ok { return nil, fmt.Errorf("unknown reporter type %s", string(opts.ReporterType)) } + return builder(opts) } diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 0bea462db05..d9967f3c271 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -35,7 +35,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" - "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" + "github.com/jaegertracing/jaeger/tchannel/agent/app/reporter/tchannel" + "github.com/jaegertracing/jaeger/tchannel/collector/app" "github.com/jaegertracing/jaeger/thrift-gen/baggage" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/sampling" @@ -201,8 +202,8 @@ func TestCreateCollectorProxy(t *testing.T) { err: "at least one collector hostPort address is required when resolver is not available", }, { - flags: []string{"--collector.host-port=foo"}, - metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, + flags: []string{"--collector.host-port=foo"}, + err: "at least one collector hostPort address is required when resolver is not available", }, { flags: []string{"--reporter.type=tchannel"}, @@ -213,12 +214,8 @@ func TestCreateCollectorProxy(t *testing.T) { metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, }, { - flags: []string{"--reporter.type=grpc", "--collector.host-port=foo"}, - metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, - }, - { - flags: []string{"--reporter.type=grpc", "--collector.host-port=foo"}, - metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, + flags: []string{"--reporter.type=grpc", "--collector.host-port=foo"}, + err: "at least one collector hostPort address is required when resolver is not available", }, { flags: []string{"--reporter.type=grpc", "--reporter.grpc.host-port=foo", "--collector.host-port=foo"}, @@ -235,6 +232,7 @@ func TestCreateCollectorProxy(t *testing.T) { tchannel.AddFlags(flags) grpc.AddFlags(flags) reporter.AddFlags(flags) + app.AddFlags(flags) command := cobra.Command{} command.PersistentFlags().AddGoFlagSet(flags) @@ -249,7 +247,16 @@ func TestCreateCollectorProxy(t *testing.T) { grpcBuilder := grpc.NewConnBuilder().InitFromViper(v) metricsFactory := metricstest.NewFactory(time.Microsecond) - proxy, err := CreateCollectorProxy(rOpts, tchan, grpcBuilder, zap.NewNop(), metricsFactory) + + builders := map[reporter.Type]CollectorProxyBuilder{ + reporter.GRPC: GRPCCollectorProxyBuilder(grpcBuilder), + tchannel.ReporterType: TCollectorProxyBuilder(tchan), + } + proxy, err := CreateCollectorProxy(ProxyBuilderOptions{ + Options: *rOpts, + Metrics: metricsFactory, + Logger: zap.NewNop(), + }, builders) if test.err != "" { assert.EqualError(t, err, test.err) assert.Nil(t, proxy) @@ -262,11 +269,14 @@ func TestCreateCollectorProxy(t *testing.T) { } func TestCreateCollectorProxy_UnknownReporter(t *testing.T) { - rOpts := new(reporter.Options) tchan := tchannel.NewBuilder() grpcBuilder := grpc.NewConnBuilder() - proxy, err := CreateCollectorProxy(rOpts, tchan, grpcBuilder, zap.NewNop(), metrics.NullFactory) + builders := map[reporter.Type]CollectorProxyBuilder{ + reporter.GRPC: GRPCCollectorProxyBuilder(grpcBuilder), + tchannel.ReporterType: TCollectorProxyBuilder(tchan), + } + proxy, err := CreateCollectorProxy(ProxyBuilderOptions{}, builders) assert.Nil(t, proxy) assert.EqualError(t, err, "unknown reporter type ") } diff --git a/cmd/agent/app/processors/thrift_processor_test.go b/cmd/agent/app/processors/thrift_processor_test.go index 6e7264f29f5..385e786b8e1 100644 --- a/cmd/agent/app/processors/thrift_processor_test.go +++ b/cmd/agent/app/processors/thrift_processor_test.go @@ -28,10 +28,10 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" - tchreporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/agent/app/servers" "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" "github.com/jaegertracing/jaeger/cmd/agent/app/testutils" + tchreporter "github.com/jaegertracing/jaeger/tchannel/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/thrift-gen/agent" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" diff --git a/cmd/agent/app/proxy_builders.go b/cmd/agent/app/proxy_builders.go new file mode 100644 index 00000000000..9c031a93cce --- /dev/null +++ b/cmd/agent/app/proxy_builders.go @@ -0,0 +1,34 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" + "github.com/jaegertracing/jaeger/tchannel/agent/app/reporter/tchannel" +) + +// GRPCCollectorProxyBuilder creates CollectorProxyBuilder for GRPC reporter +func GRPCCollectorProxyBuilder(builder *grpc.ConnBuilder) CollectorProxyBuilder { + return func(opts ProxyBuilderOptions) (proxy CollectorProxy, err error) { + return grpc.NewCollectorProxy(builder, opts.AgentTags, opts.Metrics, opts.Logger) + } +} + +// TCollectorProxyBuilder creates CollectorProxyBuilder for Tchannel reporter +func TCollectorProxyBuilder(builder *tchannel.Builder) CollectorProxyBuilder { + return func(opts ProxyBuilderOptions) (proxy CollectorProxy, err error) { + return tchannel.NewCollectorProxy(builder, opts.Metrics, opts.Logger) + } +} diff --git a/cmd/agent/app/reporter/flags.go b/cmd/agent/app/reporter/flags.go index cf395094bda..1ad6fc0f1f0 100644 --- a/cmd/agent/app/reporter/flags.go +++ b/cmd/agent/app/reporter/flags.go @@ -31,8 +31,6 @@ const ( // Agent tags agentTagsDeprecated = "jaeger.tags" agentTags = "agent.tags" - // TCHANNEL is name of tchannel reporter. - TCHANNEL Type = "tchannel" // GRPC is name of gRPC reporter. GRPC Type = "grpc" ) @@ -48,7 +46,7 @@ type Options struct { // AddFlags adds flags for Options. func AddFlags(flags *flag.FlagSet) { - flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s, %s[%s]", string(GRPC), string(TCHANNEL), "NOTE: Deprecated since 1.16")) + flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s", string(GRPC))) if !setupcontext.IsAllInOne() { flags.String(agentTagsDeprecated, "", "(deprecated) see --"+agentTags) flags.String(agentTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}") @@ -58,9 +56,6 @@ func AddFlags(flags *flag.FlagSet) { // InitFromViper initializes Options with properties retrieved from Viper. func (b *Options) InitFromViper(v *viper.Viper, logger *zap.Logger) *Options { b.ReporterType = Type(v.GetString(reporterType)) - if b.ReporterType == TCHANNEL { - logger.Warn("Using deprecated reporter type", zap.Any(reporterType, TCHANNEL)) - } if !setupcontext.IsAllInOne() { if len(v.GetString(agentTagsDeprecated)) > 0 { logger.Warn("Using deprecated configuration", zap.String("option", agentTagsDeprecated)) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 1d0c2474519..66fa61a97c9 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -28,12 +28,12 @@ import ( "github.com/jaegertracing/jaeger/cmd/agent/app" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" - "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/docs" "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/version" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/tchannel/agent/app/reporter/tchannel" ) func main() { @@ -57,7 +57,15 @@ func main() { rOpts := new(reporter.Options).InitFromViper(v, logger) tchanBuilder := tchannel.NewBuilder().InitFromViper(v, logger) grpcBuilder := grpc.NewConnBuilder().InitFromViper(v) - cp, err := app.CreateCollectorProxy(rOpts, tchanBuilder, grpcBuilder, logger, mFactory) + builders := map[reporter.Type]app.CollectorProxyBuilder{ + reporter.GRPC: app.GRPCCollectorProxyBuilder(grpcBuilder), + tchannel.ReporterType: app.TCollectorProxyBuilder(tchanBuilder), + } + cp, err := app.CreateCollectorProxy(app.ProxyBuilderOptions{ + Options: *rOpts, + Logger: logger, + Metrics: mFactory, + }, builders) if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err)) } diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 3c62cfadbd5..fd3b52bbe1e 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -33,7 +33,6 @@ import ( agentApp "github.com/jaegertracing/jaeger/cmd/agent/app" agentRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" agentGrpcRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" - agentTchanRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/all-in-one/setupcontext" collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app" "github.com/jaegertracing/jaeger/cmd/docs" @@ -50,6 +49,8 @@ import ( "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" + agentTchanRep "github.com/jaegertracing/jaeger/tchannel/agent/app/reporter/tchannel" + tCollector "github.com/jaegertracing/jaeger/tchannel/collector/app" ) // all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store @@ -130,11 +131,24 @@ by default uses only in-memory database.`, HealthCheck: svc.HC(), }) c.Start(cOpts) + tCollectorOpts := new(tCollector.Options).InitFromViper(v) + tc, err := tCollector.Start("jaeger-collector", tCollectorOpts, logger, c.SpanHandlers(), strategyStore) + if err != nil { + logger.Fatal("Could not start Tchannel thrift collector", zap.Error(err)) + } // agent grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort)) agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) - cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, agentMetricsFactory) + builders := map[agentRep.Type]agentApp.CollectorProxyBuilder{ + agentRep.GRPC: agentApp.GRPCCollectorProxyBuilder(grpcBuilder), + agentTchanRep.ReporterType: agentApp.TCollectorProxyBuilder(tchanBuilder), + } + cp, err := agentApp.CreateCollectorProxy(agentApp.ProxyBuilderOptions{ + Options: *repOpts, + Logger: logger, + Metrics: agentMetricsFactory, + }, builders) if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err)) } @@ -151,6 +165,7 @@ by default uses only in-memory database.`, agent.Stop() cp.Close() c.Close() + tc.Close() querySrv.Close() if closer, ok := spanWriter.(io.Closer); ok { err := closer.Close() @@ -178,6 +193,7 @@ by default uses only in-memory database.`, agentTchanRep.AddFlags, agentGrpcRep.AddFlags, collectorApp.AddFlags, + tCollector.AddFlags, queryApp.AddFlags, strategyStoreFactory.AddFlags, ) diff --git a/cmd/collector/app/builder_flags.go b/cmd/collector/app/builder_flags.go index ca207f6b37e..e725a35dfba 100644 --- a/cmd/collector/app/builder_flags.go +++ b/cmd/collector/app/builder_flags.go @@ -29,7 +29,6 @@ const ( collectorDynQueueSizeMemory = "collector.queue-size-memory" collectorQueueSize = "collector.queue-size" collectorNumWorkers = "collector.num-workers" - collectorPort = "collector.port" collectorHTTPPort = "collector.http-port" collectorGRPCPort = "collector.grpc-port" collectorTags = "collector.tags" @@ -52,8 +51,6 @@ type CollectorOptions struct { QueueSize int // NumWorkers is the number of internal workers in a collector NumWorkers int - // CollectorPort is the port that the collector service listens in on for tchannel requests - CollectorPort int // CollectorHTTPPort is the port that the collector service listens in on for http requests CollectorHTTPPort int // CollectorGRPCPort is the port that the collector service listens in on for gRPC requests @@ -75,7 +72,6 @@ func AddFlags(flags *flag.FlagSet) { flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.") flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector") flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue") - flags.Int(collectorPort, ports.CollectorTChannel, "The TChannel port for the collector service") flags.Int(collectorHTTPPort, ports.CollectorHTTP, "The HTTP port for the collector service") flags.Int(collectorGRPCPort, ports.CollectorGRPC, "The gRPC port for the collector service") flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}") @@ -90,7 +86,6 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions { cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes cOpts.QueueSize = v.GetInt(collectorQueueSize) cOpts.NumWorkers = v.GetInt(collectorNumWorkers) - cOpts.CollectorPort = v.GetInt(collectorPort) cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort) cOpts.CollectorGRPCPort = v.GetInt(collectorGRPCPort) cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags)) diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index f96d1dcd59e..7af28c001a4 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -20,7 +20,6 @@ import ( "time" "github.com/uber/jaeger-lib/metrics" - "github.com/uber/tchannel-go" "go.uber.org/zap" "google.golang.org/grpc" @@ -47,7 +46,6 @@ type Collector struct { hServer *http.Server zkServer *http.Server grpcServer *grpc.Server - tchServer *tchannel.Channel } // CollectorParams to construct a new Jaeger Collector. @@ -84,19 +82,6 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { c.spanProcessor = handlerBuilder.BuildSpanProcessor() c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor) - if tchServer, err := server.StartThriftServer(&server.ThriftServerParams{ - ServiceName: c.serviceName, - Port: builderOpts.CollectorPort, - JaegerBatchesHandler: c.spanHandlers.JaegerBatchesHandler, - ZipkinSpansHandler: c.spanHandlers.ZipkinSpansHandler, - StrategyStore: c.strategyStore, - Logger: c.logger, - }); err != nil { - c.logger.Fatal("could not start Thrift collector", zap.Error(err)) - } else { - c.tchServer = tchServer - } - if grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ Port: builderOpts.CollectorGRPCPort, Handler: c.spanHandlers.GRPCHandler, @@ -144,11 +129,6 @@ func (c *Collector) Close() error { c.grpcServer.GracefulStop() } - // TChannel server - if c.tchServer != nil { - c.tchServer.Close() - } - // HTTP server if c.hServer != nil { timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -175,3 +155,8 @@ func (c *Collector) Close() error { return nil } + +// SpanHandlers returns span handlers used by the Collector. +func (c *Collector) SpanHandlers() *SpanHandlers { + return c.spanHandlers +} diff --git a/cmd/collector/main.go b/cmd/collector/main.go index e67ad4447fc..c4fe02c4cff 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -36,6 +36,7 @@ import ( ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" + tCollector "github.com/jaegertracing/jaeger/tchannel/collector/app" ) const serviceName = "jaeger-collector" @@ -93,6 +94,11 @@ func main() { }) collectorOpts := new(app.CollectorOptions).InitFromViper(v) c.Start(collectorOpts) + tCollectorOpts := new(tCollector.Options).InitFromViper(v) + tc, err := tCollector.Start("jaeger-collector", tCollectorOpts, logger, c.SpanHandlers(), strategyStore) + if err != nil { + logger.Fatal("Could not start Tchannel thrift collector", zap.Error(err)) + } svc.RunAndThen(func() { if closer, ok := spanWriter.(io.Closer); ok { @@ -105,6 +111,9 @@ func main() { if err := c.Close(); err != nil { logger.Error("failed to cleanly close the collector", zap.Error(err)) } + if err := tc.Close(); err != nil { + logger.Error("failed to cleanly close tchannel collector", zap.Error(err)) + } }) return nil }, @@ -119,6 +128,7 @@ func main() { command, svc.AddFlags, app.AddFlags, + tCollector.AddFlags, storageFactory.AddFlags, strategyStoreFactory.AddFlags, ) diff --git a/ports/ports.go b/ports/ports.go index 9873f191119..be94961e300 100644 --- a/ports/ports.go +++ b/ports/ports.go @@ -28,8 +28,6 @@ const ( // CollectorGRPC is the default port for gRPC server for sending spans CollectorGRPC = 14250 - // CollectorTChannel is the default port for TChannel server for sending spans - CollectorTChannel = 14267 // CollectorHTTP is the default port for HTTP server for sending spans (e.g. /api/traces endpoint) CollectorHTTP = 14268 // CollectorAdminHTTP is the default admin HTTP port (health check, metrics, etc.) diff --git a/cmd/agent/app/configmanager/tchannel/manager.go b/tchannel/agent/app/configmanager/tchannel/manager.go similarity index 100% rename from cmd/agent/app/configmanager/tchannel/manager.go rename to tchannel/agent/app/configmanager/tchannel/manager.go diff --git a/cmd/agent/app/configmanager/tchannel/manager_test.go b/tchannel/agent/app/configmanager/tchannel/manager_test.go similarity index 100% rename from cmd/agent/app/configmanager/tchannel/manager_test.go rename to tchannel/agent/app/configmanager/tchannel/manager_test.go diff --git a/cmd/agent/app/reporter/tchannel/builder.go b/tchannel/agent/app/reporter/tchannel/builder.go similarity index 100% rename from cmd/agent/app/reporter/tchannel/builder.go rename to tchannel/agent/app/reporter/tchannel/builder.go diff --git a/cmd/agent/app/reporter/tchannel/builder_test.go b/tchannel/agent/app/reporter/tchannel/builder_test.go similarity index 100% rename from cmd/agent/app/reporter/tchannel/builder_test.go rename to tchannel/agent/app/reporter/tchannel/builder_test.go diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/tchannel/agent/app/reporter/tchannel/collector_proxy.go similarity index 96% rename from cmd/agent/app/reporter/tchannel/collector_proxy.go rename to tchannel/agent/app/reporter/tchannel/collector_proxy.go index 996686880f1..885414ff538 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/tchannel/agent/app/reporter/tchannel/collector_proxy.go @@ -19,8 +19,8 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" - "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/tchannel" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" + "github.com/jaegertracing/jaeger/tchannel/agent/app/configmanager/tchannel" ) // ProxyBuilder holds objects communicating with collector diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go b/tchannel/agent/app/reporter/tchannel/collector_proxy_test.go similarity index 95% rename from cmd/agent/app/reporter/tchannel/collector_proxy_test.go rename to tchannel/agent/app/reporter/tchannel/collector_proxy_test.go index b7cafd4ac5b..1e4eb5aebb8 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go +++ b/tchannel/agent/app/reporter/tchannel/collector_proxy_test.go @@ -24,8 +24,8 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" - "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/tchannel" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" + "github.com/jaegertracing/jaeger/tchannel/agent/app/configmanager/tchannel" ) var _ io.Closer = (*ProxyBuilder)(nil) diff --git a/cmd/agent/app/reporter/tchannel/flags.go b/tchannel/agent/app/reporter/tchannel/flags.go similarity index 95% rename from cmd/agent/app/reporter/tchannel/flags.go rename to tchannel/agent/app/reporter/tchannel/flags.go index 8cbb2f7d509..af15a75074b 100644 --- a/cmd/agent/app/reporter/tchannel/flags.go +++ b/tchannel/agent/app/reporter/tchannel/flags.go @@ -21,6 +21,8 @@ import ( "github.com/spf13/viper" "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" ) const ( @@ -31,6 +33,9 @@ const ( discoveryMinPeers = "discovery.min-peers" discoveryConnCheckTimeout = "discovery.conn-check-timeout" reportTimeout = "report-timeout" + + // ReporterType defines the type of the reporter. + ReporterType reporter.Type = "tchannel" ) // AddFlags adds flags for Builder. diff --git a/cmd/agent/app/reporter/tchannel/flags_test.go b/tchannel/agent/app/reporter/tchannel/flags_test.go similarity index 100% rename from cmd/agent/app/reporter/tchannel/flags_test.go rename to tchannel/agent/app/reporter/tchannel/flags_test.go diff --git a/cmd/agent/app/reporter/tchannel/reporter.go b/tchannel/agent/app/reporter/tchannel/reporter.go similarity index 100% rename from cmd/agent/app/reporter/tchannel/reporter.go rename to tchannel/agent/app/reporter/tchannel/reporter.go diff --git a/cmd/agent/app/reporter/tchannel/reporter_test.go b/tchannel/agent/app/reporter/tchannel/reporter_test.go similarity index 100% rename from cmd/agent/app/reporter/tchannel/reporter_test.go rename to tchannel/agent/app/reporter/tchannel/reporter_test.go diff --git a/tchannel/collector/app/collector.go b/tchannel/collector/app/collector.go new file mode 100644 index 00000000000..de0a68199b6 --- /dev/null +++ b/tchannel/collector/app/collector.go @@ -0,0 +1,61 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "github.com/uber/tchannel-go" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/collector/app" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/tchannel/collector/app/server" +) + +// Collector returns tchannel collector. +type Collector struct { + tchServer *tchannel.Channel +} + +// Start starts the tchannel collector +func Start( + serviceName string, + cOpts *Options, + logger *zap.Logger, + spanHandlers *app.SpanHandlers, + strategyStore strategystore.StrategyStore, +) (*Collector, error) { + c := &Collector{} + tchServer, err := server.StartThriftServer(&server.ThriftServerParams{ + ServiceName: serviceName, + Port: cOpts.CollectorPort, + JaegerBatchesHandler: spanHandlers.JaegerBatchesHandler, + ZipkinSpansHandler: spanHandlers.ZipkinSpansHandler, + StrategyStore: strategyStore, + Logger: logger, + }) + if err != nil { + return c, err + } + c.tchServer = tchServer + return c, nil +} + +// Close the component and all its underlying dependencies. +func (c *Collector) Close() error { + if c.tchServer != nil { + c.tchServer.Close() + } + return nil +} diff --git a/tchannel/collector/app/empty_test.go b/tchannel/collector/app/empty_test.go new file mode 100644 index 00000000000..31115ecbf4c --- /dev/null +++ b/tchannel/collector/app/empty_test.go @@ -0,0 +1,15 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app diff --git a/tchannel/collector/app/flags.go b/tchannel/collector/app/flags.go new file mode 100644 index 00000000000..6c7d1281c89 --- /dev/null +++ b/tchannel/collector/app/flags.go @@ -0,0 +1,43 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "flag" + + "github.com/spf13/viper" +) + +const ( + // CollectorTChannel is the default port for TChannel server for sending spans + CollectorTChannel = 14267 + collectorPort = "collector.port" +) + +// Options holds tchannel collector configuration. +type Options struct { + CollectorPort int +} + +// AddFlags add Options's flags. +func AddFlags(flags *flag.FlagSet) { + flags.Int(collectorPort, CollectorTChannel, "The TChannel port for the collector service") +} + +// InitFromViper initializes Options from viper. +func (cOpts *Options) InitFromViper(v *viper.Viper) *Options { + cOpts.CollectorPort = v.GetInt(collectorPort) + return cOpts +} diff --git a/cmd/collector/app/handler/tchannel_handler.go b/tchannel/collector/app/handler/tchannel_handler.go similarity index 81% rename from cmd/collector/app/handler/tchannel_handler.go rename to tchannel/collector/app/handler/tchannel_handler.go index 42c1713f2c6..6496cd0a294 100644 --- a/cmd/collector/app/handler/tchannel_handler.go +++ b/tchannel/collector/app/handler/tchannel_handler.go @@ -17,6 +17,7 @@ package handler import ( "github.com/uber/tchannel-go/thrift" + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" @@ -24,14 +25,14 @@ import ( // TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector. type TChannelHandler struct { - jaegerHandler JaegerBatchesHandler - zipkinHandler ZipkinSpansHandler + jaegerHandler handler.JaegerBatchesHandler + zipkinHandler handler.ZipkinSpansHandler } // NewTChannelHandler creates new handler that implements both Jaeger and Zipkin endpoints. func NewTChannelHandler( - jaegerHandler JaegerBatchesHandler, - zipkinHandler ZipkinSpansHandler, + jaegerHandler handler.JaegerBatchesHandler, + zipkinHandler handler.ZipkinSpansHandler, ) *TChannelHandler { return &TChannelHandler{ jaegerHandler: jaegerHandler, @@ -44,7 +45,7 @@ func (h *TChannelHandler) SubmitZipkinBatch( _ thrift.Context, spans []*zipkincore.Span, ) ([]*zipkincore.Response, error) { - return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{ + return h.zipkinHandler.SubmitZipkinBatch(spans, handler.SubmitBatchOptions{ InboundTransport: processor.TChannelTransport, }) } @@ -54,7 +55,7 @@ func (h *TChannelHandler) SubmitBatches( _ thrift.Context, batches []*jaeger.Batch, ) ([]*jaeger.BatchSubmitResponse, error) { - return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{ + return h.jaegerHandler.SubmitBatches(batches, handler.SubmitBatchOptions{ InboundTransport: processor.TChannelTransport, }) } diff --git a/cmd/collector/app/handler/tchannel_handler_test.go b/tchannel/collector/app/handler/tchannel_handler_test.go similarity index 71% rename from cmd/collector/app/handler/tchannel_handler_test.go rename to tchannel/collector/app/handler/tchannel_handler_test.go index fa908ec5b77..56fef93864c 100644 --- a/cmd/collector/app/handler/tchannel_handler_test.go +++ b/tchannel/collector/app/handler/tchannel_handler_test.go @@ -15,10 +15,12 @@ package handler import ( + "sync" "testing" "github.com/stretchr/testify/assert" + "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -33,7 +35,7 @@ type mockZipkinHandler struct { spans []*zipkincore.Span } -func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts SubmitBatchOptions) ([]*zipkincore.Response, error) { +func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts handler.SubmitBatchOptions) ([]*zipkincore.Response, error) { p.spans = append(p.spans, spans...) return nil, nil } @@ -57,3 +59,22 @@ func TestTChannelHandler(t *testing.T) { }) assert.Len(t, zh.spans, 1) } + +type mockJaegerHandler struct { + err error + mux sync.Mutex + batches []*jaeger.Batch +} + +func (p *mockJaegerHandler) SubmitBatches(batches []*jaeger.Batch, _ handler.SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) { + p.mux.Lock() + defer p.mux.Unlock() + p.batches = append(p.batches, batches...) + return nil, p.err +} + +func (p *mockJaegerHandler) getBatches() []*jaeger.Batch { + p.mux.Lock() + defer p.mux.Unlock() + return p.batches +} diff --git a/tchannel/collector/app/server/empty_test.go b/tchannel/collector/app/server/empty_test.go new file mode 100644 index 00000000000..cfd3515141a --- /dev/null +++ b/tchannel/collector/app/server/empty_test.go @@ -0,0 +1,15 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server diff --git a/cmd/collector/app/server/thrift.go b/tchannel/collector/app/server/thrift.go similarity index 94% rename from cmd/collector/app/server/thrift.go rename to tchannel/collector/app/server/thrift.go index c0051c43ce6..d64187cf579 100644 --- a/cmd/collector/app/server/thrift.go +++ b/tchannel/collector/app/server/thrift.go @@ -25,6 +25,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + tHandler "github.com/jaegertracing/jaeger/tchannel/collector/app/handler" jc "github.com/jaegertracing/jaeger/thrift-gen/jaeger" sc "github.com/jaegertracing/jaeger/thrift-gen/sampling" zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" @@ -64,7 +65,7 @@ func StartThriftServer(params *ThriftServerParams) (*tchannel.Channel, error) { func serveThrift(tchServer *tchannel.Channel, listener net.Listener, params *ThriftServerParams) error { server := thrift.NewServer(tchServer) - batchHandler := handler.NewTChannelHandler(params.JaegerBatchesHandler, params.ZipkinSpansHandler) + batchHandler := tHandler.NewTChannelHandler(params.JaegerBatchesHandler, params.ZipkinSpansHandler) server.Register(jc.NewTChanCollectorServer(batchHandler)) server.Register(zc.NewTChanZipkinCollectorServer(batchHandler)) server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(params.StrategyStore)))