Skip to content

Commit

Permalink
Refactor transport layer (#831)
Browse files Browse the repository at this point in the history
- move content type from `::send()` to transport property
- move otlp exporter to `Otlp\` namespace
  • Loading branch information
Nevay authored Oct 11, 2022
1 parent a167add commit 3866a82
Show file tree
Hide file tree
Showing 31 changed files with 429 additions and 385 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ update: ## Update dependencies
$(DC_RUN_PHP) env XDEBUG_MODE=off composer update
test: test-unit test-integration ## Run unit and integration tests
test-unit: ## Run unit tests
$(DC_RUN_PHP) env XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite unit --colors=always --coverage-text --testdox --coverage-clover coverage.clover --coverage-html=tests/coverage/html --log-junit=junit.xml
$(DC_RUN_PHP) env XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite unit --colors=always --testdox --coverage-clover coverage.clover --coverage-html=tests/coverage/html --log-junit=junit.xml
test-integration: ## Run integration tests
$(DC_RUN_PHP) env XDEBUG_MODE=off vendor/bin/phpunit --testsuite integration --colors=always
test-coverage: ## Run units tests and generate code coverage
Expand Down
3 changes: 1 addition & 2 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ deptrac:
- Composer
Contrib:
- +SDK
- OtelProto
- Grpc
- +OtelProto
- Prometheus
- Thrift
- JaegerThrift
Expand Down
5 changes: 3 additions & 2 deletions examples/metrics/basic.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

require_once __DIR__ . '/../../vendor/autoload.php';

use OpenTelemetry\Contrib\Otlp\StreamMetricExporter;
use OpenTelemetry\Contrib\Otlp\MetricExporter;
use OpenTelemetry\SDK\Common\Attribute\Attributes;
use OpenTelemetry\SDK\Common\Export\Stream\StreamTransportFactory;
use OpenTelemetry\SDK\Common\Instrumentation\InstrumentationScopeFactory;
use OpenTelemetry\SDK\Common\Time\ClockFactory;
use OpenTelemetry\SDK\Metrics\Aggregation\ExplicitBucketHistogramAggregation;
Expand All @@ -20,7 +21,7 @@
use OpenTelemetry\SDK\Resource\ResourceInfoFactory;

$clock = ClockFactory::getDefault();
$reader = new ExportingReader(new StreamMetricExporter(STDOUT, /*Temporality::CUMULATIVE*/), $clock);
$reader = new ExportingReader(new MetricExporter((new StreamTransportFactory())->create(STDOUT, 'application/x-ndjson'), /*Temporality::CUMULATIVE*/), $clock);

// Let's imagine we export the metrics as Histogram, and to simplify the story we will only have one histogram bucket (-Inf, +Inf):
$views = new CriteriaViewRegistry();
Expand Down
14 changes: 8 additions & 6 deletions src/Contrib/Grpc/GrpcTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use OpenTelemetry\SDK\Common\Future\NullCancellation;
use RuntimeException;
use function sprintf;
use Throwable;
use UnexpectedValueException;

/**
* @internal
*
* @template-implements TransportInterface<"application/x-protobuf">
*/
final class GrpcTransport implements TransportInterface
{
Expand All @@ -51,14 +51,16 @@ public function __construct(
$this->headers = array_change_key_case($headers);
}

public function send(string $payload, string $contentType, ?CancellationInterface $cancellation = null): FutureInterface
public function contentType(): string
{
return 'application/x-protobuf';
}

public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface
{
if ($this->closed) {
return new ErrorFuture(new BadMethodCallException('Transport closed'));
}
if ($contentType !== 'application/x-protobuf') {
return new ErrorFuture(new UnexpectedValueException(sprintf('Unsupported content type "%s", grpc transport supports only application/x-protobuf', $contentType)));
}

$call = new Call($this->channel, $this->method, Timeval::infFuture());

Expand Down
12 changes: 12 additions & 0 deletions src/Contrib/Grpc/GrpcTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@

final class GrpcTransportFactory implements TransportFactoryInterface
{
/**
* @psalm-param "application/x-protobuf" $contentType
* @psalm-return TransportInterface<"application/x-protobuf">
*
* @psalm-suppress MoreSpecificImplementedParamType
* @psalm-suppress ImplementedReturnTypeMismatch
*/
public function create(
string $endpoint,
string $contentType = 'application/x-protobuf',
array $headers = [],
$compression = null,
float $timeout = 10.,
Expand All @@ -35,6 +43,10 @@ public function create(
if (!isset($parts['scheme'], $parts['host'], $parts['path'])) {
throw new InvalidArgumentException('Endpoint has to contain scheme, host and path');
}
/** @phpstan-ignore-next-line */
if ($contentType !== 'application/x-protobuf') {
throw new InvalidArgumentException(sprintf('Unsupported content type "%s", grpc transport supports only application/x-protobuf', $contentType));
}

$scheme = $parts['scheme'];
$method = $parts['path'];
Expand Down
6 changes: 3 additions & 3 deletions src/Contrib/Newrelic/Exporter.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public function __construct(
SpanConverter $spanConverter = null,
string $dataFormatVersion = Exporter::DATA_FORMAT_VERSION_DEFAULT
) {
$this->transport = (new PsrTransportFactory($client, $requestFactory, $streamFactory))->create($endpointUrl, [
$this->transport = (new PsrTransportFactory($client, $requestFactory, $streamFactory))->create($endpointUrl, 'application/json', [
'Api-Key' => $licenseKey,
'Data-Format' => 'newrelic',
'Data-Format-Version' => $dataFormatVersion,
Expand Down Expand Up @@ -93,10 +93,10 @@ public static function fromConnectionString(string $endpointUrl, string $name, $
);
}

public function export(iterable $spans, ?CancellationInterface $cancellation = null): FutureInterface
public function export(iterable $batch, ?CancellationInterface $cancellation = null): FutureInterface
{
return $this->transport
->send($this->serializeTrace($spans), 'application/json', $cancellation)
->send($this->serializeTrace($batch), $cancellation)
->map(static fn (): bool => true)
->catch(static function (Throwable $throwable): bool {
self::logError('Export failure', ['exception' => $throwable]);
Expand Down
87 changes: 87 additions & 0 deletions src/Contrib/Otlp/MetricExporter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Otlp;

use Opentelemetry\Proto\Collector\Metrics\V1\ExportMetricsServiceResponse;
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use OpenTelemetry\SDK\Metrics\Data\Temporality;
use OpenTelemetry\SDK\Metrics\MetricExporterInterface;
use OpenTelemetry\SDK\Metrics\MetricMetadataInterface;
use Throwable;

/**
* @psalm-import-type SUPPORTED_CONTENT_TYPES from ProtobufSerializer
*/
final class MetricExporter implements MetricExporterInterface
{
use LogsMessagesTrait;

private TransportInterface $transport;
private ProtobufSerializer $serializer;
private $temporality;

/**
* @param string|Temporality|null $temporality
*
* @psalm-param TransportInterface<SUPPORTED_CONTENT_TYPES> $transport
*/
public function __construct(TransportInterface $transport, $temporality = null)
{
$this->transport = $transport;
$this->serializer = ProtobufSerializer::forTransport($transport);
$this->temporality = $temporality;
}

public function temporality(MetricMetadataInterface $metric)
{
return $this->temporality ?? $metric->temporality();
}

public function export(iterable $batch): bool
{
return $this->transport
->send($this->serializer->serialize((new MetricConverter())->convert($batch)))
->map(function (?string $payload): bool {
if ($payload === null) {
return true;
}

$serviceResponse = new ExportMetricsServiceResponse();
$this->serializer->hydrate($serviceResponse, $payload);

$partialSuccess = $serviceResponse->getPartialSuccess();
if ($partialSuccess !== null && $partialSuccess->getRejectedDataPoints()) {
self::logError('Export partial success', [
'rejected_data_points' => $partialSuccess->getRejectedDataPoints(),
'error_message' => $partialSuccess->getErrorMessage(),
]);

return false;
}
if ($partialSuccess !== null && $partialSuccess->getErrorMessage()) {
self::logWarning('Export success with warnings/suggestions', ['error_message' => $partialSuccess->getErrorMessage()]);
}

return true;
})
->catch(static function (Throwable $throwable): bool {
self::logError('Export failure', ['exception' => $throwable]);

return false;
})
->await();
}

public function shutdown(): bool
{
return $this->transport->shutdown();
}

public function forceFlush(): bool
{
return $this->transport->forceFlush();
}
}
76 changes: 76 additions & 0 deletions src/Contrib/Otlp/ProtobufSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Otlp;

use AssertionError;
use Google\Protobuf\Internal\Message;
use InvalidArgumentException;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use function sprintf;

/**
* @internal
*
* @psalm-type SUPPORTED_CONTENT_TYPES = self::PROTOBUF|self::JSON|self::NDJSON
*/
final class ProtobufSerializer
{
private const PROTOBUF = 'application/x-protobuf';
private const JSON = 'application/json';
private const NDJSON = 'application/x-ndjson';

private string $contentType;

private function __construct(string $contentType)
{
$this->contentType = $contentType;
}

/**
* @param TransportInterface<SUPPORTED_CONTENT_TYPES> $transport
*/
public static function forTransport(TransportInterface $transport): ProtobufSerializer
{
switch ($contentType = $transport->contentType()) {
case self::PROTOBUF:
case self::JSON:
case self::NDJSON:
return new self($contentType);
default:
throw new InvalidArgumentException(sprintf('Not supported content type "%s"', $contentType));
}
}

public function serialize(Message $message): string
{
switch ($this->contentType) {
case self::PROTOBUF:
return $message->serializeToString();
case self::JSON:
return $message->serializeToJsonString();
case self::NDJSON:
return $message->serializeToJsonString() . "\n";
default:
throw new AssertionError();
}
}

public function hydrate(Message $message, string $payload): void
{
switch ($this->contentType) {
case self::PROTOBUF:
$message->mergeFromString($payload);

break;
case self::JSON:
case self::NDJSON:
$message->mergeFromJsonString($payload);

break;
default:
throw new AssertionError();
}
}
}
77 changes: 77 additions & 0 deletions src/Contrib/Otlp/SpanExporter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Otlp;

use Opentelemetry\Proto\Collector\Trace\V1\ExportTraceServiceResponse;
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use Throwable;

/**
* @psalm-import-type SUPPORTED_CONTENT_TYPES from ProtobufSerializer
*/
final class SpanExporter implements SpanExporterInterface
{
use LogsMessagesTrait;

private TransportInterface $transport;
private ProtobufSerializer $serializer;

/**
* @psalm-param TransportInterface<SUPPORTED_CONTENT_TYPES> $transport
*/
public function __construct(TransportInterface $transport)
{
$this->transport = $transport;
$this->serializer = ProtobufSerializer::forTransport($transport);
}

public function export(iterable $batch, ?CancellationInterface $cancellation = null): FutureInterface
{
return $this->transport
->send($this->serializer->serialize((new SpanConverter())->convert($batch)), $cancellation)
->map(function (?string $payload): bool {
if ($payload === null) {
return true;
}

$serviceResponse = new ExportTraceServiceResponse();
$this->serializer->hydrate($serviceResponse, $payload);

$partialSuccess = $serviceResponse->getPartialSuccess();
if ($partialSuccess !== null && $partialSuccess->getRejectedSpans()) {
self::logError('Export partial success', [
'rejected_spans' => $partialSuccess->getRejectedSpans(),
'error_message' => $partialSuccess->getErrorMessage(),
]);

return false;
}
if ($partialSuccess !== null && $partialSuccess->getErrorMessage()) {
self::logWarning('Export success with warnings/suggestions', ['error_message' => $partialSuccess->getErrorMessage()]);
}

return true;
})
->catch(static function (Throwable $throwable): bool {
self::logError('Export failure', ['exception' => $throwable]);

return false;
});
}

public function shutdown(?CancellationInterface $cancellation = null): bool
{
return $this->transport->shutdown($cancellation);
}

public function forceFlush(?CancellationInterface $cancellation = null): bool
{
return $this->transport->forceFlush($cancellation);
}
}
Loading

0 comments on commit 3866a82

Please sign in to comment.