Skip to content

Commit

Permalink
Improve stream test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
Nevay committed Jul 17, 2022
1 parent 23b85f2 commit c855193
Show file tree
Hide file tree
Showing 3 changed files with 346 additions and 0 deletions.
9 changes: 9 additions & 0 deletions tests/Unit/SDK/Metrics/Stream/DeltaStorageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ public function test_storage_returns_inserted_metric_cumulative(): void
}
}

public function test_storage_does_not_return_zero_reader_metric(): void
{
$ds = new DeltaStorage(new SumAggregation());

$ds->add(new Metric([Attributes::create(['a'])], [new SumSummary(3)], 0, 0), 0b0);

$this->assertNull($ds->collect(0));
}

public function test_storage_keeps_metrics_for_additional_readers(): void
{
$ds = new DeltaStorage(new SumAggregation());
Expand Down
202 changes: 202 additions & 0 deletions tests/Unit/SDK/Metrics/Stream/MetricStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@

namespace OpenTelemetry\Tests\Unit\SDK\Metrics\Stream;

use function current;
use function extension_loaded;
use OpenTelemetry\API\Metrics\ObserverInterface;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SDK\Common\Attribute\Attributes;
use OpenTelemetry\SDK\Metrics\Aggregation\SumAggregation;
use OpenTelemetry\SDK\Metrics\AttributeProcessor\FilteredAttributeProcessor;
use OpenTelemetry\SDK\Metrics\Data;
use OpenTelemetry\SDK\Metrics\Data\Temporality;
use OpenTelemetry\SDK\Metrics\Exemplar\ExemplarReservoirInterface;
use OpenTelemetry\SDK\Metrics\Stream\AsynchronousMetricStream;
use OpenTelemetry\SDK\Metrics\Stream\MetricAggregator;
use OpenTelemetry\SDK\Metrics\Stream\StreamWriter;
use OpenTelemetry\SDK\Metrics\Stream\SynchronousMetricStream;
use const PHP_INT_SIZE;
use PHPUnit\Framework\TestCase;

/**
Expand Down Expand Up @@ -213,4 +220,199 @@ public function test_synchronous_multiple_data_points(): void
new Data\NumberDataPoint(7, Attributes::create(['status' => 400]), 3, 8),
], Temporality::CUMULATIVE, false), $s->collect($c, 8));
}

public function test_asynchronous_temporality(): void
{
$s = new AsynchronousMetricStream(Attributes::factory(), null, new SumAggregation(), null, fn (ObserverInterface $observer) => $observer->observe(1), 3);
$this->assertSame(Temporality::CUMULATIVE, $s->temporality());
}

public function test_synchronous_temporality(): void
{
$s = new SynchronousMetricStream(null, new SumAggregation(), null, 3);
$this->assertSame(Temporality::DELTA, $s->temporality());
}

public function test_asynchronous_collection_timestamp_returns_last_collection_timestamp(): void
{
$s = new AsynchronousMetricStream(Attributes::factory(), null, new SumAggregation(), null, fn (ObserverInterface $observer) => $observer->observe(1), 3);
$this->assertSame(3, $s->collectionTimestamp());

$s->collect(0, 5);
$this->assertSame(5, $s->collectionTimestamp());
}

public function test_synchronous_collection_timestamp_returns_last_collection_timestamp(): void
{
$s = new SynchronousMetricStream(null, new SumAggregation(), null, 3);
$this->assertSame(3, $s->collectionTimestamp());

$s->collect(0, 5);
$this->assertSame(5, $s->collectionTimestamp());
}

public function test_asynchronous_unregister_removes_reader(): void
{
/** @var int|null $value */
$value = null;

$s = new AsynchronousMetricStream(Attributes::factory(), null, new SumAggregation(), null, function (ObserverInterface $observer) use (&$value): void {
if ($value !== null) {
$observer->observe($value);
}
}, 3);

$value = 5;
$d = $s->register(Temporality::DELTA);
$s->collect($d, 5);
$s->unregister($d);

// Implementation treats unknown reader as cumulative reader
$this->assertEquals(new Data\Sum([
new Data\NumberDataPoint(5, Attributes::create([]), 3, 7),
], Temporality::CUMULATIVE, false), $s->collect($d, 7));
}

public function test_synchronous_unregister_removes_reader(): void
{
$s = new SynchronousMetricStream(null, new SumAggregation(), null, 3);
$w = new StreamWriter(null, Attributes::factory(), $s->writable());

$c = $s->register(Temporality::CUMULATIVE);
$w->record(5, [], null, 4);
$s->collect($c, 5);
$s->unregister($c);

$w->record(-5, [], null, 6);
$this->assertEquals(new Data\Sum([
], Temporality::DELTA, false), $s->collect($c, 7));
}

public function test_asynchronous_unregister_invalid_does_not_affect_reader(): void
{
/** @var int|null $value */
$value = null;

$s = new AsynchronousMetricStream(Attributes::factory(), null, new SumAggregation(), null, function (ObserverInterface $observer) use (&$value): void {
if ($value !== null) {
$observer->observe($value);
}
}, 3);

$value = 5;
$d = $s->register(Temporality::DELTA);
$s->collect($d, 5);
$s->unregister($d + 1);

$this->assertEquals(new Data\Sum([
new Data\NumberDataPoint(0, Attributes::create([]), 5, 7),
], Temporality::DELTA, false), $s->collect($d, 7));
}

