From 7f81ec5ff38a2eb00a0ca180f1e35b23ec8765fb Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Mon, 18 Sep 2023 12:21:56 -0400 Subject: [PATCH 1/7] work in progress: add simplified bsp and grpc exporter --- .../sdk/trace/export/experimental/client.py | 102 +++++++++++ .../sdk/trace/export/experimental/exporter.py | 40 +++++ .../trace/export/experimental/processor.py | 63 +++++++ .../sdk/trace/export/experimental/timer.py | 111 ++++++++++++ .../sdk/trace/export/experimental/util.py | 25 +++ .../tests/trace/export/test_experimental.py | 166 ++++++++++++++++++ 6 files changed, 507 insertions(+) create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/client.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/util.py create mode 100644 opentelemetry-sdk/tests/trace/export/test_experimental.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/client.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/client.py new file mode 100644 index 00000000000..b64a4ee440c --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/client.py @@ -0,0 +1,102 @@ +import abc +import time +import typing + +import grpc + +from opentelemetry.exporter.otlp.proto.common._internal import trace_encoder +from opentelemetry.proto.collector.trace.v1 import trace_service_pb2_grpc as pb +from opentelemetry.sdk.trace import ReadableSpan + + +class GrpcClientABC(abc.ABC): + + @abc.abstractmethod + def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode: + pass + + +class GrpcClient(GrpcClientABC): + """ + Exports a batch of spans to the specified endpoint. Wrap this in a RetryingGrpcClient for retry/backoff. + """ + + def __init__( + self, + target: str = 'localhost:4317', + timeout_sec: int = 10, + ): + self._timeout_sec = timeout_sec + self._stub = pb.TraceServiceStub(grpc.insecure_channel(target)) + + def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode: + try: + self._stub.Export(request=trace_encoder.encode_spans(batch), timeout=self._timeout_sec) + except grpc.RpcError as err: + # noinspection PyUnresolvedReferences + return err.code() + return grpc.StatusCode.OK + + +class RetryingGrpcClient(GrpcClientABC): + """ + A GRPC client implementation that wraps another GRPC client and retries failed requests using exponential backoff. + The `sleepfunc` arg can be passed in to fake time-based sleeping for testing. + """ + + def __init__( + self, + client: GrpcClientABC, + sleepfunc: typing.Callable[[int], None] = time.sleep, + max_retries: int = 4, + initial_sleep_time_sec: int = 0.5, + ): + self._client = client + self._sleep = sleepfunc + self._max_retries = max_retries + self._initial_sleep_time_sec = initial_sleep_time_sec + + def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode: + sleep_time_sec = self._initial_sleep_time_sec + out = grpc.StatusCode.OK + for i in range(self._max_retries): + out = self._client.send(batch) + if out == grpc.StatusCode.OK: + return grpc.StatusCode.OK + self._sleep(sleep_time_sec) + sleep_time_sec *= 2 + return out + + +class FakeGrpcClient(GrpcClientABC): + """ + A fake GRPC client that can be used for testing. To fake status codes, optionally set the `status_codes` arg to a + list/tuple of status codes you want the send() method to return. If there are more calls to send() than there are + status codes, the last status code is reused. Set the `sleep_time_sec` arg to a positive value to sleep every time + there is a send(), simulating network transmission time. + """ + + def __init__( + self, + status_codes: typing.List[grpc.StatusCode] = (grpc.StatusCode.OK,), + sleep_time_sec: int = 0, + ): + self._status_codes = status_codes + self._sleep_time_sec = sleep_time_sec + self._batches = [] + + def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode: + time.sleep(self._sleep_time_sec) + self._batches.append(batch) + num_sends = len(self._batches) + idx = min(num_sends, len(self._status_codes)) - 1 + return self._status_codes[idx] + + def get_batches(self): + return self._batches + + def num_batches(self): + return len(self._batches) + + def num_spans_in_batch(self, idx: int): + return len(self._batches[idx]) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py new file mode 100644 index 00000000000..ad424877d41 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py @@ -0,0 +1,40 @@ +import typing +from enum import Enum + +import grpc + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.sdk.trace.export.experimental.client import GrpcClientABC, RetryingGrpcClient, GrpcClient + + +class ExporterFlushResult(Enum): + SUCCESS = 1 + FAILURE = 2 + TIMEOUT = 3 + + +class OTLPSpanExporter(SpanExporter): + """ + An implementation of SpanExporter. Accepts an optional client. If one is not supplied, creates a retrying client. + Sends spans immediately -- has no queue to flush or separate thread to shut down. + """ + + def __init__(self, client: GrpcClientABC = RetryingGrpcClient(GrpcClient())): + self._client = client + + def export(self, batch: typing.Sequence[ReadableSpan]) -> SpanExportResult: + status_code = self._client.send(batch) + return SpanExportResult.SUCCESS if status_code == grpc.StatusCode.OK else SpanExportResult.FAILURE + + def force_flush(self, timeout_millis: int = 30000) -> ExporterFlushResult: + """ + Nothing to flush. + """ + return ExporterFlushResult.SUCCESS + + def shutdown(self) -> None: + """ + Nothing to shut down. + """ + pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py new file mode 100644 index 00000000000..204c7a39243 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -0,0 +1,63 @@ +import typing + +from opentelemetry.context import Context +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, Span +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer +from opentelemetry.sdk.trace.export.experimental.util import SpanAccumulator + + +class BatchSpanProcessor(SpanProcessor): + """ + A SpanProcessor that sends spans in batches on an interval or when a maximum number of spans has been reached, + whichever comes first. + """ + + def __init__( + self, + exporter: SpanExporter, + max_batch_size: int = 1024, + interval_sec: int = 4, + timer: typing.Optional[TimerABC] = None, + ): + self._exporter = exporter + self._max_batch_size = max_batch_size + self._accumulator = SpanAccumulator() + self._timer = timer or PeriodicTimer(interval_sec) + self._timer.set_callback(self._export) + self._timer.start() + + def on_start(self, span: Span, parent_context: typing.Optional[Context] = None) -> None: + pass + + def on_end(self, span: ReadableSpan) -> None: + """ + This method must be extremely fast. It adds the span to the accumulator for later sending and pokes the timer + if the number of spans waiting to be sent has reached the maximum batch size. + """ + size = self._accumulator.push(span) + if size >= self._max_batch_size: + self._timer.poke() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """ + Stops the timer, exports any spans in the accumulator then restarts the timer. + """ + self._timer.stop() + self._exporter.force_flush(timeout_millis) + result = self._export() + self._timer.start() + return result == SpanExportResult.SUCCESS + + def shutdown(self) -> None: + self._timer.stop() + self._export() + self._exporter.shutdown() + + def _export(self) -> SpanExportResult: + batch = self._accumulator.batch() + if len(batch) > 0: + out = self._exporter.export(batch) + else: + out = SpanExportResult.SUCCESS + return out diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py new file mode 100644 index 00000000000..35a5800d56a --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py @@ -0,0 +1,111 @@ +import abc +import threading +import typing + + +class TimerABC(abc.ABC): + """ + An interface extracted from PeriodicTimer so alternative implementations can be used for testing. + """ + + @abc.abstractmethod + def set_callback(self, cb) -> None: + pass + + @abc.abstractmethod + def start(self) -> None: + pass + + @abc.abstractmethod + def poke(self) -> None: + pass + + @abc.abstractmethod + def stop(self) -> None: + pass + + @abc.abstractmethod + def started(self) -> None: + pass + + @abc.abstractmethod + def stopped(self) -> None: + pass + + +class PeriodicTimer(TimerABC): + """ + Executes the passed-in callback on a timer at the specified interval. The callback can be run sooner than the + interval via the poke() method, which also resets the timer. + """ + + def __init__( + self, + interval_sec: int, + callback: typing.Callable[[], None] = lambda: None, + daemon: bool = True, + ): + self._callback = callback + self._interval_sec = interval_sec + self._stop = threading.Event() + self._poke = threading.Event() + self._thread = threading.Thread(target=self._work, daemon=daemon) + + def set_callback(self, callback: typing.Callable[[], None]) -> None: + self._callback = callback + + def start(self) -> None: + self._thread.start() + + def _work(self) -> None: + while True: + self._poke.wait(timeout=self._interval_sec) + if self._stop.is_set(): + break + self._callback() + self._poke.clear() + + def poke(self) -> None: + """ + This method schedules the callback to be executed immediately instead of waiting for the next timeout. It also + resets the timer. + """ + self._poke.set() + + def stop(self) -> None: + self._stop.set() + self.poke() # in case we're waiting for a poke timeout + self._thread.join() + + def started(self) -> bool: + return self._thread.is_alive() + + def stopped(self) -> bool: + return self._stop.is_set() + + +class ThreadlessTimer(TimerABC): + """ + For testing/experimentation. Only executes the callback when you run poke(). + """ + + def __init__(self): + self._cb = lambda: None + + def set_callback(self, cb): + self._cb = cb + + def start(self): + pass + + def poke(self): + self._cb() + + def stop(self): + pass + + def started(self) -> None: + pass + + def stopped(self) -> None: + pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/util.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/util.py new file mode 100644 index 00000000000..fe5ba84417b --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/util.py @@ -0,0 +1,25 @@ +import threading +import typing + +from opentelemetry.sdk.trace import ReadableSpan + + +class SpanAccumulator: + """ + Accumulates and batches spans in a thread-safe manner. + """ + + def __init__(self): + self._q = [] + self._lock = threading.Lock() + + def push(self, span: ReadableSpan) -> int: + with self._lock: + self._q.append(span) + return len(self._q) + + def batch(self) -> typing.List[ReadableSpan]: + with self._lock: + out = self._q + self._q = [] + return out diff --git a/opentelemetry-sdk/tests/trace/export/test_experimental.py b/opentelemetry-sdk/tests/trace/export/test_experimental.py new file mode 100644 index 00000000000..41e8eaa156c --- /dev/null +++ b/opentelemetry-sdk/tests/trace/export/test_experimental.py @@ -0,0 +1,166 @@ +import time +import typing +import unittest +from os import environ + +import grpc + +from opentelemetry.sdk.trace import ReadableSpan, _Span +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.sdk.trace.export.experimental.client import RetryingGrpcClient, FakeGrpcClient +from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter +from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor +from opentelemetry.sdk.trace.export.experimental.timer import PeriodicTimer, ThreadlessTimer +from opentelemetry.trace import SpanContext, TraceFlags + + +class TestBatchSpanProcessor(unittest.TestCase): + + def test_export_by_max_batch_size(self): + exporter = FakeSpanExporter() + timer = ThreadlessTimer() + proc = BatchSpanProcessor(exporter=exporter, timer=timer, max_batch_size=2) + num_spans = 16 + for _ in range(num_spans): + create_start_and_end_span('foo', proc) + # we have a batch size of 2, and we've exported 16 spans, so we expect 8 batches + self.assertEqual(8, exporter.count()) + + def test_export_by_timer(self): + exporter = FakeSpanExporter() + timer = ThreadlessTimer() + # we have a batch size of 32, which is larger than the 16 spans that we're planning on sending + proc = BatchSpanProcessor(exporter=exporter, timer=timer, max_batch_size=32) + num_spans = 16 + for i in range(num_spans): + create_start_and_end_span('foo', proc) + self.assertEqual(0, exporter.count()) + # we want this test to be fast, so we don't sleep() -- instead we perform a manual poke() + timer.poke() + self.assertEqual(1, exporter.count()) + + +class TestPeriodicTimer(unittest.TestCase): + + @unittest.skipUnless(environ.get('RUN_LONG_TESTS', '').lower() == 'true', 'Skipping long-running test') + def test_x(self): + t = PeriodicTimer(4) + t.set_callback(lambda: print('callback!')) + t.start() + time.sleep(2) + t.poke() + time.sleep(2) + t.stop() + + +class TestRetryingGrpcClient(unittest.TestCase): + + def test_success(self): + fs = FakeSleeper() + rt = RetryingGrpcClient( + FakeGrpcClient([grpc.StatusCode.OK]), + sleepfunc=fs.sleep + ) + status_code = rt.send([(mk_readable_span())]) + self.assertEqual(grpc.StatusCode.OK, status_code) + self.assertListEqual([], fs.get_sleeps()) + + def test_retry_all_failed(self): + fs = FakeSleeper() + rt = RetryingGrpcClient( + FakeGrpcClient([grpc.StatusCode.UNAVAILABLE]), + sleepfunc=fs.sleep + ) + status_code = rt.send([(mk_readable_span())]) + self.assertEqual(grpc.StatusCode.UNAVAILABLE, status_code) + self.assertListEqual([0.5, 1.0, 2.0, 4.0], fs.get_sleeps()) + + def test_retry_initial_failure_then_success(self): + fs = FakeSleeper() + rt = RetryingGrpcClient( + FakeGrpcClient([grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.OK]), + sleepfunc=fs.sleep + ) + status_code = rt.send([(mk_readable_span())]) + self.assertEqual(grpc.StatusCode.OK, status_code) + self.assertListEqual([0.5], fs.get_sleeps()) + + +class TestOTLPSpanExporter(unittest.TestCase): + + def test_exporter(self): + client = FakeGrpcClient() + exporter = OTLPSpanExporter(client=client) + timer = ThreadlessTimer() + proc = BatchSpanProcessor(exporter, timer=timer, max_batch_size=128) + span = mk_readable_span() + num_spans = 100 # less than the batch size of 128 + for _ in range(num_spans): + proc.on_end(span) + timer.poke() + client.num_batches() + self.assertEqual(1, client.num_batches()) + self.assertEqual(num_spans, client.num_spans_in_batch(0)) + + +class FakeSleeper: + + def __init__(self): + self._sleeps = [] + + def sleep(self, seconds): + self._sleeps.append(seconds) + + def get_sleeps(self): + return self._sleeps + + +class FakeSpanExporter(SpanExporter): + + def __init__(self): + self._exported = [] + + def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: + self._exported.append(spans) + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + pass + + def force_flush(self, timeout_millis: int = 30000) -> bool: + pass + + def get_exported(self): + return self._exported + + def count(self): + return len(self._exported) + + +# standalone functions + +def mk_readable_span(): + ctx = SpanContext(0, 0, False) + return ReadableSpan(context=ctx, attributes={}) + + +def mk_spans(n): + span = mk_span() + out = [] + for _ in range(n): + out.append(span) + return out + + +def mk_span(): + return _Span(name='my-span', context=mk_ctx()) + + +def create_start_and_end_span(name, span_processor): + span = _Span(name, mk_ctx(), span_processor=span_processor) + span.start() + span.end() + + +def mk_ctx(): + return SpanContext(1, 2, False, trace_flags=TraceFlags(TraceFlags.SAMPLED)) From 20af8c624e058c8a337ad7c7b2abc4ee3296f586 Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Fri, 13 Oct 2023 09:45:45 -0400 Subject: [PATCH 2/7] append `2` to public class names --- .../sdk/trace/export/experimental/exporter.py | 2 +- .../trace/export/experimental/processor.py | 2 +- .../sdk/trace/export/experimental/timer.py | 9 +- .../tests/trace/export/__init__.py | 13 -- .../tests/trace/export/test_experimental.py | 57 ++------ .../tests/trace/export/test_integration.py | 128 ++++++++++++++++++ opentelemetry-sdk/tests/trace/export/util.py | 29 ++++ 7 files changed, 180 insertions(+), 60 deletions(-) delete mode 100644 opentelemetry-sdk/tests/trace/export/__init__.py create mode 100644 opentelemetry-sdk/tests/trace/export/test_integration.py create mode 100644 opentelemetry-sdk/tests/trace/export/util.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py index ad424877d41..4a460ef9d60 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py @@ -14,7 +14,7 @@ class ExporterFlushResult(Enum): TIMEOUT = 3 -class OTLPSpanExporter(SpanExporter): +class OTLPSpanExporter2(SpanExporter): """ An implementation of SpanExporter. Accepts an optional client. If one is not supplied, creates a retrying client. Sends spans immediately -- has no queue to flush or separate thread to shut down. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py index 204c7a39243..de55226ad57 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -7,7 +7,7 @@ from opentelemetry.sdk.trace.export.experimental.util import SpanAccumulator -class BatchSpanProcessor(SpanProcessor): +class BatchSpanProcessor2(SpanProcessor): """ A SpanProcessor that sends spans in batches on an interval or when a maximum number of spans has been reached, whichever comes first. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py index 35a5800d56a..a7104b4fd56 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py @@ -45,11 +45,15 @@ def __init__( callback: typing.Callable[[], None] = lambda: None, daemon: bool = True, ): - self._callback = callback self._interval_sec = interval_sec + self._callback = callback + self._daemon = daemon self._stop = threading.Event() self._poke = threading.Event() - self._thread = threading.Thread(target=self._work, daemon=daemon) + self._new_thread() + + def _new_thread(self): + self._thread = threading.Thread(target=self._work, daemon=self._daemon) def set_callback(self, callback: typing.Callable[[], None]) -> None: self._callback = callback @@ -76,6 +80,7 @@ def stop(self) -> None: self._stop.set() self.poke() # in case we're waiting for a poke timeout self._thread.join() + self._new_thread() # in case we want to start it again def started(self) -> bool: return self._thread.is_alive() diff --git a/opentelemetry-sdk/tests/trace/export/__init__.py b/opentelemetry-sdk/tests/trace/export/__init__.py deleted file mode 100644 index b0a6f428417..00000000000 --- a/opentelemetry-sdk/tests/trace/export/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/opentelemetry-sdk/tests/trace/export/test_experimental.py b/opentelemetry-sdk/tests/trace/export/test_experimental.py index 41e8eaa156c..0d3ba8dd8ef 100644 --- a/opentelemetry-sdk/tests/trace/export/test_experimental.py +++ b/opentelemetry-sdk/tests/trace/export/test_experimental.py @@ -5,13 +5,13 @@ import grpc -from opentelemetry.sdk.trace import ReadableSpan, _Span +import util +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.sdk.trace.export.experimental.client import RetryingGrpcClient, FakeGrpcClient -from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter -from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor +from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 +from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 from opentelemetry.sdk.trace.export.experimental.timer import PeriodicTimer, ThreadlessTimer -from opentelemetry.trace import SpanContext, TraceFlags class TestBatchSpanProcessor(unittest.TestCase): @@ -19,10 +19,10 @@ class TestBatchSpanProcessor(unittest.TestCase): def test_export_by_max_batch_size(self): exporter = FakeSpanExporter() timer = ThreadlessTimer() - proc = BatchSpanProcessor(exporter=exporter, timer=timer, max_batch_size=2) + proc = BatchSpanProcessor2(exporter=exporter, timer=timer, max_batch_size=2) num_spans = 16 for _ in range(num_spans): - create_start_and_end_span('foo', proc) + util.create_start_and_end_span('foo', proc) # we have a batch size of 2, and we've exported 16 spans, so we expect 8 batches self.assertEqual(8, exporter.count()) @@ -30,10 +30,10 @@ def test_export_by_timer(self): exporter = FakeSpanExporter() timer = ThreadlessTimer() # we have a batch size of 32, which is larger than the 16 spans that we're planning on sending - proc = BatchSpanProcessor(exporter=exporter, timer=timer, max_batch_size=32) + proc = BatchSpanProcessor2(exporter=exporter, timer=timer, max_batch_size=32) num_spans = 16 for i in range(num_spans): - create_start_and_end_span('foo', proc) + util.create_start_and_end_span('foo', proc) self.assertEqual(0, exporter.count()) # we want this test to be fast, so we don't sleep() -- instead we perform a manual poke() timer.poke() @@ -61,7 +61,7 @@ def test_success(self): FakeGrpcClient([grpc.StatusCode.OK]), sleepfunc=fs.sleep ) - status_code = rt.send([(mk_readable_span())]) + status_code = rt.send([(util.mk_readable_span())]) self.assertEqual(grpc.StatusCode.OK, status_code) self.assertListEqual([], fs.get_sleeps()) @@ -71,7 +71,7 @@ def test_retry_all_failed(self): FakeGrpcClient([grpc.StatusCode.UNAVAILABLE]), sleepfunc=fs.sleep ) - status_code = rt.send([(mk_readable_span())]) + status_code = rt.send([(util.mk_readable_span())]) self.assertEqual(grpc.StatusCode.UNAVAILABLE, status_code) self.assertListEqual([0.5, 1.0, 2.0, 4.0], fs.get_sleeps()) @@ -81,7 +81,7 @@ def test_retry_initial_failure_then_success(self): FakeGrpcClient([grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.OK]), sleepfunc=fs.sleep ) - status_code = rt.send([(mk_readable_span())]) + status_code = rt.send([(util.mk_readable_span())]) self.assertEqual(grpc.StatusCode.OK, status_code) self.assertListEqual([0.5], fs.get_sleeps()) @@ -90,10 +90,10 @@ class TestOTLPSpanExporter(unittest.TestCase): def test_exporter(self): client = FakeGrpcClient() - exporter = OTLPSpanExporter(client=client) + exporter = OTLPSpanExporter2(client=client) timer = ThreadlessTimer() - proc = BatchSpanProcessor(exporter, timer=timer, max_batch_size=128) - span = mk_readable_span() + proc = BatchSpanProcessor2(exporter, timer=timer, max_batch_size=128) + span = util.mk_readable_span() num_spans = 100 # less than the batch size of 128 for _ in range(num_spans): proc.on_end(span) @@ -135,32 +135,3 @@ def get_exported(self): def count(self): return len(self._exported) - - -# standalone functions - -def mk_readable_span(): - ctx = SpanContext(0, 0, False) - return ReadableSpan(context=ctx, attributes={}) - - -def mk_spans(n): - span = mk_span() - out = [] - for _ in range(n): - out.append(span) - return out - - -def mk_span(): - return _Span(name='my-span', context=mk_ctx()) - - -def create_start_and_end_span(name, span_processor): - span = _Span(name, mk_ctx(), span_processor=span_processor) - span.start() - span.end() - - -def mk_ctx(): - return SpanContext(1, 2, False, trace_flags=TraceFlags(TraceFlags.SAMPLED)) diff --git a/opentelemetry-sdk/tests/trace/export/test_integration.py b/opentelemetry-sdk/tests/trace/export/test_integration.py new file mode 100644 index 00000000000..1bbd2e3c541 --- /dev/null +++ b/opentelemetry-sdk/tests/trace/export/test_integration.py @@ -0,0 +1,128 @@ +import time +import unittest +from concurrent import futures +from os import environ + +import grpc + +import util +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc +from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2, metrics_service_pb2_grpc +from opentelemetry.proto.collector.trace.v1 import trace_service_pb2, trace_service_pb2_grpc +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 +from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 + + +@unittest.skipUnless(environ.get('RUN_LONG_TESTS', '').lower() == 'true', 'Skipping long-running test') +class TestIntegration(unittest.TestCase): + + def test_bsp2(self): + server = OTLPServer() + server.start() + bsp = BatchSpanProcessor2(OTLPSpanExporter2()) + num_spans_sent = 100000 + start = time.time() + span = util.mk_span('test-span') + for i in range(num_spans_sent): + bsp.on_end(span) + end = time.time() + elapsed = end - start + print() + print(f'new: elapsed: {elapsed}') + bsp.force_flush() + num_span_received = server.get_num_spans_received() + self.assertEqual(num_spans_sent, num_span_received) + server.stop() + + @unittest.SkipTest + def test_bsp(self): + server = OTLPServer() + server.start() + bsp = BatchSpanProcessor(OTLPSpanExporter()) + num_spans_sent = 10000 + start = time.time() + span = util.mk_span('test-span') + for i in range(num_spans_sent): + bsp.on_end(span) + end = time.time() + elapsed = end - start + print() + print(f'old: elapsed: {elapsed}') + bsp.force_flush() + while num_spans_sent > server.get_num_spans_received(): + time.sleep(0.1) + server.stop() + + +class OTLPServer: + + def __init__(self): + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + + self.trace_servicer = TraceServiceServicer() + trace_service_pb2_grpc.add_TraceServiceServicer_to_server(self.trace_servicer, self.server) + + metrics_servicer = MetricsServiceServicer() + metrics_service_pb2_grpc.add_MetricsServiceServicer_to_server(metrics_servicer, self.server) + + logs_servicer = LogsServiceServicer() + logs_service_pb2_grpc.add_LogsServiceServicer_to_server(logs_servicer, self.server) + + self.server.add_insecure_port('0.0.0.0:4317') + + def start(self): + self.server.start() + + def stop(self): + self.server.stop(0) + + def get_num_spans_received(self): + return self.trace_servicer.get_num_spans() + + +def serve_otel_grpc(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + trace_service_pb2_grpc.add_TraceServiceServicer_to_server(TraceServiceServicer(), server) + metrics_service_pb2_grpc.add_MetricsServiceServicer_to_server(MetricsServiceServicer(), server) + logs_service_pb2_grpc.add_LogsServiceServicer_to_server(LogsServiceServicer(), server) + server.add_insecure_port('0.0.0.0:4317') + server.start() + return server + + +class LogsServiceServicer(logs_service_pb2_grpc.LogsServiceServicer): + + def __init__(self): + self.requests_received = [] + + def Export(self, request, context): + self.requests_received.append(request) + return logs_service_pb2.ExportLogsServiceResponse() + + +class TraceServiceServicer(trace_service_pb2_grpc.TraceServiceServicer): + + def __init__(self): + self.requests_received = [] + + def Export(self, request, context): + self.requests_received.append(request) + return trace_service_pb2.ExportTraceServiceResponse() + + def get_num_spans(self): + out = 0 + for req in self.requests_received: + out += len(req.resource_spans[0].scope_spans[0].spans) + return out + + +class MetricsServiceServicer(metrics_service_pb2_grpc.MetricsServiceServicer): + + def __init__(self): + self.requests_received = [] + + def Export(self, request, context): + self.requests_received.append(request) + return metrics_service_pb2.ExportMetricsServiceResponse() diff --git a/opentelemetry-sdk/tests/trace/export/util.py b/opentelemetry-sdk/tests/trace/export/util.py new file mode 100644 index 00000000000..784cd1a2d1f --- /dev/null +++ b/opentelemetry-sdk/tests/trace/export/util.py @@ -0,0 +1,29 @@ +from opentelemetry.sdk.trace import ReadableSpan, _Span +from opentelemetry.trace import SpanContext, TraceFlags + + +def mk_readable_span(): + ctx = SpanContext(0, 0, False) + return ReadableSpan(context=ctx, attributes={}) + + +def mk_spans(n): + span = mk_span('foo') + out = [] + for _ in range(n): + out.append(span) + return out + + +def create_start_and_end_span(name, span_processor): + span = _Span(name, mk_ctx(), span_processor=span_processor) + span.start() + span.end() + + +def mk_span(name): + return _Span(name=name, context=mk_ctx()) + + +def mk_ctx(): + return SpanContext(1, 2, False, trace_flags=TraceFlags(TraceFlags.SAMPLED)) From a00fc2c0ceda565b408edbf67ea46efbb9d78a95 Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Fri, 13 Oct 2023 17:42:41 -0400 Subject: [PATCH 3/7] add a batches deque to SpanAccumulator --- .../trace/export/experimental/accumulator.py | 39 +++++++++++++++++++ .../trace/export/experimental/processor.py | 34 +++++++++------- .../sdk/trace/export/experimental/timer.py | 1 - .../sdk/trace/export/experimental/util.py | 25 ------------ .../tests/trace/export/test_experimental.py | 17 +------- .../tests/trace/export/test_integration.py | 2 +- 6 files changed, 60 insertions(+), 58 deletions(-) create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py delete mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/util.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py new file mode 100644 index 00000000000..1e69eb0c8ce --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py @@ -0,0 +1,39 @@ +import collections +import threading +import typing + +from opentelemetry.sdk.trace import ReadableSpan + + +class SpanAccumulator: + """ + Accumulates and batches spans in a thread-safe manner. + """ + + def __init__(self, max_len: int): + self._max_len = max_len + self._spans: typing.List[ReadableSpan] = [] + self._batches = collections.deque() # fixme set max size? + self._lock = threading.Lock() + + def nonempty(self): + return len(self._spans) > 0 or len(self._batches) > 0 + + def push(self, span: ReadableSpan) -> bool: + with self._lock: + self._spans.append(span) + if len(self._spans) < self._max_len: + return False + self._batches.appendleft(self._spans) + self._spans = [] + return True + + def batch(self) -> typing.List[ReadableSpan]: + try: + return self._batches.pop() + except IndexError: + # if there are no batches, batch the current spans + with self._lock: + out = self._spans + self._spans = [] + return out diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py index de55226ad57..e8ccfb8c973 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -3,8 +3,8 @@ from opentelemetry.context import Context from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, Span from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.sdk.trace.export.experimental.accumulator import SpanAccumulator from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer -from opentelemetry.sdk.trace.export.experimental.util import SpanAccumulator class BatchSpanProcessor2(SpanProcessor): @@ -21,10 +21,9 @@ def __init__( timer: typing.Optional[TimerABC] = None, ): self._exporter = exporter - self._max_batch_size = max_batch_size - self._accumulator = SpanAccumulator() + self._accumulator = SpanAccumulator(max_batch_size) self._timer = timer or PeriodicTimer(interval_sec) - self._timer.set_callback(self._export) + self._timer.set_callback(self._export_batch) self._timer.start() def on_start(self, span: Span, parent_context: typing.Optional[Context] = None) -> None: @@ -35,8 +34,8 @@ def on_end(self, span: ReadableSpan) -> None: This method must be extremely fast. It adds the span to the accumulator for later sending and pokes the timer if the number of spans waiting to be sent has reached the maximum batch size. """ - size = self._accumulator.push(span) - if size >= self._max_batch_size: + full = self._accumulator.push(span) + if full: self._timer.poke() def force_flush(self, timeout_millis: int = 30000) -> bool: @@ -44,20 +43,25 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: Stops the timer, exports any spans in the accumulator then restarts the timer. """ self._timer.stop() - self._exporter.force_flush(timeout_millis) - result = self._export() + self._exporter.force_flush(timeout_millis) # this may be a no-op depending on the impl + while self._accumulator.nonempty(): + result = self._export_batch() + if result != SpanExportResult.SUCCESS: + return False self._timer.start() - return result == SpanExportResult.SUCCESS + def shutdown(self) -> None: self._timer.stop() - self._export() + while self._accumulator.nonempty(): + self._export_batch() self._exporter.shutdown() - def _export(self) -> SpanExportResult: + def _export_batch(self) -> SpanExportResult: + """ + This is the timer's callback. It runs on its own thread. + """ batch = self._accumulator.batch() if len(batch) > 0: - out = self._exporter.export(batch) - else: - out = SpanExportResult.SUCCESS - return out + return self._exporter.export(batch) + return SpanExportResult.SUCCESS diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py index a7104b4fd56..3632507c2e7 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py @@ -67,7 +67,6 @@ def _work(self) -> None: if self._stop.is_set(): break self._callback() - self._poke.clear() def poke(self) -> None: """ diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/util.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/util.py deleted file mode 100644 index fe5ba84417b..00000000000 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/util.py +++ /dev/null @@ -1,25 +0,0 @@ -import threading -import typing - -from opentelemetry.sdk.trace import ReadableSpan - - -class SpanAccumulator: - """ - Accumulates and batches spans in a thread-safe manner. - """ - - def __init__(self): - self._q = [] - self._lock = threading.Lock() - - def push(self, span: ReadableSpan) -> int: - with self._lock: - self._q.append(span) - return len(self._q) - - def batch(self) -> typing.List[ReadableSpan]: - with self._lock: - out = self._q - self._q = [] - return out diff --git a/opentelemetry-sdk/tests/trace/export/test_experimental.py b/opentelemetry-sdk/tests/trace/export/test_experimental.py index 0d3ba8dd8ef..885fa28d3ef 100644 --- a/opentelemetry-sdk/tests/trace/export/test_experimental.py +++ b/opentelemetry-sdk/tests/trace/export/test_experimental.py @@ -1,7 +1,5 @@ -import time import typing import unittest -from os import environ import grpc @@ -11,7 +9,7 @@ from opentelemetry.sdk.trace.export.experimental.client import RetryingGrpcClient, FakeGrpcClient from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 -from opentelemetry.sdk.trace.export.experimental.timer import PeriodicTimer, ThreadlessTimer +from opentelemetry.sdk.trace.export.experimental.timer import ThreadlessTimer class TestBatchSpanProcessor(unittest.TestCase): @@ -40,19 +38,6 @@ def test_export_by_timer(self): self.assertEqual(1, exporter.count()) -class TestPeriodicTimer(unittest.TestCase): - - @unittest.skipUnless(environ.get('RUN_LONG_TESTS', '').lower() == 'true', 'Skipping long-running test') - def test_x(self): - t = PeriodicTimer(4) - t.set_callback(lambda: print('callback!')) - t.start() - time.sleep(2) - t.poke() - time.sleep(2) - t.stop() - - class TestRetryingGrpcClient(unittest.TestCase): def test_success(self): diff --git a/opentelemetry-sdk/tests/trace/export/test_integration.py b/opentelemetry-sdk/tests/trace/export/test_integration.py index 1bbd2e3c541..9e84c19dedf 100644 --- a/opentelemetry-sdk/tests/trace/export/test_integration.py +++ b/opentelemetry-sdk/tests/trace/export/test_integration.py @@ -22,7 +22,7 @@ def test_bsp2(self): server = OTLPServer() server.start() bsp = BatchSpanProcessor2(OTLPSpanExporter2()) - num_spans_sent = 100000 + num_spans_sent = 10000 start = time.time() span = util.mk_span('test-span') for i in range(num_spans_sent): From 152ed959decddd108675786d90fe3405ccf7bae6 Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Tue, 17 Oct 2023 17:31:41 -0400 Subject: [PATCH 4/7] add additional timer implementations --- .../trace/export/experimental/accumulator.py | 2 +- .../trace/export/experimental/processor.py | 1 - .../sdk/trace/export/experimental/timer.py | 84 ++++++++++++-- .../tests/trace/export/test_experimental.py | 3 +- .../tests/trace/export/test_integration.py | 109 +++++++++++++----- 5 files changed, 159 insertions(+), 40 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py index 1e69eb0c8ce..d814fe2fc2c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py @@ -32,7 +32,7 @@ def batch(self) -> typing.List[ReadableSpan]: try: return self._batches.pop() except IndexError: - # if there are no batches, batch the current spans + # if there are no batches left, return the current spans with self._lock: out = self._spans self._spans = [] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py index e8ccfb8c973..89becdbed59 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -50,7 +50,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: return False self._timer.start() - def shutdown(self) -> None: self._timer.stop() while self._accumulator.nonempty(): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py index 3632507c2e7..bd0fc43e204 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py @@ -2,6 +2,8 @@ import threading import typing +from icecream import icecream + class TimerABC(abc.ABC): """ @@ -24,14 +26,6 @@ def poke(self) -> None: def stop(self) -> None: pass - @abc.abstractmethod - def started(self) -> None: - pass - - @abc.abstractmethod - def stopped(self) -> None: - pass - class PeriodicTimer(TimerABC): """ @@ -88,6 +82,80 @@ def stopped(self) -> bool: return self._stop.is_set() +class IntervalTimer(TimerABC): + + def __init__(self, interval_sec): + self._thread = threading.Thread(target=self._work) + self._interval_sec = interval_sec + self._cb = lambda: None + self._poke = threading.Event() + self._stop = threading.Event() + + def set_callback(self, cb) -> None: + self._cb = cb + + def start(self) -> None: + self._thread.start() + + def _work(self): + while True: + self._poke.wait(self._interval_sec) + if self._stop.is_set(): + return + self._poke.clear() + self._cb() + + def poke(self) -> None: + self._poke.set() + + def stop(self) -> None: + self._stop.set() + + def started(self) -> bool: + pass + + def stopped(self) -> bool: + pass + + +class ThreadingTimer(TimerABC): + + def __init__(self, interval_sec: int): + self.interval_sec = interval_sec + self.cb = lambda: None + self.timer = None + self.lock = threading.Lock() + + def set_callback(self, cb) -> None: + with self.lock: + self.cb = cb + + def start(self) -> None: + with self.lock: + self.timer = threading.Timer(self.interval_sec, self._work) + self.timer.start() + + def _work(self): + self.cb() + self.start() + + def poke(self) -> None: + with self.lock: + self._do_stop() + # start a new thread from a thread that's just about to be shut down + threading.Thread(target=self._work, daemon=True).start() + + def stop(self) -> None: + with self.lock: + self._do_stop() + + def _do_stop(self): + if self.timer is None: + return + self.timer.cancel() + self.timer = None + + class ThreadlessTimer(TimerABC): """ For testing/experimentation. Only executes the callback when you run poke(). diff --git a/opentelemetry-sdk/tests/trace/export/test_experimental.py b/opentelemetry-sdk/tests/trace/export/test_experimental.py index 885fa28d3ef..9124c17ba5f 100644 --- a/opentelemetry-sdk/tests/trace/export/test_experimental.py +++ b/opentelemetry-sdk/tests/trace/export/test_experimental.py @@ -1,3 +1,4 @@ +import time import typing import unittest @@ -9,7 +10,7 @@ from opentelemetry.sdk.trace.export.experimental.client import RetryingGrpcClient, FakeGrpcClient from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 -from opentelemetry.sdk.trace.export.experimental.timer import ThreadlessTimer +from opentelemetry.sdk.trace.export.experimental.timer import ThreadlessTimer, ThreadingTimer class TestBatchSpanProcessor(unittest.TestCase): diff --git a/opentelemetry-sdk/tests/trace/export/test_integration.py b/opentelemetry-sdk/tests/trace/export/test_integration.py index 9e84c19dedf..343fb3bce73 100644 --- a/opentelemetry-sdk/tests/trace/export/test_integration.py +++ b/opentelemetry-sdk/tests/trace/export/test_integration.py @@ -1,3 +1,4 @@ +import threading import time import unittest from concurrent import futures @@ -6,54 +7,104 @@ import grpc import util -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2, metrics_service_pb2_grpc from opentelemetry.proto.collector.trace.v1 import trace_service_pb2, trace_service_pb2_grpc -from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 +from opentelemetry.sdk.trace.export.experimental.timer import ThreadingTimer, PeriodicTimer, ThreadlessTimer -@unittest.skipUnless(environ.get('RUN_LONG_TESTS', '').lower() == 'true', 'Skipping long-running test') +@unittest.skipUnless(environ.get('RUN_LONG_TESTS', '').lower() == 'true', 'Skipping, RUN_LONG_TESTS not set') class TestIntegration(unittest.TestCase): - def test_bsp2(self): + def test_full_speed(self): server = OTLPServer() server.start() - bsp = BatchSpanProcessor2(OTLPSpanExporter2()) - num_spans_sent = 10000 + max_interval_sec = 4 + + # timer = ThreadingTimer(max_interval_sec) + timer = PeriodicTimer(max_interval_sec) + + bsp = BatchSpanProcessor2(OTLPSpanExporter2(), timer=timer) + num_spans_per_firehose = 1_000 + sf = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=0) + start = time.time() - span = util.mk_span('test-span') - for i in range(num_spans_sent): - bsp.on_end(span) - end = time.time() - elapsed = end - start - print() - print(f'new: elapsed: {elapsed}') - bsp.force_flush() + + threads = [] + num_threads = 128 + for _ in range(num_threads): + thread = threading.Thread(target=sf.run) + thread.start() + threads.append(thread) + + checkpoint = time.time() + + for thread in threads: + thread.join() + + joined = time.time() + + # ThreadingTimer: checkpoint: 0.72, joined: 5.89 + # PeriodicTimer: checkpoint: 0.8266980648040771, joined: 4.759222030639648 + print(f'checkpoint: {checkpoint - start}, joined: {joined - start}') + + time.sleep(max_interval_sec * 2) + num_span_received = server.get_num_spans_received() - self.assertEqual(num_spans_sent, num_span_received) + self.assertEqual(num_spans_per_firehose * num_threads, num_span_received) server.stop() - @unittest.SkipTest - def test_bsp(self): + def test_slower(self): server = OTLPServer() server.start() - bsp = BatchSpanProcessor(OTLPSpanExporter()) - num_spans_sent = 10000 + max_interval_sec = 4 + bsp = BatchSpanProcessor2(OTLPSpanExporter2(), timer=ThreadingTimer(max_interval_sec)) + num_spans_per_firehose = 1000 + sf = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=0.01) + threads = [] + num_threads = 128 + for _ in range(num_threads): + thread = threading.Thread(target=sf.run) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + time.sleep(max_interval_sec * 2) + num_span_received = server.get_num_spans_received() + self.assertEqual(num_spans_per_firehose * num_threads, num_span_received) + server.stop() + + def test_slow_enough_to_engage_timer(self): + server = OTLPServer() + server.start() + bsp = BatchSpanProcessor2(OTLPSpanExporter2()) + num_spans = 10 + sf = SpanFirehose(bsp, num_spans=num_spans, sleep_sec=1) + sf.run() + time.sleep(5) + # bsp.force_flush() + num_span_received = server.get_num_spans_received() + self.assertEqual(num_spans, num_span_received) + server.stop() + + +class SpanFirehose: + + def __init__(self, sp: SpanProcessor, num_spans: int, sleep_sec: float): + self._sp = sp + self._num_spans = num_spans + self._sleep_sec = sleep_sec + + def run(self) -> float: start = time.time() span = util.mk_span('test-span') - for i in range(num_spans_sent): - bsp.on_end(span) - end = time.time() - elapsed = end - start - print() - print(f'old: elapsed: {elapsed}') - bsp.force_flush() - while num_spans_sent > server.get_num_spans_received(): - time.sleep(0.1) - server.stop() + for _ in range(self._num_spans): + time.sleep(self._sleep_sec) + self._sp.on_end(span) + return time.time() - start class OTLPServer: From f54bdd86baa7269adf43d8f040db1635778aa013 Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Wed, 18 Oct 2023 15:41:20 -0400 Subject: [PATCH 5/7] tweak default values --- .../opentelemetry/sdk/trace/export/experimental/processor.py | 4 ++-- opentelemetry-sdk/tests/trace/export/test_experimental.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py index 89becdbed59..6bf5ab56c12 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -16,8 +16,8 @@ class BatchSpanProcessor2(SpanProcessor): def __init__( self, exporter: SpanExporter, - max_batch_size: int = 1024, - interval_sec: int = 4, + max_batch_size: int = 512, + interval_sec: int = 5, timer: typing.Optional[TimerABC] = None, ): self._exporter = exporter diff --git a/opentelemetry-sdk/tests/trace/export/test_experimental.py b/opentelemetry-sdk/tests/trace/export/test_experimental.py index 9124c17ba5f..5e6db6fd81b 100644 --- a/opentelemetry-sdk/tests/trace/export/test_experimental.py +++ b/opentelemetry-sdk/tests/trace/export/test_experimental.py @@ -6,7 +6,7 @@ import util from opentelemetry.sdk.trace import ReadableSpan -from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult, BatchSpanProcessor from opentelemetry.sdk.trace.export.experimental.client import RetryingGrpcClient, FakeGrpcClient from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 From 97db14769cd367b9e4e6441824389f04ce00c21b Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Wed, 18 Oct 2023 18:47:54 -0400 Subject: [PATCH 6/7] add comments, make ThreadingTimer the default, fix tests --- .../trace/export/experimental/accumulator.py | 30 +++-- .../trace/export/experimental/processor.py | 4 +- .../sdk/trace/export/experimental/timer.py | 116 ++++++------------ .../tests/trace/export/test_integration.py | 16 +-- 4 files changed, 65 insertions(+), 101 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py index d814fe2fc2c..bd25103d973 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py @@ -7,28 +7,44 @@ class SpanAccumulator: """ - Accumulates and batches spans in a thread-safe manner. + A thread-safe container designed to collect and batch spans. It accumulates spans until a specified batch size is + reached, at which point the accumulated spans are moved into a FIFO queue. Provides methods to add spans, check if + the accumulator is non-empty, and retrieve the earliest batch of spans from the queue. """ - def __init__(self, max_len: int): - self._max_len = max_len + def __init__(self, batch_size: int): + self._batch_size = batch_size self._spans: typing.List[ReadableSpan] = [] - self._batches = collections.deque() # fixme set max size? + self._batches = collections.deque() # fixme set max size self._lock = threading.Lock() - def nonempty(self): - return len(self._spans) > 0 or len(self._batches) > 0 + def nonempty(self) -> bool: + """ + Checks if the accumulator contains any spans or batches. It returns True if either the span list or the batch + queue is non-empty, and False otherwise. + """ + with self._lock: + return len(self._spans) > 0 or len(self._batches) > 0 def push(self, span: ReadableSpan) -> bool: + """ + Adds a span to the accumulator. If the addition causes the number of spans to reach the + specified batch size, the accumulated spans are moved into a FIFO queue as a new batch. Returns True if a new + batch was created, otherwise returns False. + """ with self._lock: self._spans.append(span) - if len(self._spans) < self._max_len: + if len(self._spans) < self._batch_size: return False self._batches.appendleft(self._spans) self._spans = [] return True def batch(self) -> typing.List[ReadableSpan]: + """ + Returns the earliest (first in line) batch of spans from the FIFO queue. If the queue is empty, returns any + remaining spans that haven't been batched. + """ try: return self._batches.pop() except IndexError: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py index 6bf5ab56c12..d2db600ece4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -4,7 +4,7 @@ from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, Span from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.sdk.trace.export.experimental.accumulator import SpanAccumulator -from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer +from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer, ThreadingTimer class BatchSpanProcessor2(SpanProcessor): @@ -22,7 +22,7 @@ def __init__( ): self._exporter = exporter self._accumulator = SpanAccumulator(max_batch_size) - self._timer = timer or PeriodicTimer(interval_sec) + self._timer = timer or ThreadingTimer(interval_sec) self._timer.set_callback(self._export_batch) self._timer.start() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py index bd0fc43e204..f32419c1d1e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py @@ -2,8 +2,6 @@ import threading import typing -from icecream import icecream - class TimerABC(abc.ABC): """ @@ -27,9 +25,47 @@ def stop(self) -> None: pass +class ThreadingTimer(TimerABC): + + def __init__(self, interval_sec: int): + self.interval_sec = interval_sec + self.cb = lambda: None + self.timer = None + self.lock = threading.Lock() + + def set_callback(self, cb) -> None: + with self.lock: + self.cb = cb + + def start(self) -> None: + with self.lock: + self.timer = threading.Timer(self.interval_sec, self._work) + self.timer.daemon = True + self.timer.start() + + def _work(self): + self.cb() + self.start() + + def poke(self) -> None: + with self.lock: + self._do_stop() + threading.Thread(target=self._work, daemon=True).start() + + def stop(self) -> None: + with self.lock: + self._do_stop() + + def _do_stop(self): + if self.timer is None: + return + self.timer.cancel() + self.timer = None + + class PeriodicTimer(TimerABC): """ - Executes the passed-in callback on a timer at the specified interval. The callback can be run sooner than the + DEPRECATED. Executes the passed-in callback on a timer at the specified interval. The callback can be run sooner than the interval via the poke() method, which also resets the timer. """ @@ -82,80 +118,6 @@ def stopped(self) -> bool: return self._stop.is_set() -class IntervalTimer(TimerABC): - - def __init__(self, interval_sec): - self._thread = threading.Thread(target=self._work) - self._interval_sec = interval_sec - self._cb = lambda: None - self._poke = threading.Event() - self._stop = threading.Event() - - def set_callback(self, cb) -> None: - self._cb = cb - - def start(self) -> None: - self._thread.start() - - def _work(self): - while True: - self._poke.wait(self._interval_sec) - if self._stop.is_set(): - return - self._poke.clear() - self._cb() - - def poke(self) -> None: - self._poke.set() - - def stop(self) -> None: - self._stop.set() - - def started(self) -> bool: - pass - - def stopped(self) -> bool: - pass - - -class ThreadingTimer(TimerABC): - - def __init__(self, interval_sec: int): - self.interval_sec = interval_sec - self.cb = lambda: None - self.timer = None - self.lock = threading.Lock() - - def set_callback(self, cb) -> None: - with self.lock: - self.cb = cb - - def start(self) -> None: - with self.lock: - self.timer = threading.Timer(self.interval_sec, self._work) - self.timer.start() - - def _work(self): - self.cb() - self.start() - - def poke(self) -> None: - with self.lock: - self._do_stop() - # start a new thread from a thread that's just about to be shut down - threading.Thread(target=self._work, daemon=True).start() - - def stop(self) -> None: - with self.lock: - self._do_stop() - - def _do_stop(self): - if self.timer is None: - return - self.timer.cancel() - self.timer = None - - class ThreadlessTimer(TimerABC): """ For testing/experimentation. Only executes the callback when you run poke(). diff --git a/opentelemetry-sdk/tests/trace/export/test_integration.py b/opentelemetry-sdk/tests/trace/export/test_integration.py index 343fb3bce73..facc78e8895 100644 --- a/opentelemetry-sdk/tests/trace/export/test_integration.py +++ b/opentelemetry-sdk/tests/trace/export/test_integration.py @@ -24,15 +24,10 @@ def test_full_speed(self): server.start() max_interval_sec = 4 - # timer = ThreadingTimer(max_interval_sec) - timer = PeriodicTimer(max_interval_sec) - - bsp = BatchSpanProcessor2(OTLPSpanExporter2(), timer=timer) + bsp = BatchSpanProcessor2(OTLPSpanExporter2()) num_spans_per_firehose = 1_000 sf = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=0) - start = time.time() - threads = [] num_threads = 128 for _ in range(num_threads): @@ -40,17 +35,9 @@ def test_full_speed(self): thread.start() threads.append(thread) - checkpoint = time.time() - for thread in threads: thread.join() - joined = time.time() - - # ThreadingTimer: checkpoint: 0.72, joined: 5.89 - # PeriodicTimer: checkpoint: 0.8266980648040771, joined: 4.759222030639648 - print(f'checkpoint: {checkpoint - start}, joined: {joined - start}') - time.sleep(max_interval_sec * 2) num_span_received = server.get_num_spans_received() @@ -85,7 +72,6 @@ def test_slow_enough_to_engage_timer(self): sf = SpanFirehose(bsp, num_spans=num_spans, sleep_sec=1) sf.run() time.sleep(5) - # bsp.force_flush() num_span_received = server.get_num_spans_received() self.assertEqual(num_spans, num_span_received) server.stop() From 37b9801fa9b316fcb0963aa95cf1f4e488fe7e78 Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Wed, 25 Oct 2023 16:47:10 -0400 Subject: [PATCH 7/7] rename timers --- .../sdk/trace/export/experimental/processor.py | 4 ++-- .../sdk/trace/export/experimental/timer.py | 16 +++++++++++----- .../tests/trace/export/test_experimental.py | 2 +- .../tests/trace/export/test_integration.py | 4 ++-- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py index d2db600ece4..40680d8ee61 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -4,7 +4,7 @@ from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, Span from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.sdk.trace.export.experimental.accumulator import SpanAccumulator -from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, PeriodicTimer, ThreadingTimer +from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, EventBasedTimer, ThreadBasedTimer class BatchSpanProcessor2(SpanProcessor): @@ -22,7 +22,7 @@ def __init__( ): self._exporter = exporter self._accumulator = SpanAccumulator(max_batch_size) - self._timer = timer or ThreadingTimer(interval_sec) + self._timer = timer or ThreadBasedTimer(interval_sec) self._timer.set_callback(self._export_batch) self._timer.start() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py index f32419c1d1e..eb8f769fb86 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py @@ -6,6 +6,9 @@ class TimerABC(abc.ABC): """ An interface extracted from PeriodicTimer so alternative implementations can be used for testing. + + Implementations should execute the passed-in callback on a timer at the specified interval at a minimum. The + callback can be run sooner than the interval via the poke() method, which also resets the timer. """ @abc.abstractmethod @@ -25,7 +28,11 @@ def stop(self) -> None: pass -class ThreadingTimer(TimerABC): +class ThreadBasedTimer(TimerABC): + """ + A Timer implementation that uses a threading.Timer for each interval and runs the callback asynchronously using a + new Thread on poke(). + """ def __init__(self, interval_sec: int): self.interval_sec = interval_sec @@ -63,10 +70,9 @@ def _do_stop(self): self.timer = None -class PeriodicTimer(TimerABC): +class EventBasedTimer(TimerABC): """ - DEPRECATED. Executes the passed-in callback on a timer at the specified interval. The callback can be run sooner than the - interval via the poke() method, which also resets the timer. + Deprecated but left here for reference. I believe this implementation is unnecessarily complicated. """ def __init__( @@ -120,7 +126,7 @@ def stopped(self) -> bool: class ThreadlessTimer(TimerABC): """ - For testing/experimentation. Only executes the callback when you run poke(). + For testing/experimentation. Synchronously executes the callback when you call poke(). """ def __init__(self): diff --git a/opentelemetry-sdk/tests/trace/export/test_experimental.py b/opentelemetry-sdk/tests/trace/export/test_experimental.py index 5e6db6fd81b..591a30825f7 100644 --- a/opentelemetry-sdk/tests/trace/export/test_experimental.py +++ b/opentelemetry-sdk/tests/trace/export/test_experimental.py @@ -10,7 +10,7 @@ from opentelemetry.sdk.trace.export.experimental.client import RetryingGrpcClient, FakeGrpcClient from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 -from opentelemetry.sdk.trace.export.experimental.timer import ThreadlessTimer, ThreadingTimer +from opentelemetry.sdk.trace.export.experimental.timer import ThreadlessTimer, ThreadBasedTimer class TestBatchSpanProcessor(unittest.TestCase): diff --git a/opentelemetry-sdk/tests/trace/export/test_integration.py b/opentelemetry-sdk/tests/trace/export/test_integration.py index facc78e8895..8f1cddaf513 100644 --- a/opentelemetry-sdk/tests/trace/export/test_integration.py +++ b/opentelemetry-sdk/tests/trace/export/test_integration.py @@ -13,7 +13,7 @@ from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 -from opentelemetry.sdk.trace.export.experimental.timer import ThreadingTimer, PeriodicTimer, ThreadlessTimer +from opentelemetry.sdk.trace.export.experimental.timer import ThreadBasedTimer, EventBasedTimer, ThreadlessTimer @unittest.skipUnless(environ.get('RUN_LONG_TESTS', '').lower() == 'true', 'Skipping, RUN_LONG_TESTS not set') @@ -48,7 +48,7 @@ def test_slower(self): server = OTLPServer() server.start() max_interval_sec = 4 - bsp = BatchSpanProcessor2(OTLPSpanExporter2(), timer=ThreadingTimer(max_interval_sec)) + bsp = BatchSpanProcessor2(OTLPSpanExporter2(), timer=ThreadBasedTimer(max_interval_sec)) num_spans_per_firehose = 1000 sf = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=0.01) threads = []