Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport - move contentType from ::send() argument to transport property #831

Merged
merged 1 commit into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ update: ## Update dependencies
$(DC_RUN_PHP) env XDEBUG_MODE=off composer update
test: test-unit test-integration ## Run unit and integration tests
test-unit: ## Run unit tests
$(DC_RUN_PHP) env XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite unit --colors=always --coverage-text --testdox --coverage-clover coverage.clover --coverage-html=tests/coverage/html --log-junit=junit.xml
$(DC_RUN_PHP) env XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite unit --colors=always --testdox --coverage-clover coverage.clover --coverage-html=tests/coverage/html --log-junit=junit.xml
brettmc marked this conversation as resolved.
Show resolved Hide resolved
test-integration: ## Run integration tests
$(DC_RUN_PHP) env XDEBUG_MODE=off vendor/bin/phpunit --testsuite integration --colors=always
test-coverage: ## Run units tests and generate code coverage
Expand Down
3 changes: 1 addition & 2 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ deptrac:
- Composer
Contrib:
- +SDK
- OtelProto
- Grpc
- +OtelProto
- Prometheus
- Thrift
- JaegerThrift
Expand Down
5 changes: 3 additions & 2 deletions examples/metrics/basic.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

require_once __DIR__ . '/../../vendor/autoload.php';

use OpenTelemetry\Contrib\Otlp\StreamMetricExporter;
use OpenTelemetry\Contrib\Otlp\MetricExporter;
use OpenTelemetry\SDK\Common\Attribute\Attributes;
use OpenTelemetry\SDK\Common\Export\Stream\StreamTransportFactory;
use OpenTelemetry\SDK\Common\Instrumentation\InstrumentationScopeFactory;
use OpenTelemetry\SDK\Common\Time\ClockFactory;
use OpenTelemetry\SDK\Metrics\Aggregation\ExplicitBucketHistogramAggregation;
Expand All @@ -20,7 +21,7 @@
use OpenTelemetry\SDK\Resource\ResourceInfoFactory;

$clock = ClockFactory::getDefault();
$reader = new ExportingReader(new StreamMetricExporter(STDOUT, /*Temporality::CUMULATIVE*/), $clock);
$reader = new ExportingReader(new MetricExporter((new StreamTransportFactory())->create(STDOUT, 'application/x-ndjson'), /*Temporality::CUMULATIVE*/), $clock);

// Let's imagine we export the metrics as Histogram, and to simplify the story we will only have one histogram bucket (-Inf, +Inf):
$views = new CriteriaViewRegistry();
Expand Down
14 changes: 8 additions & 6 deletions src/Contrib/Grpc/GrpcTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use OpenTelemetry\SDK\Common\Future\NullCancellation;
use RuntimeException;
use function sprintf;
use Throwable;
use UnexpectedValueException;

