Skip to content

Commit

Permalink
feat: adding proto over http for collector exporter (open-telemetry#1302
Browse files Browse the repository at this point in the history
)
  • Loading branch information
obecny authored Jul 22, 2020
1 parent e056558 commit d6e96e0
Show file tree
Hide file tree
Showing 21 changed files with 578 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
receivers:
otlp: {}
otlp:
{}
# keep it when upgrading to version 0.5+
# protocols:
# grpc:
# http:
# endpoint: localhost:55680

exporters:
zipkin:
Expand Down
6 changes: 0 additions & 6 deletions examples/collector-exporter-node/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ services:
# Collector
collector:
image: otel/opentelemetry-collector:0.4.0
networks:
- otelcol
command: ["--config=/conf/collector-config.yaml", "--log-level=DEBUG"]
volumes:
- ./collector-config.yaml:/conf/collector-config.yaml
Expand All @@ -15,10 +13,6 @@ services:
# Zipkin
zipkin-all-in-one:
image: openzipkin/zipkin:latest
networks:
- otelcol
ports:
- "9411:9411"

networks:
otelcol:
3 changes: 2 additions & 1 deletion examples/collector-exporter-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"main": "index.js",
"scripts": {
"start": "node ./start.js",
"docker:start": "cd ./docker && docker-compose down && docker-compose up -d",
"docker:start": "cd ./docker && docker-compose down && docker-compose up",
"docker:startd": "cd ./docker && docker-compose down && docker-compose up -d",
"docker:stop": "cd ./docker && docker-compose down"
},
"repository": {
Expand Down
3 changes: 2 additions & 1 deletion examples/collector-exporter-node/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const exporter = new CollectorTraceExporter({
// headers: {
// foo: 'bar'
// },
protocolNode: CollectorProtocolNode.HTTP_JSON,
protocolNode: CollectorProtocolNode.HTTP_PROTO,
// protocolNode: CollectorProtocolNode.HTTP_JSON,
});

const provider = new BasicTracerProvider();
Expand Down
27 changes: 25 additions & 2 deletions packages/opentelemetry-exporter-collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,33 @@ provider.register();

```

## Usage in Node - PROTO over http

```js
const { BasicTracerProvider, SimpleSpanProcessor } = require('@opentelemetry/tracing');
const { CollectorExporter, CollectorTransportNode } = require('@opentelemetry/exporter-collector');

const collectorOptions = {
protocolNode: CollectorTransportNode.HTTP_PROTO,
serviceName: 'basic-service',
url: '<opentelemetry-collector-url>', // url is optional and can be omitted - default is http://localhost:55680/v1/trace
headers: {
foo: 'bar'
}, //an optional object containing custom headers to be sent with each request will only work with json over http
};

const provider = new BasicTracerProvider();
const exporter = new CollectorExporter(collectorOptions);
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));

provider.register();

```

## Running opentelemetry-collector locally to see the traces

1. Go to examples/basic-tracer-node
2. run `npm run collector:docker:ot`
1. Go to examples/collector-exporter-node
2. run `npm run docker:start`
3. Open page at `http://localhost:9411/zipkin/` to observe the traces