public function test_synchronous_unregister_invalid_does_not_affect_reader(): void
{
$s = new SynchronousMetricStream(null, new SumAggregation(), null, 3);
$w = new StreamWriter(null, Attributes::factory(), $s->writable());

$c = $s->register(Temporality::CUMULATIVE);
$w->record(5, [], null, 4);
$s->collect($c, 5);
$s->unregister($c + 1);

$w->record(-5, [], null, 6);
$this->assertEquals(new Data\Sum([
new Data\NumberDataPoint(0, Attributes::create([]), 3, 7),
], Temporality::CUMULATIVE, false), $s->collect($c, 7));
}

public function test_synchronous_reader_limit_exceeded_triggers_warning(): void
{
if (extension_loaded('gmp')) {
$this->markTestSkipped();
}

$s = new SynchronousMetricStream(null, new SumAggregation(), null, 3);

for ($i = 0; $i < (PHP_INT_SIZE << 3) - 1; $i++) {
$s->register(Temporality::DELTA);
}

$this->expectWarning();
$this->expectWarningMessageMatches('/^GMP extension required to register over \d++ readers$/');
$s->register(Temporality::DELTA);
}

public function test_synchronous_reader_limit_exceeded_returns_noop_reader(): void
{
if (extension_loaded('gmp')) {
$this->markTestSkipped();
}

$s = new SynchronousMetricStream(null, new SumAggregation(), null, 3);
$w = new StreamWriter(null, Attributes::factory(), $s->writable());

for ($i = 0; $i < (PHP_INT_SIZE << 3) - 1; $i++) {
$s->register(Temporality::DELTA);
}

$d = @$s->register(Temporality::DELTA);

$w->record(5, [], null, 4);
$this->assertEquals(new Data\Sum([
], Temporality::DELTA, false), $s->collect($d, 5));
}

public function test_synchronous_reader_limit_does_not_apply_if_gmp_available(): void
{
if (!extension_loaded('gmp')) {
$this->markTestSkipped();
}

$s = new SynchronousMetricStream(null, new SumAggregation(), null, 3);
$w = new StreamWriter(null, Attributes::factory(), $s->writable());

for ($i = 0; $i < (PHP_INT_SIZE << 3) - 1; $i++) {
$s->register(Temporality::DELTA);
}

$d = $s->register(Temporality::DELTA);

$w->record(5, [], null, 4);
$this->assertEquals(new Data\Sum([
new Data\NumberDataPoint(5, Attributes::create([]), 3, 5),
], Temporality::DELTA, false), $s->collect($d, 5));
}

public function test_metric_aggregator_applies_attribute_filter(): void
{
$aggregator = new MetricAggregator(new FilteredAttributeProcessor(Attributes::factory(), ['foo', 'bar']), new SumAggregation(), null);
$aggregator->record(5, Attributes::create(['foo' => 1, 'bar' => 2, 'baz' => 3]), Context::getRoot(), 0);

$this->assertEquals(
Attributes::create(['foo' => 1, 'bar' => 2]),
current($aggregator->collect(1)->attributes),
);
}

public function test_metric_aggregator_forwards_to_exemplar_filter(): void
{
$exemplarReservoir = $this->createMock(ExemplarReservoirInterface::class);
$exemplarReservoir->expects($this->once())->method('offer')->with($this->anything(), 5, Attributes::create(['foo' => 1]), Context::getRoot(), 3, 0);
$aggregator = new MetricAggregator(new FilteredAttributeProcessor(Attributes::factory(), ['foo', 'bar']), new SumAggregation(), $exemplarReservoir);
$aggregator->record(5, Attributes::create(['foo' => 1]), Context::getRoot(), 3);
}

public function test_metric_aggregator_exemplars_provides_current_revision_range(): void
{
$exemplars = [
[new Data\Exemplar(5, 3, Attributes::create([]), null, null)],
];
$exemplarReservoir = $this->createMock(ExemplarReservoirInterface::class);
$exemplarReservoir->expects($this->once())->method('collect')->with($this->anything(), 0, 1)->willReturn($exemplars);
$aggregator = new MetricAggregator(new FilteredAttributeProcessor(Attributes::factory(), ['foo', 'bar']), new SumAggregation(), $exemplarReservoir);
$aggregator->record(5, Attributes::create([]), Context::getRoot(), 3);
$metric = $aggregator->collect(2);

$this->assertSame($exemplars, $aggregator->exemplars($metric));
}
}
135 changes: 135 additions & 0 deletions tests/Unit/SDK/Metrics/Stream/StreamWriterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Tests\Unit\SDK\Metrics\Stream;

use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\ContextStorageInterface;
use OpenTelemetry\SDK\Common\Attribute\Attributes;
use OpenTelemetry\SDK\Metrics\Stream\MultiStreamWriter;
use OpenTelemetry\SDK\Metrics\Stream\StreamWriter;
use OpenTelemetry\SDK\Metrics\Stream\WritableMetricStreamInterface;
use PHPUnit\Framework\TestCase;

