Skip to content

Commit

Permalink
Span and SpanContext implementation (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
c24t authored Jul 25, 2019
1 parent 7a7a207 commit 8a453ee
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 0 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
# limitations under the License.

from . import trace
from . import util

__all__ = [
"trace",
"util",
]
228 changes: 228 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,235 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from collections import OrderedDict
from collections import deque
from collections import namedtuple
import threading
import typing

from opentelemetry import trace as trace_api
from opentelemetry.sdk import util

try:
from collections.abc import MutableMapping
from collections.abc import Sequence
except ImportError:
from collections import MutableMapping
from collections import Sequence


MAX_NUM_ATTRIBUTES = 32
MAX_NUM_EVENTS = 128
MAX_NUM_LINKS = 32

AttributeValue = typing.Union[str, bool, float]


class BoundedList(Sequence):
"""An append only list with a fixed max size."""
def __init__(self, maxlen):
self.dropped = 0
self._dq = deque(maxlen=maxlen)
self._lock = threading.Lock()

def __repr__(self):
return ("{}({}, maxlen={})"
.format(
type(self).__name__,
list(self._dq),
self._dq.maxlen
))

def __getitem__(self, index):
return self._dq[index]

def __len__(self):
return len(self._dq)

def __iter__(self):
with self._lock:
return iter(self._dq.copy())

def append(self, item):
with self._lock:
if len(self._dq) == self._dq.maxlen:
self.dropped += 1
self._dq.append(item)

def extend(self, seq):
with self._lock:
to_drop = len(seq) + len(self._dq) - self._dq.maxlen
if to_drop > 0:
self.dropped += to_drop
self._dq.extend(seq)

@classmethod
def from_seq(cls, maxlen, seq):
seq = tuple(seq)
if len(seq) > maxlen:
raise ValueError
bounded_list = cls(maxlen)
bounded_list._dq = deque(seq, maxlen=maxlen)
return bounded_list


class BoundedDict(MutableMapping):
"""A dict with a fixed max capacity."""
def __init__(self, maxlen):
if not isinstance(maxlen, int):
raise ValueError
if maxlen < 0:
raise ValueError
self.maxlen = maxlen
self.dropped = 0
self._dict = OrderedDict()
self._lock = threading.Lock()

def __repr__(self):
return ("{}({}, maxlen={})"
.format(
type(self).__name__,
dict(self._dict),
self.maxlen
))

def __getitem__(self, key):
return self._dict[key]

def __setitem__(self, key, value):
with self._lock:
if self.maxlen == 0:
self.dropped += 1
return

if key in self._dict:
del self._dict[key]
elif len(self._dict) == self.maxlen:
del self._dict[next(iter(self._dict.keys()))]
self.dropped += 1
self._dict[key] = value

def __delitem__(self, key):
del self._dict[key]

def __iter__(self):
with self._lock:
return iter(self._dict.copy())

def __len__(self):
return len(self._dict)

@classmethod
def from_map(cls, maxlen, mapping):
mapping = OrderedDict(mapping)
if len(mapping) > maxlen:
raise ValueError
bounded_dict = cls(maxlen)
bounded_dict._dict = mapping
return bounded_dict


class SpanContext(trace_api.SpanContext):
"""See `opentelemetry.trace.SpanContext`."""

def is_valid(self) -> bool:
return (self.trace_id == trace_api.INVALID_TRACE_ID or
self.span_id == trace_api.INVALID_SPAN_ID)


Event = namedtuple('Event', ('name', 'attributes'))

Link = namedtuple('Link', ('context', 'attributes'))


class Span(trace_api.Span):

# Initialize these lazily assuming most spans won't have them.
empty_attributes = BoundedDict(MAX_NUM_ATTRIBUTES)
empty_events = BoundedList(MAX_NUM_EVENTS)
empty_links = BoundedList(MAX_NUM_LINKS)

def __init__(self: 'Span',
name: str,
context: 'SpanContext',
# TODO: span processor
parent: typing.Union['Span', 'SpanContext'] = None,
root: bool = False,
sampler=None, # TODO
trace_config=None, # TraceConfig TODO
resource=None, # Resource TODO
# TODO: is_recording
attributes=None, # type TODO
events=None, # type TODO
links=None, # type TODO
) -> None:
"""See `opentelemetry.trace.Span`."""
if root:
if parent is not None:
raise ValueError("Root span can't have a parent")

self.name = name
self.context = context
self.parent = parent
self.root = root
self.sampler = sampler
self.trace_config = trace_config
self.resource = resource
self.attributes = attributes
self.events = events
self.links = links

if attributes is None:
self.attributes = Span.empty_attributes
else:
self.attributes = BoundedDict.from_map(
MAX_NUM_ATTRIBUTES, attributes)

if events is None:
self.events = Span.empty_events
else:
self.events = BoundedList.from_seq(MAX_NUM_EVENTS, events)

if links is None:
self.links = Span.empty_links
else:
self.links = BoundedList.from_seq(MAX_NUM_LINKS, links)

self.end_time = None
self.start_time = None

def set_attribute(self: 'Span',
key: str,
value: 'AttributeValue'
) -> None:
if self.attributes is Span.empty_attributes:
self.attributes = BoundedDict(MAX_NUM_ATTRIBUTES)
self.attributes[key] = value

def add_event(self: 'Span',
name: str,
attributes: typing.Dict[str, 'AttributeValue']
) -> None:
if self.events is Span.empty_events:
self.events = BoundedList(MAX_NUM_EVENTS)
self.events.append(Event(name, attributes))

def add_link(self: 'Span',
context: 'SpanContext',
attributes: typing.Dict[str, 'AttributeValue'],
) -> None:
if self.links is Span.empty_links:
self.links = BoundedList(MAX_NUM_LINKS)
self.links.append(Link(context, attributes))

def start(self):
if self.end_time is None:
self.start_time = util.time_ns()

def end(self):
if self.end_time is None:
self.end_time = util.time_ns()


class Tracer(trace_api.Tracer):
Expand Down
22 changes: 22 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2019, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

try:
time_ns = time.time_ns # noqa
# Python versions < 3.7
except AttributeError:
def time_ns():
return int(time.time() * 1e9)
8 changes: 8 additions & 0 deletions opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest import mock
import unittest

from opentelemetry import trace as trace_api
Expand All @@ -23,3 +24,10 @@ class TestTracer(unittest.TestCase):
def test_extends_api(self):
tracer = trace.Tracer()
self.assertIsInstance(tracer, trace_api.Tracer)


class TestSpan(unittest.TestCase):

def test_basic_span(self):
span = trace.Span('name', mock.Mock(spec=trace.SpanContext))
self.assertEqual(span.name, 'name')

0 comments on commit 8a453ee

Please sign in to comment.