## Useful links
Expand Down
4 changes: 2 additions & 2 deletions packages/opentelemetry-exporter-collector/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@
"webpack-merge": "5.0.9"
},
"dependencies": {
"@grpc/proto-loader": "^0.5.3",
"@grpc/proto-loader": "^0.5.4",
"@opentelemetry/api": "^0.9.0",
"@opentelemetry/core": "^0.9.0",
"@opentelemetry/resources": "^0.9.0",
"@opentelemetry/metrics": "^0.9.0",
"@opentelemetry/tracing": "^0.9.0",
"google-protobuf": "^3.11.4",
"protobufjs": "^6.9.0",
"grpc": "^1.24.2"
}
}
1 change: 1 addition & 0 deletions packages/opentelemetry-exporter-collector/src/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
export enum CollectorProtocolNode {
GRPC,
HTTP_JSON,
HTTP_PROTO,
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import * as collectorTypes from '../../types';
import { sendWithBeacon, sendWithXhr } from './util';
import { parseHeaders } from '../../util';

const DEFAULT_COLLECTOR_URL = 'http://localhost:55680/v1/trace';
const DEFAULT_SERVICE_NAME = 'collector-trace-exporter';
const DEFAULT_COLLECTOR_URL = 'http://localhost:55680/v1/trace';

/**
* Collector Trace Exporter for Web
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
* limitations under the License.
*/

import { Metadata } from 'grpc';
import { CollectorExporterBase } from '../../CollectorExporterBase';
import { ServiceClientType } from '../../types';
import { CollectorExporterConfigNode, GRPCQueueItem } from './types';
import { ServiceClient } from './types';
import * as grpc from 'grpc';
import { CollectorProtocolNode } from '../../enums';
import * as collectorTypes from '../../types';
import { parseHeaders } from '../../util';
import { sendWithJson, initWithJson } from './utilWithJson';
import { sendUsingGrpc, initWithGrpc } from './utilWithGrpc';
import { initWithJson, sendWithJson } from './utilWithJson';
import { initWithGrpc, sendWithGrpc } from './utilWithGrpc';
import { initWithJsonProto, sendWithJsonProto } from './utilWithJsonProto';

const DEFAULT_SERVICE_NAME = 'collector-metric-exporter';

Expand All @@ -40,10 +42,9 @@ export abstract class CollectorExporterNodeBase<
DEFAULT_HEADERS: Record<string, string> = {
[collectorTypes.OT_REQUEST_HEADER]: '1',
};
grpcQueue: GRPCQueueItem<ExportItem>[];
grpcQueue: GRPCQueueItem<ExportItem>[] = [];
metadata?: Metadata;
serviceClient?: ServiceClient = undefined;
credentials: grpc.ChannelCredentials;
metadata?: grpc.Metadata;
headers: Record<string, string>;
protected readonly _protocol: CollectorProtocolNode;

Expand All @@ -53,30 +54,35 @@ export abstract class CollectorExporterNodeBase<
typeof config.protocolNode !== 'undefined'
? config.protocolNode
: CollectorProtocolNode.GRPC;
if (this._protocol === CollectorProtocolNode.HTTP_JSON) {
this.logger.debug('CollectorExporter - using json over http');
if (config.metadata) {
this.logger.warn('Metadata cannot be set when using json');
}
} else {
if (this._protocol === CollectorProtocolNode.GRPC) {
this.logger.debug('CollectorExporter - using grpc');
if (config.headers) {
this.logger.warn('Headers cannot be set when using grpc');
}
} else {
if (this._protocol === CollectorProtocolNode.HTTP_JSON) {
this.logger.debug('CollectorExporter - using json over http');
} else {
this.logger.debug('CollectorExporter - using proto over http');
}
if (config.metadata) {
this.logger.warn('Metadata cannot be set when using http');
}
}
this.grpcQueue = [];
this.credentials = config.credentials || grpc.credentials.createInsecure();
this.metadata = config.metadata;
this.headers =
parseHeaders(config.headers, this.logger) || this.DEFAULT_HEADERS;
this.metadata = config.metadata;
}

onInit(config: CollectorExporterConfigNode): void {
this._isShutdown = false;

if (config.protocolNode === CollectorProtocolNode.HTTP_JSON) {
initWithJson(this, config);
} else if (config.protocolNode === CollectorProtocolNode.HTTP_PROTO) {
initWithJsonProto(this, config);
} else {
initWithGrpc(this);
initWithGrpc(this, config);
}
}

Expand All @@ -91,8 +97,10 @@ export abstract class CollectorExporterNodeBase<
}
if (this._protocol === CollectorProtocolNode.HTTP_JSON) {
sendWithJson(this, objects, onSuccess, onError);
} else if (this._protocol === CollectorProtocolNode.HTTP_PROTO) {
sendWithJsonProto(this, objects, onSuccess, onError);
} else {
sendUsingGrpc(this, objects, onSuccess, onError);
sendWithGrpc(this, objects, onSuccess, onError);
}
}

Expand All @@ -108,8 +116,5 @@ export abstract class CollectorExporterNodeBase<
}

abstract getServiceProtoPath(): string;
abstract getServiceClient(
packageObject: any,
serverAddress: string
): ServiceClient;
abstract getServiceClientType(): ServiceClientType;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/

import { MetricRecord, MetricExporter } from '@opentelemetry/metrics';
import { ServiceClientType } from '../../types';
import * as collectorTypes from '../../types';
import { CollectorExporterConfigNode, ServiceClient } from './types';
import { CollectorExporterConfigNode } from './types';
import { CollectorProtocolNode } from '../../enums';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { toCollectorExportMetricServiceRequest } from '../../transformMetrics';
import { DEFAULT_COLLECTOR_URL_GRPC } from './utilWithGrpc';