/**
* @internal
*
* @template-implements TransportInterface<"application/x-protobuf">
*/
final class GrpcTransport implements TransportInterface
{
Expand All @@ -51,14 +51,16 @@ public function __construct(
$this->headers = array_change_key_case($headers);
}

public function send(string $payload, string $contentType, ?CancellationInterface $cancellation = null): FutureInterface
public function contentType(): string
{
return 'application/x-protobuf';
}

public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface
{
if ($this->closed) {
return new ErrorFuture(new BadMethodCallException('Transport closed'));
}
if ($contentType !== 'application/x-protobuf') {
return new ErrorFuture(new UnexpectedValueException(sprintf('Unsupported content type "%s", grpc transport supports only application/x-protobuf', $contentType)));
}

$call = new Call($this->channel, $this->method, Timeval::infFuture());

Expand Down
12 changes: 12 additions & 0 deletions src/Contrib/Grpc/GrpcTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@

final class GrpcTransportFactory implements TransportFactoryInterface
{
/**
* @psalm-param "application/x-protobuf" $contentType
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't find a solution that works with template type on TransportFactoryInterface.

* @psalm-return TransportInterface<"application/x-protobuf">
*
* @psalm-suppress MoreSpecificImplementedParamType
* @psalm-suppress ImplementedReturnTypeMismatch
*/
public function create(
string $endpoint,
string $contentType = 'application/x-protobuf',
array $headers = [],
$compression = null,
float $timeout = 10.,
Expand All @@ -35,6 +43,10 @@ public function create(
if (!isset($parts['scheme'], $parts['host'], $parts['path'])) {
throw new InvalidArgumentException('Endpoint has to contain scheme, host and path');
}
/** @phpstan-ignore-next-line */
if ($contentType !== 'application/x-protobuf') {
throw new InvalidArgumentException(sprintf('Unsupported content type "%s", grpc transport supports only application/x-protobuf', $contentType));
}

$scheme = $parts['scheme'];
$method = $parts['path'];
Expand Down
6 changes: 3 additions & 3 deletions src/Contrib/Newrelic/Exporter.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public function __construct(
SpanConverter $spanConverter = null,
string $dataFormatVersion = Exporter::DATA_FORMAT_VERSION_DEFAULT
) {
$this->transport = (new PsrTransportFactory($client, $requestFactory, $streamFactory))->create($endpointUrl, [
$this->transport = (new PsrTransportFactory($client, $requestFactory, $streamFactory))->create($endpointUrl, 'application/json', [
'Api-Key' => $licenseKey,
'Data-Format' => 'newrelic',
'Data-Format-Version' => $dataFormatVersion,
Expand Down Expand Up @@ -93,10 +93,10 @@ public static function fromConnectionString(string $endpointUrl, string $name, $
);
}

public function export(iterable $spans, ?CancellationInterface $cancellation = null): FutureInterface
public function export(iterable $batch, ?CancellationInterface $cancellation = null): FutureInterface
{
return $this->transport
->send($this->serializeTrace($spans), 'application/json', $cancellation)
->send($this->serializeTrace($batch), $cancellation)
->map(static fn (): bool => true)
->catch(static function (Throwable $throwable): bool {
self::logError('Export failure', ['exception' => $throwable]);
Expand Down
87 changes: 87 additions & 0 deletions src/Contrib/Otlp/MetricExporter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Otlp;

use Opentelemetry\Proto\Collector\Metrics\V1\ExportMetricsServiceResponse;
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use OpenTelemetry\SDK\Metrics\Data\Temporality;
use OpenTelemetry\SDK\Metrics\MetricExporterInterface;
use OpenTelemetry\SDK\Metrics\MetricMetadataInterface;
use Throwable;

/**
* @psalm-import-type SUPPORTED_CONTENT_TYPES from ProtobufSerializer
*/
final class MetricExporter implements MetricExporterInterface
{
use LogsMessagesTrait;

private TransportInterface $transport;
private ProtobufSerializer $serializer;
private $temporality;

/**
* @param string|Temporality|null $temporality
*
* @psalm-param TransportInterface<SUPPORTED_CONTENT_TYPES> $transport
*/
public function __construct(TransportInterface $transport, $temporality = null)
{
$this->transport = $transport;
$this->serializer = ProtobufSerializer::forTransport($transport);
$this->temporality = $temporality;
}

public function temporality(MetricMetadataInterface $metric)
{
return $this->temporality ?? $metric->temporality();
}

public function export(iterable $batch): bool
{
return $this->transport
->send($this->serializer->serialize((new MetricConverter())->convert($batch)))
->map(function (?string $payload): bool {
if ($payload === null) {
return true;
}

$serviceResponse = new ExportMetricsServiceResponse();
$this->serializer->hydrate($serviceResponse, $payload);

$partialSuccess = $serviceResponse->getPartialSuccess();
if ($partialSuccess !== null && $partialSuccess->getRejectedDataPoints()) {
self::logError('Export partial success', [
'rejected_data_points' => $partialSuccess->getRejectedDataPoints(),
'error_message' => $partialSuccess->getErrorMessage(),
]);

return false;
}
if ($partialSuccess !== null && $partialSuccess->getErrorMessage()) {
self::logWarning('Export success with warnings/suggestions', ['error_message' => $partialSuccess->getErrorMessage()]);
}

return true;
})
->catch(static function (Throwable $throwable): bool {
self::logError('Export failure', ['exception' => $throwable]);

return false;
})
->await();
}

public function shutdown(): bool
{
return $this->transport->shutdown();
}

public function forceFlush(): bool
{
return $this->transport->forceFlush();
}
}
76 changes: 76 additions & 0 deletions src/Contrib/Otlp/ProtobufSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Otlp;

use AssertionError;
use Google\Protobuf\Internal\Message;
use InvalidArgumentException;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use function sprintf;

/**
* @internal
*
* @psalm-type SUPPORTED_CONTENT_TYPES = self::PROTOBUF|self::JSON|self::NDJSON
*/
final class ProtobufSerializer
{
private const PROTOBUF = 'application/x-protobuf';
private const JSON = 'application/json';
private const NDJSON = 'application/x-ndjson';

private string $contentType;

private function __construct(string $contentType)
{
$this->contentType = $contentType;
}

/**
* @param TransportInterface<SUPPORTED_CONTENT_TYPES> $transport
*/
public static function forTransport(TransportInterface $transport): ProtobufSerializer
{
switch ($contentType = $transport->contentType()) {
case self::PROTOBUF:
case self::JSON:
case self::NDJSON:
return new self($contentType);
default:
throw new InvalidArgumentException(sprintf('Not supported content type "%s"', $contentType));
}
}

public function serialize(Message $message): string
{
switch ($this->contentType) {
case self::PROTOBUF:
return $message->serializeToString();
case self::JSON:
return $message->serializeToJsonString();
case self::NDJSON:
return $message->serializeToJsonString() . "\n";
default:
throw new AssertionError();
}
}

public function hydrate(Message $message, string $payload): void
{
switch ($this->contentType) {
case self::PROTOBUF:
$message->mergeFromString($payload);

break;
case self::JSON:
case self::NDJSON:
$message->mergeFromJsonString($payload);

break;
default:
throw new AssertionError();
}
}
}
77 changes: 77 additions & 0 deletions src/Contrib/Otlp/SpanExporter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Otlp;

use Opentelemetry\Proto\Collector\Trace\V1\ExportTraceServiceResponse;
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use Throwable;

/**
* @psalm-import-type SUPPORTED_CONTENT_TYPES from ProtobufSerializer
*/
final class SpanExporter implements SpanExporterInterface
{
use LogsMessagesTrait;

private TransportInterface $transport;
private ProtobufSerializer $serializer;

/**
* @psalm-param TransportInterface<SUPPORTED_CONTENT_TYPES> $transport
*/
public function __construct(TransportInterface $transport)
{
$this->transport = $transport;
$this->serializer = ProtobufSerializer::forTransport($transport);
}

public function export(iterable $batch, ?CancellationInterface $cancellation = null): FutureInterface
{
return $this->transport
->send($this->serializer->serialize((new SpanConverter())->convert($batch)), $cancellation)
->map(function (?string $payload): bool {
if ($payload === null) {
return true;
}

$serviceResponse = new ExportTraceServiceResponse();
$this->serializer->hydrate($serviceResponse, $payload);

$partialSuccess = $serviceResponse->getPartialSuccess();
if ($partialSuccess !== null && $partialSuccess->getRejectedSpans()) {
self::logError('Export partial success', [
'rejected_spans' => $partialSuccess->getRejectedSpans(),
'error_message' => $partialSuccess->getErrorMessage(),
]);

return false;
}
if ($partialSuccess !== null && $partialSuccess->getErrorMessage()) {
self::logWarning('Export success with warnings/suggestions', ['error_message' => $partialSuccess->getErrorMessage()]);
}

return true;
})
->catch(static function (Throwable $throwable): bool {
self::logError('Export failure', ['exception' => $throwable]);

return false;
});
}

public function shutdown(?CancellationInterface $cancellation = null): bool
{
return $this->transport->shutdown($cancellation);
}

public function forceFlush(?CancellationInterface $cancellation = null): bool
{
return $this->transport->forceFlush($cancellation);
}
}
Loading