final class StreamWriterTest extends TestCase
{
/**
* @covers \OpenTelemetry\SDK\Metrics\Stream\StreamWriter
*/
public function test_stream_writer(): void
{
$stream = $this->createMock(WritableMetricStreamInterface::class);
$stream->expects($this->once())->method('record')->with(5, Attributes::create(['foo' => 1]), $this->anything(), 3);

$w = new StreamWriter(null, Attributes::factory(), $stream);
$w->record(5, ['foo' => 1], null, 3);
}

/**
* @covers \OpenTelemetry\SDK\Metrics\Stream\MultiStreamWriter
*/
public function test_multi_stream_writer(): void
{
$streams = [];
for ($i = 0; $i < 3; $i++) {
$stream = $this->createMock(WritableMetricStreamInterface::class);
$stream->expects($this->once())->method('record')->with(5, Attributes::create(['foo' => 1]), $this->anything(), 3);

$streams[] = $stream;
}

$w = new MultiStreamWriter(null, Attributes::factory(), $streams);
$w->record(5, ['foo' => 1], null, 3);
}

/**
* @covers \OpenTelemetry\SDK\Metrics\Stream\StreamWriter
*/
public function test_stream_writer_provides_current_context_on_context_null(): void
{
$context = Context::getRoot()->with(Context::createKey('-'), 5);
$contextStorage = $this->createMock(ContextStorageInterface::class);
$contextStorage->method('current')->willReturn($context);

$stream = $this->createMock(WritableMetricStreamInterface::class);
$stream->expects($this->once())->method('record')->with(5, Attributes::create(['foo' => 1]), $context, 3);

$w = new StreamWriter($contextStorage, Attributes::factory(), $stream);
$w->record(5, ['foo' => 1], null, 3);
}

/**
* @covers \OpenTelemetry\SDK\Metrics\Stream\StreamWriter
*/
public function test_stream_writer_context_provides_context_on_context_provided(): void
{
$context = Context::getRoot()->with(Context::createKey('-'), 5);
$contextStorage = $this->createMock(ContextStorageInterface::class);

$stream = $this->createMock(WritableMetricStreamInterface::class);
$stream->expects($this->once())->method('record')->with(5, Attributes::create(['foo' => 1]), $context, 3);

$w = new StreamWriter($contextStorage, Attributes::factory(), $stream);
$w->record(5, ['foo' => 1], $context, 3);
}

/**
* @covers \OpenTelemetry\SDK\Metrics\Stream\StreamWriter
*/
public function test_stream_writer_context_provides_root_context_on_context_false(): void
{
$contextStorage = $this->createMock(ContextStorageInterface::class);

$stream = $this->createMock(WritableMetricStreamInterface::class);
$stream->expects($this->once())->method('record')->with(5, Attributes::create(['foo' => 1]), Context::getRoot(), 3);

$w = new StreamWriter($contextStorage, Attributes::factory(), $stream);
$w->record(5, ['foo' => 1], false, 3);
}

/**
* @covers \OpenTelemetry\SDK\Metrics\Stream\MultiStreamWriter
*/
public function test_multi_stream_writer_provides_current_context_on_context_null(): void
{
$context = Context::getRoot()->with(Context::createKey('-'), 5);
$contextStorage = $this->createMock(ContextStorageInterface::class);
$contextStorage->method('current')->willReturn($context);

$stream = $this->createMock(WritableMetricStreamInterface::class);
$stream->expects($this->once())->method('record')->with(5, Attributes::create(['foo' => 1]), $context, 3);

$w = new MultiStreamWriter($contextStorage, Attributes::factory(), [$stream]);
$w->record(5, ['foo' => 1], null, 3);
}

/**
* @covers \OpenTelemetry\SDK\Metrics\Stream\MultiStreamWriter
*/
public function test_multi_stream_writer_context_provides_context_on_context_provided(): void
{
$context = Context::getRoot()->with(Context::createKey('-'), 5);
$contextStorage = $this->createMock(ContextStorageInterface::class);

$stream = $this->createMock(WritableMetricStreamInterface::class);
$stream->expects($this->once())->method('record')->with(5, Attributes::create(['foo' => 1]), $context, 3);

$w = new MultiStreamWriter($contextStorage, Attributes::factory(), [$stream]);
$w->record(5, ['foo' => 1], $context, 3);
}

/**
* @covers \OpenTelemetry\SDK\Metrics\Stream\MultiStreamWriter
*/
public function test_multi_stream_writer_context_provides_root_context_on_context_false(): void
{
$contextStorage = $this->createMock(ContextStorageInterface::class);

$stream = $this->createMock(WritableMetricStreamInterface::class);
$stream->expects($this->once())->method('record')->with(5, Attributes::create(['foo' => 1]), Context::getRoot(), 3);

$w = new MultiStreamWriter($contextStorage, Attributes::factory(), [$stream]);
$w->record(5, ['foo' => 1], false, 3);
}
}

0 comments on commit c855193

Please sign in to comment.