const DEFAULT_SERVICE_NAME = 'collector-metric-exporter';
const DEFAULT_COLLECTOR_URL_GRPC = 'localhost:55680';
const DEFAULT_COLLECTOR_URL_JSON = 'http://localhost:55680/v1/metrics';

/**
Expand Down Expand Up @@ -58,11 +60,8 @@ export class CollectorMetricExporter
return config.serviceName || DEFAULT_SERVICE_NAME;
}

getServiceClient(packageObject: any, serverAddress: string): ServiceClient {
return new packageObject.opentelemetry.proto.collector.metrics.v1.MetricsService(
serverAddress,
this.credentials
);
getServiceClientType() {
return ServiceClientType.METRICS;
}

getServiceProtoPath(): string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
*/

import { ReadableSpan, SpanExporter } from '@opentelemetry/tracing';
import { ServiceClientType } from '../../types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import * as collectorTypes from '../../types';
import { CollectorProtocolNode } from '../../enums';
import { CollectorExporterConfigNode, ServiceClient } from './types';
import { CollectorExporterConfigNode } from './types';
import { toCollectorExportTraceServiceRequest } from '../../transform';
import { DEFAULT_COLLECTOR_URL_GRPC } from './utilWithGrpc';

const DEFAULT_SERVICE_NAME = 'collector-trace-exporter';
const DEFAULT_COLLECTOR_URL_GRPC = 'localhost:55680';
const DEFAULT_COLLECTOR_URL_JSON = 'http://localhost:55680/v1/trace';
const DEFAULT_COLLECTOR_URL_JSON_PROTO = 'http://localhost:55680/v1/trace';

/**
* Collector Trace Exporter for Node
Expand All @@ -42,9 +44,13 @@ export class CollectorTraceExporter

getDefaultUrl(config: CollectorExporterConfigNode): string {
if (!config.url) {
return config.protocolNode === CollectorProtocolNode.HTTP_JSON
? DEFAULT_COLLECTOR_URL_JSON
: DEFAULT_COLLECTOR_URL_GRPC;
if (config.protocolNode === CollectorProtocolNode.HTTP_JSON) {
return DEFAULT_COLLECTOR_URL_JSON;
} else if (config.protocolNode === CollectorProtocolNode.HTTP_PROTO) {
return DEFAULT_COLLECTOR_URL_JSON_PROTO;
} else {
return DEFAULT_COLLECTOR_URL_GRPC;
}
}
return config.url;
}
Expand All @@ -53,11 +59,8 @@ export class CollectorTraceExporter
return config.serviceName || DEFAULT_SERVICE_NAME;
}

getServiceClient(packageObject: any, serverAddress: string): ServiceClient {
return new packageObject.opentelemetry.proto.collector.trace.v1.TraceService(
serverAddress,
this.credentials
);
getServiceClientType() {
return ServiceClientType.SPANS;
}

getServiceProtoPath(): string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ the latest sha when this guide was written is `b54688569186e0b862bf7462a983ccf2c
git commit -am "chore: updating submodule for opentelemetry-proto"
```

9. If you look now at git log you will notice that the folder `protos` has been changed and it will show what was the previous sha and what is current one
9. If you look now at git log you will notice that the folder `protos` has been changed and it will show what was the previous sha and what is current one.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,65 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as url from 'url';
import * as http from 'http';
import * as https from 'https';
import * as collectorTypes from '../../types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';

export function removeProtocol(url: string): string {
return url.replace(/^https?:\/\//, '');
}

/**
* Sends data using http
* @param collector
* @param data
* @param contentType
* @param onSuccess
* @param onError
*/
export function sendDataUsingHttp<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
data: string | Buffer,
contentType: string,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
const parsedUrl = new url.URL(collector.url);

const options = {
hostname: parsedUrl.hostname,
port: parsedUrl.port,
path: parsedUrl.pathname,
method: 'POST',
headers: {
'Content-Length': Buffer.byteLength(data),
'Content-Type': contentType,
...collector.headers,
},
};

const request = parsedUrl.protocol === 'http:' ? http.request : https.request;
const req = request(options, (res: http.IncomingMessage) => {
if (res.statusCode && res.statusCode < 299) {
collector.logger.debug(`statusCode: ${res.statusCode}`);
onSuccess();
} else {
collector.logger.error(`statusCode: ${res.statusCode}`);
onError({
code: res.statusCode,
message: res.statusMessage,
});
}
});

req.on('error', (error: Error) => {
collector.logger.error('error', error.message);
onError({
message: error.message,
});
});
req.write(data);
req.end();
}
Loading

0 comments on commit d6e96e0

Please sign in to comment.