From d6e96e0103aab0c4ea178fd00d1a37a58db4a223 Mon Sep 17 00:00:00 2001 From: Bartlomiej Obecny Date: Wed, 22 Jul 2020 13:38:10 +0200 Subject: [PATCH] feat: adding proto over http for collector exporter (#1302) --- .../docker/collector-config.yaml | 8 +- .../docker/docker-compose.yaml | 6 - examples/collector-exporter-node/package.json | 3 +- examples/collector-exporter-node/start.js | 3 +- .../README.md | 27 ++- .../package.json | 4 +- .../src/enums.ts | 1 + .../browser/CollectorTraceExporter.ts | 2 +- .../node/CollectorExporterNodeBase.ts | 47 +++-- .../platform/node/CollectorMetricExporter.ts | 13 +- .../platform/node/CollectorTraceExporter.ts | 23 ++- .../src/platform/node/README.md | 2 +- .../src/platform/node/util.ts | 59 ++++++ .../src/platform/node/utilWithGrpc.ts | 31 ++- .../src/platform/node/utilWithJson.ts | 48 +---- .../src/platform/node/utilWithJsonProto.ts | 76 +++++++ .../src/types.ts | 6 + .../test/helper.ts | 131 ++++++++++++ .../node/CollectorExporterWithProto.test.ts | 188 ++++++++++++++++++ .../test/node/CollectorMetricExporter.test.ts | 2 +- .../test/node/CollectorTraceExporter.test.ts | 2 +- 21 files changed, 578 insertions(+), 104 deletions(-) create mode 100644 packages/opentelemetry-exporter-collector/src/platform/node/utilWithJsonProto.ts create mode 100644 packages/opentelemetry-exporter-collector/test/node/CollectorExporterWithProto.test.ts diff --git a/examples/collector-exporter-node/docker/collector-config.yaml b/examples/collector-exporter-node/docker/collector-config.yaml index 8ea773375be..7e0cd7e7590 100644 --- a/examples/collector-exporter-node/docker/collector-config.yaml +++ b/examples/collector-exporter-node/docker/collector-config.yaml @@ -1,5 +1,11 @@ receivers: - otlp: {} + otlp: + {} +# keep it when upgrading to version 0.5+ +# protocols: +# grpc: +# http: +# endpoint: localhost:55680 exporters: zipkin: diff --git a/examples/collector-exporter-node/docker/docker-compose.yaml b/examples/collector-exporter-node/docker/docker-compose.yaml index 53c635a0f43..df43b97e33a 100644 --- a/examples/collector-exporter-node/docker/docker-compose.yaml +++ b/examples/collector-exporter-node/docker/docker-compose.yaml @@ -3,8 +3,6 @@ services: # Collector collector: image: otel/opentelemetry-collector:0.4.0 - networks: - - otelcol command: ["--config=/conf/collector-config.yaml", "--log-level=DEBUG"] volumes: - ./collector-config.yaml:/conf/collector-config.yaml @@ -15,10 +13,6 @@ services: # Zipkin zipkin-all-in-one: image: openzipkin/zipkin:latest - networks: - - otelcol ports: - "9411:9411" -networks: - otelcol: diff --git a/examples/collector-exporter-node/package.json b/examples/collector-exporter-node/package.json index 5f5d7b39e68..ac987ef149f 100644 --- a/examples/collector-exporter-node/package.json +++ b/examples/collector-exporter-node/package.json @@ -6,7 +6,8 @@ "main": "index.js", "scripts": { "start": "node ./start.js", - "docker:start": "cd ./docker && docker-compose down && docker-compose up -d", + "docker:start": "cd ./docker && docker-compose down && docker-compose up", + "docker:startd": "cd ./docker && docker-compose down && docker-compose up -d", "docker:stop": "cd ./docker && docker-compose down" }, "repository": { diff --git a/examples/collector-exporter-node/start.js b/examples/collector-exporter-node/start.js index 3f8f939653b..138b6b41ec2 100644 --- a/examples/collector-exporter-node/start.js +++ b/examples/collector-exporter-node/start.js @@ -9,7 +9,8 @@ const exporter = new CollectorTraceExporter({ // headers: { // foo: 'bar' // }, - protocolNode: CollectorProtocolNode.HTTP_JSON, + protocolNode: CollectorProtocolNode.HTTP_PROTO, + // protocolNode: CollectorProtocolNode.HTTP_JSON, }); const provider = new BasicTracerProvider(); diff --git a/packages/opentelemetry-exporter-collector/README.md b/packages/opentelemetry-exporter-collector/README.md index 8cfda2f5a2d..89431bdbaf2 100644 --- a/packages/opentelemetry-exporter-collector/README.md +++ b/packages/opentelemetry-exporter-collector/README.md @@ -132,10 +132,33 @@ provider.register(); ``` +## Usage in Node - PROTO over http + +```js +const { BasicTracerProvider, SimpleSpanProcessor } = require('@opentelemetry/tracing'); +const { CollectorExporter, CollectorTransportNode } = require('@opentelemetry/exporter-collector'); + +const collectorOptions = { + protocolNode: CollectorTransportNode.HTTP_PROTO, + serviceName: 'basic-service', + url: '', // url is optional and can be omitted - default is http://localhost:55680/v1/trace + headers: { + foo: 'bar' + }, //an optional object containing custom headers to be sent with each request will only work with json over http +}; + +const provider = new BasicTracerProvider(); +const exporter = new CollectorExporter(collectorOptions); +provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + +provider.register(); + +``` + ## Running opentelemetry-collector locally to see the traces -1. Go to examples/basic-tracer-node -2. run `npm run collector:docker:ot` +1. Go to examples/collector-exporter-node +2. run `npm run docker:start` 3. Open page at `http://localhost:9411/zipkin/` to observe the traces ## Useful links diff --git a/packages/opentelemetry-exporter-collector/package.json b/packages/opentelemetry-exporter-collector/package.json index 1e2dcb4ac35..f9b4438dfdc 100644 --- a/packages/opentelemetry-exporter-collector/package.json +++ b/packages/opentelemetry-exporter-collector/package.json @@ -83,13 +83,13 @@ "webpack-merge": "5.0.9" }, "dependencies": { - "@grpc/proto-loader": "^0.5.3", + "@grpc/proto-loader": "^0.5.4", "@opentelemetry/api": "^0.9.0", "@opentelemetry/core": "^0.9.0", "@opentelemetry/resources": "^0.9.0", "@opentelemetry/metrics": "^0.9.0", "@opentelemetry/tracing": "^0.9.0", - "google-protobuf": "^3.11.4", + "protobufjs": "^6.9.0", "grpc": "^1.24.2" } } diff --git a/packages/opentelemetry-exporter-collector/src/enums.ts b/packages/opentelemetry-exporter-collector/src/enums.ts index 08c2fd9f2b7..8e8e27a299c 100644 --- a/packages/opentelemetry-exporter-collector/src/enums.ts +++ b/packages/opentelemetry-exporter-collector/src/enums.ts @@ -21,4 +21,5 @@ export enum CollectorProtocolNode { GRPC, HTTP_JSON, + HTTP_PROTO, } diff --git a/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorTraceExporter.ts b/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorTraceExporter.ts index 6984c91cf73..6036fe3773d 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorTraceExporter.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/browser/CollectorTraceExporter.ts @@ -22,8 +22,8 @@ import * as collectorTypes from '../../types'; import { sendWithBeacon, sendWithXhr } from './util'; import { parseHeaders } from '../../util'; -const DEFAULT_COLLECTOR_URL = 'http://localhost:55680/v1/trace'; const DEFAULT_SERVICE_NAME = 'collector-trace-exporter'; +const DEFAULT_COLLECTOR_URL = 'http://localhost:55680/v1/trace'; /** * Collector Trace Exporter for Web diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts index 2e64bc39a4e..743fbad4de2 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts @@ -14,15 +14,17 @@ * limitations under the License. */ +import { Metadata } from 'grpc'; import { CollectorExporterBase } from '../../CollectorExporterBase'; +import { ServiceClientType } from '../../types'; import { CollectorExporterConfigNode, GRPCQueueItem } from './types'; import { ServiceClient } from './types'; -import * as grpc from 'grpc'; import { CollectorProtocolNode } from '../../enums'; import * as collectorTypes from '../../types'; import { parseHeaders } from '../../util'; -import { sendWithJson, initWithJson } from './utilWithJson'; -import { sendUsingGrpc, initWithGrpc } from './utilWithGrpc'; +import { initWithJson, sendWithJson } from './utilWithJson'; +import { initWithGrpc, sendWithGrpc } from './utilWithGrpc'; +import { initWithJsonProto, sendWithJsonProto } from './utilWithJsonProto'; const DEFAULT_SERVICE_NAME = 'collector-metric-exporter'; @@ -40,10 +42,9 @@ export abstract class CollectorExporterNodeBase< DEFAULT_HEADERS: Record = { [collectorTypes.OT_REQUEST_HEADER]: '1', }; - grpcQueue: GRPCQueueItem[]; + grpcQueue: GRPCQueueItem[] = []; + metadata?: Metadata; serviceClient?: ServiceClient = undefined; - credentials: grpc.ChannelCredentials; - metadata?: grpc.Metadata; headers: Record; protected readonly _protocol: CollectorProtocolNode; @@ -53,30 +54,35 @@ export abstract class CollectorExporterNodeBase< typeof config.protocolNode !== 'undefined' ? config.protocolNode : CollectorProtocolNode.GRPC; - if (this._protocol === CollectorProtocolNode.HTTP_JSON) { - this.logger.debug('CollectorExporter - using json over http'); - if (config.metadata) { - this.logger.warn('Metadata cannot be set when using json'); - } - } else { + if (this._protocol === CollectorProtocolNode.GRPC) { this.logger.debug('CollectorExporter - using grpc'); if (config.headers) { this.logger.warn('Headers cannot be set when using grpc'); } + } else { + if (this._protocol === CollectorProtocolNode.HTTP_JSON) { + this.logger.debug('CollectorExporter - using json over http'); + } else { + this.logger.debug('CollectorExporter - using proto over http'); + } + if (config.metadata) { + this.logger.warn('Metadata cannot be set when using http'); + } } - this.grpcQueue = []; - this.credentials = config.credentials || grpc.credentials.createInsecure(); - this.metadata = config.metadata; this.headers = parseHeaders(config.headers, this.logger) || this.DEFAULT_HEADERS; + this.metadata = config.metadata; } onInit(config: CollectorExporterConfigNode): void { this._isShutdown = false; + if (config.protocolNode === CollectorProtocolNode.HTTP_JSON) { initWithJson(this, config); + } else if (config.protocolNode === CollectorProtocolNode.HTTP_PROTO) { + initWithJsonProto(this, config); } else { - initWithGrpc(this); + initWithGrpc(this, config); } } @@ -91,8 +97,10 @@ export abstract class CollectorExporterNodeBase< } if (this._protocol === CollectorProtocolNode.HTTP_JSON) { sendWithJson(this, objects, onSuccess, onError); + } else if (this._protocol === CollectorProtocolNode.HTTP_PROTO) { + sendWithJsonProto(this, objects, onSuccess, onError); } else { - sendUsingGrpc(this, objects, onSuccess, onError); + sendWithGrpc(this, objects, onSuccess, onError); } } @@ -108,8 +116,5 @@ export abstract class CollectorExporterNodeBase< } abstract getServiceProtoPath(): string; - abstract getServiceClient( - packageObject: any, - serverAddress: string - ): ServiceClient; + abstract getServiceClientType(): ServiceClientType; } diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorMetricExporter.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorMetricExporter.ts index 27f160a8948..fbf00089f82 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorMetricExporter.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorMetricExporter.ts @@ -15,13 +15,15 @@ */ import { MetricRecord, MetricExporter } from '@opentelemetry/metrics'; +import { ServiceClientType } from '../../types'; import * as collectorTypes from '../../types'; -import { CollectorExporterConfigNode, ServiceClient } from './types'; +import { CollectorExporterConfigNode } from './types'; import { CollectorProtocolNode } from '../../enums'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; import { toCollectorExportMetricServiceRequest } from '../../transformMetrics'; -import { DEFAULT_COLLECTOR_URL_GRPC } from './utilWithGrpc'; + const DEFAULT_SERVICE_NAME = 'collector-metric-exporter'; +const DEFAULT_COLLECTOR_URL_GRPC = 'localhost:55680'; const DEFAULT_COLLECTOR_URL_JSON = 'http://localhost:55680/v1/metrics'; /** @@ -58,11 +60,8 @@ export class CollectorMetricExporter return config.serviceName || DEFAULT_SERVICE_NAME; } - getServiceClient(packageObject: any, serverAddress: string): ServiceClient { - return new packageObject.opentelemetry.proto.collector.metrics.v1.MetricsService( - serverAddress, - this.credentials - ); + getServiceClientType() { + return ServiceClientType.METRICS; } getServiceProtoPath(): string { diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorTraceExporter.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorTraceExporter.ts index bbdf9fe5525..2cabd1768c4 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorTraceExporter.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorTraceExporter.ts @@ -15,15 +15,17 @@ */ import { ReadableSpan, SpanExporter } from '@opentelemetry/tracing'; +import { ServiceClientType } from '../../types'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; import * as collectorTypes from '../../types'; import { CollectorProtocolNode } from '../../enums'; -import { CollectorExporterConfigNode, ServiceClient } from './types'; +import { CollectorExporterConfigNode } from './types'; import { toCollectorExportTraceServiceRequest } from '../../transform'; -import { DEFAULT_COLLECTOR_URL_GRPC } from './utilWithGrpc'; const DEFAULT_SERVICE_NAME = 'collector-trace-exporter'; +const DEFAULT_COLLECTOR_URL_GRPC = 'localhost:55680'; const DEFAULT_COLLECTOR_URL_JSON = 'http://localhost:55680/v1/trace'; +const DEFAULT_COLLECTOR_URL_JSON_PROTO = 'http://localhost:55680/v1/trace'; /** * Collector Trace Exporter for Node @@ -42,9 +44,13 @@ export class CollectorTraceExporter getDefaultUrl(config: CollectorExporterConfigNode): string { if (!config.url) { - return config.protocolNode === CollectorProtocolNode.HTTP_JSON - ? DEFAULT_COLLECTOR_URL_JSON - : DEFAULT_COLLECTOR_URL_GRPC; + if (config.protocolNode === CollectorProtocolNode.HTTP_JSON) { + return DEFAULT_COLLECTOR_URL_JSON; + } else if (config.protocolNode === CollectorProtocolNode.HTTP_PROTO) { + return DEFAULT_COLLECTOR_URL_JSON_PROTO; + } else { + return DEFAULT_COLLECTOR_URL_GRPC; + } } return config.url; } @@ -53,11 +59,8 @@ export class CollectorTraceExporter return config.serviceName || DEFAULT_SERVICE_NAME; } - getServiceClient(packageObject: any, serverAddress: string): ServiceClient { - return new packageObject.opentelemetry.proto.collector.trace.v1.TraceService( - serverAddress, - this.credentials - ); + getServiceClientType() { + return ServiceClientType.SPANS; } getServiceProtoPath(): string { diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/README.md b/packages/opentelemetry-exporter-collector/src/platform/node/README.md index 76ff2c2c492..66ca6c5f992 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/README.md +++ b/packages/opentelemetry-exporter-collector/src/platform/node/README.md @@ -43,4 +43,4 @@ the latest sha when this guide was written is `b54688569186e0b862bf7462a983ccf2c git commit -am "chore: updating submodule for opentelemetry-proto" ``` -9. If you look now at git log you will notice that the folder `protos` has been changed and it will show what was the previous sha and what is current one +9. If you look now at git log you will notice that the folder `protos` has been changed and it will show what was the previous sha and what is current one. diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts index 9ad59d5690d..a91bc485223 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts @@ -13,6 +13,65 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import * as url from 'url'; +import * as http from 'http'; +import * as https from 'https'; +import * as collectorTypes from '../../types'; +import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; + export function removeProtocol(url: string): string { return url.replace(/^https?:\/\//, ''); } + +/** + * Sends data using http + * @param collector + * @param data + * @param contentType + * @param onSuccess + * @param onError + */ +export function sendDataUsingHttp( + collector: CollectorExporterNodeBase, + data: string | Buffer, + contentType: string, + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void +): void { + const parsedUrl = new url.URL(collector.url); + + const options = { + hostname: parsedUrl.hostname, + port: parsedUrl.port, + path: parsedUrl.pathname, + method: 'POST', + headers: { + 'Content-Length': Buffer.byteLength(data), + 'Content-Type': contentType, + ...collector.headers, + }, + }; + + const request = parsedUrl.protocol === 'http:' ? http.request : https.request; + const req = request(options, (res: http.IncomingMessage) => { + if (res.statusCode && res.statusCode < 299) { + collector.logger.debug(`statusCode: ${res.statusCode}`); + onSuccess(); + } else { + collector.logger.error(`statusCode: ${res.statusCode}`); + onError({ + code: res.statusCode, + message: res.statusMessage, + }); + } + }); + + req.on('error', (error: Error) => { + collector.logger.error('error', error.message); + onError({ + message: error.message, + }); + }); + req.write(data); + req.end(); +} diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/utilWithGrpc.ts b/packages/opentelemetry-exporter-collector/src/platform/node/utilWithGrpc.ts index 6d56ab8757b..8d1e3d1e334 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/utilWithGrpc.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/utilWithGrpc.ts @@ -17,18 +17,22 @@ import * as protoLoader from '@grpc/proto-loader'; import * as grpc from 'grpc'; import * as path from 'path'; +import { ServiceClientType } from '../../types'; import * as collectorTypes from '../../types'; -import { GRPCQueueItem } from './types'; +import { CollectorExporterConfigNode, GRPCQueueItem } from './types'; import { removeProtocol } from './util'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; -export const DEFAULT_COLLECTOR_URL_GRPC = 'localhost:55680'; - export function initWithGrpc( - collector: CollectorExporterNodeBase + collector: CollectorExporterNodeBase, + config: CollectorExporterConfigNode ): void { + collector.grpcQueue = []; const serverAddress = removeProtocol(collector.url); + const credentials: grpc.ChannelCredentials = + config.credentials || grpc.credentials.createInsecure(); + const includeDirs = [path.resolve(__dirname, 'protos')]; protoLoader @@ -42,10 +46,19 @@ export function initWithGrpc( }) .then(packageDefinition => { const packageObject: any = grpc.loadPackageDefinition(packageDefinition); - collector.serviceClient = collector.getServiceClient( - packageObject, - serverAddress - ); + + if (collector.getServiceClientType() === ServiceClientType.SPANS) { + collector.serviceClient = new packageObject.opentelemetry.proto.collector.trace.v1.TraceService( + serverAddress, + credentials + ); + } else { + collector.serviceClient = new packageObject.opentelemetry.proto.collector.metrics.v1.MetricsService( + serverAddress, + credentials + ); + } + if (collector.grpcQueue.length > 0) { const queue = collector.grpcQueue.splice(0); queue.forEach((item: GRPCQueueItem) => { @@ -55,7 +68,7 @@ export function initWithGrpc( }); } -export function sendUsingGrpc( +export function sendWithGrpc( collector: CollectorExporterNodeBase, objects: ExportItem[], onSuccess: () => void, diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/utilWithJson.ts b/packages/opentelemetry-exporter-collector/src/platform/node/utilWithJson.ts index 028f245d53c..38da393cfe6 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/utilWithJson.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/utilWithJson.ts @@ -14,13 +14,10 @@ * limitations under the License. */ -import * as url from 'url'; -import * as http from 'http'; -import * as https from 'https'; - import * as collectorTypes from '../../types'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; import { CollectorExporterConfigNode } from './types'; +import { sendDataUsingHttp } from './util'; export function initWithJson( _collector: CollectorExporterNodeBase, @@ -36,41 +33,12 @@ export function sendWithJson( onError: (error: collectorTypes.CollectorExporterError) => void ): void { const serviceRequest = collector.convert(objects); - const body = JSON.stringify(serviceRequest); - const parsedUrl = new url.URL(collector.url); - - const options = { - hostname: parsedUrl.hostname, - port: parsedUrl.port, - path: parsedUrl.pathname, - method: 'POST', - headers: { - 'Content-Length': Buffer.byteLength(body), - 'Content-Type': 'application/json', - ...collector.headers, - }, - }; - - const request = parsedUrl.protocol === 'http:' ? http.request : https.request; - const req = request(options, (res: http.IncomingMessage) => { - if (res.statusCode && res.statusCode < 299) { - collector.logger.debug(`statusCode: ${res.statusCode}`); - onSuccess(); - } else { - collector.logger.error(`statusCode: ${res.statusCode}`); - onError({ - code: res.statusCode, - message: res.statusMessage, - }); - } - }); - req.on('error', (error: Error) => { - collector.logger.error('error', error.message); - onError({ - message: error.message, - }); - }); - req.write(body); - req.end(); + sendDataUsingHttp( + collector, + JSON.stringify(serviceRequest), + 'application/json', + onSuccess, + onError + ); } diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/utilWithJsonProto.ts b/packages/opentelemetry-exporter-collector/src/platform/node/utilWithJsonProto.ts new file mode 100644 index 00000000000..ba0035bb652 --- /dev/null +++ b/packages/opentelemetry-exporter-collector/src/platform/node/utilWithJsonProto.ts @@ -0,0 +1,76 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as path from 'path'; +import { Type } from 'protobufjs'; +import * as protobufjs from 'protobufjs'; +import * as collectorTypes from '../../types'; +import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; +import { CollectorExporterConfigNode } from './types'; +import { sendDataUsingHttp } from './util'; + +let ExportTraceServiceRequestProto: Type | undefined; + +export function getExportTraceServiceRequestProto(): Type | undefined { + return ExportTraceServiceRequestProto; +} + +export function initWithJsonProto( + _collector: CollectorExporterNodeBase, + _config: CollectorExporterConfigNode +): void { + const dir = path.resolve(__dirname, 'protos'); + const root = new protobufjs.Root(); + root.resolvePath = function (origin, target) { + return `${dir}/${target}`; + }; + const proto = root.loadSync([ + 'opentelemetry/proto/common/v1/common.proto', + 'opentelemetry/proto/resource/v1/resource.proto', + 'opentelemetry/proto/trace/v1/trace.proto', + 'opentelemetry/proto/collector/trace/v1/trace_service.proto', + ]); + ExportTraceServiceRequestProto = proto?.lookupType( + 'ExportTraceServiceRequest' + ); +} + +export function sendWithJsonProto( + collector: CollectorExporterNodeBase, + objects: ExportItem[], + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void +): void { + const serviceRequest = collector.convert(objects); + + const message = ExportTraceServiceRequestProto?.create(serviceRequest); + if (message) { + const body = ExportTraceServiceRequestProto?.encode(message).finish(); + if (body) { + sendDataUsingHttp( + collector, + Buffer.from(body), + 'application/x-protobuf', + onSuccess, + onError + ); + } + } else { + onError({ + message: 'No proto', + }); + } +} diff --git a/packages/opentelemetry-exporter-collector/src/types.ts b/packages/opentelemetry-exporter-collector/src/types.ts index 40eb5755d8d..3f11157e7dc 100644 --- a/packages/opentelemetry-exporter-collector/src/types.ts +++ b/packages/opentelemetry-exporter-collector/src/types.ts @@ -172,6 +172,7 @@ export namespace opentelemetryProto { PRODUCER, CONSUMER, } + export type TraceState = string | undefined; } @@ -294,3 +295,8 @@ export const COLLECTOR_SPAN_KIND_MAPPING = { [SpanKind.PRODUCER]: opentelemetryProto.trace.v1.Span.SpanKind.PRODUCER, [SpanKind.CONSUMER]: opentelemetryProto.trace.v1.Span.SpanKind.CONSUMER, }; + +export enum ServiceClientType { + SPANS, + METRICS, +} diff --git a/packages/opentelemetry-exporter-collector/test/helper.ts b/packages/opentelemetry-exporter-collector/test/helper.ts index 7437460b421..b992867c1ca 100644 --- a/packages/opentelemetry-exporter-collector/test/helper.ts +++ b/packages/opentelemetry-exporter-collector/test/helper.ts @@ -502,6 +502,57 @@ export function ensureEventsAreCorrect( ); } +export function ensureProtoEventsAreCorrect( + events: opentelemetryProto.trace.v1.Span.Event[] +) { + assert.deepStrictEqual( + events, + [ + { + timeUnixNano: '1574120165429803008', + name: 'fetchStart', + droppedAttributesCount: 0, + }, + { + timeUnixNano: '1574120165429803008', + name: 'domainLookupStart', + droppedAttributesCount: 0, + }, + { + timeUnixNano: '1574120165429803008', + name: 'domainLookupEnd', + droppedAttributesCount: 0, + }, + { + timeUnixNano: '1574120165429803008', + name: 'connectStart', + droppedAttributesCount: 0, + }, + { + timeUnixNano: '1574120165429803008', + name: 'connectEnd', + droppedAttributesCount: 0, + }, + { + timeUnixNano: '1574120165435513088', + name: 'requestStart', + droppedAttributesCount: 0, + }, + { + timeUnixNano: '1574120165436923136', + name: 'responseStart', + droppedAttributesCount: 0, + }, + { + timeUnixNano: '1574120165438688000', + name: 'responseEnd', + droppedAttributesCount: 0, + }, + ], + 'events are incorrect' + ); +} + export function ensureAttributesAreCorrect( attributes: opentelemetryProto.common.v1.AttributeKeyValue[] ) { @@ -518,6 +569,22 @@ export function ensureAttributesAreCorrect( ); } +export function ensureProtoAttributesAreCorrect( + attributes: opentelemetryProto.common.v1.AttributeKeyValue[] +) { + assert.deepStrictEqual( + attributes, + [ + { + key: 'component', + type: 'STRING', + stringValue: 'document-load', + }, + ], + 'attributes are incorrect' + ); +} + export function ensureLinksAreCorrect( attributes: opentelemetryProto.trace.v1.Span.Link[] ) { @@ -541,6 +608,29 @@ export function ensureLinksAreCorrect( ); } +export function ensureProtoLinksAreCorrect( + attributes: opentelemetryProto.trace.v1.Span.Link[] +) { + assert.deepStrictEqual( + attributes, + [ + { + traceId: traceIdBase64, + spanId: parentIdBase64, + attributes: [ + { + key: 'component', + type: 'STRING', + stringValue: 'document-load', + }, + ], + droppedAttributesCount: 0, + }, + ], + 'links are incorrect' + ); +} + export function ensureSpanIsCorrect( span: collectorTypes.opentelemetryProto.trace.v1.Span ) { @@ -586,6 +676,47 @@ export function ensureSpanIsCorrect( assert.deepStrictEqual(span.status, { code: 0 }, 'status is wrong'); } +export function ensureProtoSpanIsCorrect( + span: collectorTypes.opentelemetryProto.trace.v1.Span +) { + if (span.attributes) { + ensureProtoAttributesAreCorrect(span.attributes); + } + if (span.events) { + ensureProtoEventsAreCorrect(span.events); + } + if (span.links) { + ensureProtoLinksAreCorrect(span.links); + } + assert.deepStrictEqual(span.traceId, traceIdBase64, 'traceId is wrong'); + assert.deepStrictEqual(span.spanId, spanIdBase64, 'spanId is wrong'); + assert.deepStrictEqual( + span.parentSpanId, + parentIdBase64, + 'parentIdArr is wrong' + ); + assert.strictEqual(span.name, 'documentFetch', 'name is wrong'); + assert.strictEqual(span.kind, 'INTERNAL', 'kind is wrong'); + assert.strictEqual( + span.startTimeUnixNano, + '1574120165429803008', + 'startTimeUnixNano is wrong' + ); + assert.strictEqual( + span.endTimeUnixNano, + '1574120165438688000', + 'endTimeUnixNano is wrong' + ); + assert.strictEqual( + span.droppedAttributesCount, + 0, + 'droppedAttributesCount is wrong' + ); + assert.strictEqual(span.droppedEventsCount, 0, 'droppedEventsCount is wrong'); + assert.strictEqual(span.droppedLinksCount, 0, 'droppedLinksCount is wrong'); + assert.deepStrictEqual(span.status, { code: 'Ok' }, 'status is wrong'); +} + export function ensureExportedSpanIsCorrect( span: collectorTypes.opentelemetryProto.trace.v1.Span ) { diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorExporterWithProto.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorExporterWithProto.test.ts new file mode 100644 index 00000000000..bf009167cb8 --- /dev/null +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorExporterWithProto.test.ts @@ -0,0 +1,188 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as core from '@opentelemetry/core'; +import { ReadableSpan } from '@opentelemetry/tracing'; +import * as http from 'http'; +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import { CollectorProtocolNode } from '../../src/enums'; +import { CollectorTraceExporter } from '../../src/platform/node'; +import { CollectorExporterConfigNode } from '../../src/platform/node/types'; +import { getExportTraceServiceRequestProto } from '../../src/platform/node/utilWithJsonProto'; +import * as collectorTypes from '../../src/types'; + +import { + ensureExportTraceServiceRequestIsSet, + ensureProtoSpanIsCorrect, + mockedReadableSpan, +} from '../helper'; + +const fakeRequest = { + end: function () {}, + on: function () {}, + write: function () {}, +}; + +const mockRes = { + statusCode: 200, +}; + +const mockResError = { + statusCode: 400, +}; + +describe('CollectorExporter - node with proto over http', () => { + let collectorExporter: CollectorTraceExporter; + let collectorExporterConfig: CollectorExporterConfigNode; + let spyRequest: sinon.SinonSpy; + let spyWrite: sinon.SinonSpy; + let spans: ReadableSpan[]; + describe('export', () => { + beforeEach(() => { + spyRequest = sinon.stub(http, 'request').returns(fakeRequest as any); + spyWrite = sinon.stub(fakeRequest, 'write'); + collectorExporterConfig = { + headers: { + foo: 'bar', + }, + protocolNode: CollectorProtocolNode.HTTP_PROTO, + hostname: 'foo', + logger: new core.NoopLogger(), + serviceName: 'bar', + attributes: {}, + url: 'http://foo.bar.com', + }; + collectorExporter = new CollectorTraceExporter(collectorExporterConfig); + spans = []; + spans.push(Object.assign({}, mockedReadableSpan)); + }); + afterEach(() => { + spyRequest.restore(); + spyWrite.restore(); + }); + + it('should open the connection', done => { + collectorExporter.export(spans, () => {}); + + setTimeout(() => { + const args = spyRequest.args[0]; + const options = args[0]; + + assert.strictEqual(options.hostname, 'foo.bar.com'); + assert.strictEqual(options.method, 'POST'); + assert.strictEqual(options.path, '/'); + done(); + }); + }); + + it('should set custom headers', done => { + collectorExporter.export(spans, () => {}); + + setTimeout(() => { + const args = spyRequest.args[0]; + const options = args[0]; + assert.strictEqual(options.headers['foo'], 'bar'); + done(); + }); + }); + + it('should successfully send the spans', done => { + collectorExporter.export(spans, () => {}); + + setTimeout(() => { + const writeArgs = spyWrite.args[0]; + const ExportTraceServiceRequestProto = getExportTraceServiceRequestProto(); + const data = ExportTraceServiceRequestProto?.decode(writeArgs[0]); + const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; + const span1 = + json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; + assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); + if (span1) { + ensureProtoSpanIsCorrect(span1); + } + + ensureExportTraceServiceRequestIsSet(json); + + done(); + }); + }); + + it('should log the successful message', done => { + const spyLoggerDebug = sinon.stub(collectorExporter.logger, 'debug'); + const spyLoggerError = sinon.stub(collectorExporter.logger, 'error'); + + const responseSpy = sinon.spy(); + collectorExporter.export(spans, responseSpy); + + setTimeout(() => { + const args = spyRequest.args[0]; + const callback = args[1]; + callback(mockRes); + setTimeout(() => { + const response: any = spyLoggerDebug.args[1][0]; + assert.strictEqual(response, 'statusCode: 200'); + assert.strictEqual(spyLoggerError.args.length, 0); + assert.strictEqual(responseSpy.args[0][0], 0); + done(); + }); + }); + }); + + it('should log the error message', done => { + const spyLoggerError = sinon.stub(collectorExporter.logger, 'error'); + + const responseSpy = sinon.spy(); + collectorExporter.export(spans, responseSpy); + + setTimeout(() => { + const args = spyRequest.args[0]; + const callback = args[1]; + callback(mockResError); + setTimeout(() => { + const response: any = spyLoggerError.args[0][0]; + assert.strictEqual(response, 'statusCode: 400'); + + assert.strictEqual(responseSpy.args[0][0], 1); + done(); + }); + }); + }); + }); + describe('CollectorTraceExporter - node (getDefaultUrl)', () => { + it('should default to localhost', done => { + const collectorExporter = new CollectorTraceExporter({ + protocolNode: CollectorProtocolNode.HTTP_PROTO, + }); + setTimeout(() => { + assert.strictEqual( + collectorExporter['url'], + 'http://localhost:55680/v1/trace' + ); + done(); + }); + }); + + it('should keep the URL if included', done => { + const url = 'http://foo.bar.com'; + const collectorExporter = new CollectorTraceExporter({ url }); + setTimeout(() => { + assert.strictEqual(collectorExporter['url'], url); + done(); + }); + }); + }); +}); diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorMetricExporter.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorMetricExporter.test.ts index abdd1d774c5..300c257593b 100644 --- a/packages/opentelemetry-exporter-collector/test/node/CollectorMetricExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorMetricExporter.test.ts @@ -186,7 +186,7 @@ const testCollectorMetricExporter = (params: TestParams) => protocolNode: CollectorProtocolNode.HTTP_JSON, }); const args = spyLoggerWarn.args[0]; - assert.strictEqual(args[0], 'Metadata cannot be set when using json'); + assert.strictEqual(args[0], 'Metadata cannot be set when using http'); }); }); diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts index 91e89fc02fc..9cadd1574ce 100644 --- a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts @@ -168,7 +168,7 @@ const testCollectorExporter = (params: TestParams) => protocolNode: CollectorProtocolNode.HTTP_JSON, }); const args = spyLoggerWarn.args[0]; - assert.strictEqual(args[0], 'Metadata cannot be set when using json'); + assert.strictEqual(args[0], 'Metadata cannot be set when using http'); }); });