diff --git a/.flake8 b/.flake8 index 5384053b3b..a3411a1614 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,3 @@ [flake8] ignore = E501,W503,E203 +exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,.tox,ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/,ext/opentelemetry-ext-jaeger/build/* diff --git a/.isort.cfg b/.isort.cfg index 43cafae197..4bf64a34f1 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -12,3 +12,4 @@ line_length=79 ; ) ; docs: https://github.com/timothycrosley/isort#multi-line-output-modes multi_line_output=3 +skip_glob=ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/* diff --git a/.pylintrc b/.pylintrc index 8130305d70..782fc58700 100644 --- a/.pylintrc +++ b/.pylintrc @@ -7,7 +7,7 @@ extension-pkg-whitelist= # Add files or directories to the blacklist. They should be base names, not # paths. -ignore=CVS +ignore=CVS,gen # Add files or directories matching the regex patterns to the blacklist. The # regex matches against base names, not paths. diff --git a/ext/opentelemetry-ext-jaeger/README.rst b/ext/opentelemetry-ext-jaeger/README.rst new file mode 100644 index 0000000000..2c2e94cd9f --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/README.rst @@ -0,0 +1,67 @@ +OpenTelemetry Jaeger Exporter +============================= + +Installation +------------ + +:: + + pip install opentelemetry-ext-jaeger + + +Usage +----- + +The **OpenTelemetry Jaeger Exporter** allows to export `OpenTelemetry`_ traces to `Jaeger`_. +This exporter always send traces to the configured agent using Thrift compact protocol over UDP. +An optional collector can be configured, in this case Thrift binary protocol over HTTP is used. +gRPC is still not supported by this implementation. + + +.. _Jaeger: https://www.jaegertracing.io/ +.. _OpenTelemetry: https://github.com/opentelemetry/opentelemetry-python/ + +.. code:: python + + from opentelemetry import trace + from opentelemetry.ext import jaeger + from opentelemetry.sdk.trace import Tracer + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor + + trace.set_preferred_tracer_implementation(lambda T: Tracer()) + tracer = trace.tracer() + + # create a JaegerSpanExporter + jaeger_exporter = jaeger.JaegerSpanExporter( + service_name='my-helloworld-service', + # configure agent + agent_host_name='localhost', + agent_port=6831, + # optional: configure also collector + # collector_host_name='localhost', + # collector_port=14268, + # collector_endpoint='/api/traces?format=jaeger.thrift', + # username=xxxx, # optional + # password=xxxx, # optional + ) + + # Create a BatchExportSpanProcessor and add the exporter to it + span_processor = BatchExportSpanProcessor(jaeger_exporter) + + # add to the tracer + tracer.add_span_processor(span_processor) + + with tracer.start_span('foo'): + print('Hello world!') + + # shutdown the span processor + # TODO: this has to be improved so user doesn't need to call it manually + span_processor.shutdown() + +The `examples <./examples>`_ folder contains more elaborated examples. + +References +---------- + +* `Jaeger `_ +* `OpenTelemetry Project `_ diff --git a/ext/opentelemetry-ext-jaeger/examples/jaeger_exporter_example.py b/ext/opentelemetry-ext-jaeger/examples/jaeger_exporter_example.py new file mode 100644 index 0000000000..b43b158e85 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/examples/jaeger_exporter_example.py @@ -0,0 +1,51 @@ +import time + +from opentelemetry import trace +from opentelemetry.ext import jaeger +from opentelemetry.sdk.trace import Tracer +from opentelemetry.sdk.trace.export import BatchExportSpanProcessor + +trace.set_preferred_tracer_implementation(lambda T: Tracer()) +tracer = trace.tracer() + +# create a JaegerSpanExporter +jaeger_exporter = jaeger.JaegerSpanExporter( + service_name="my-helloworld-service", + # configure agent + agent_host_name="localhost", + agent_port=6831, + # optional: configure also collector + # collector_host_name="localhost", + # collector_port=14268, + # collector_endpoint="/api/traces?format=jaeger.thrift", + # username=xxxx, # optional + # password=xxxx, # optional +) + +# create a BatchExportSpanProcessor and add the exporter to it +span_processor = BatchExportSpanProcessor(jaeger_exporter) + +# add to the tracer +tracer.add_span_processor(span_processor) + +# create some spans for testing +with tracer.start_span("foo") as foo: + time.sleep(0.1) + foo.set_attribute("my_atribbute", True) + foo.add_event("event in foo", {"name": "foo1"}) + with tracer.start_span("bar") as bar: + time.sleep(0.2) + bar.set_attribute("speed", 100.0) + bar.add_link(foo.get_context()) + + with tracer.start_span("baz") as baz: + time.sleep(0.3) + baz.set_attribute("name", "mauricio") + + time.sleep(0.2) + + time.sleep(0.1) + +# shutdown the span processor +# TODO: this has to be improved so user doesn't need to call it manually +span_processor.shutdown() diff --git a/ext/opentelemetry-ext-jaeger/setup.cfg b/ext/opentelemetry-ext-jaeger/setup.cfg new file mode 100644 index 0000000000..a5f04f1e9b --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/setup.cfg @@ -0,0 +1,47 @@ +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-ext-jaeger +description = Jaeger Exporter for OpenTelemetry +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/ext/opentelemetry-ext-jaeger +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 3 - Alpha + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.4 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + +[options] +python_requires = >=3.4 +package_dir= + =src +packages=find_namespace: +install_requires = + thrift >= 0.10.0 + opentelemetry-api + opentelemetry-sdk + +[options.packages.find] +where = src diff --git a/ext/opentelemetry-ext-jaeger/setup.py b/ext/opentelemetry-ext-jaeger/setup.py new file mode 100644 index 0000000000..44f6eb32b1 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/setup.py @@ -0,0 +1,26 @@ +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "ext", "jaeger", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/__init__.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/__init__.py new file mode 100644 index 0000000000..b824c1a51b --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/__init__.py @@ -0,0 +1,363 @@ +# Copyright 2018, OpenCensus Authors +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Jaeger Span Exporter for OpenTelemetry.""" + +import base64 +import logging +import socket + +from thrift.protocol import TBinaryProtocol, TCompactProtocol +from thrift.transport import THttpClient, TTransport + +import opentelemetry.trace as trace_api +from opentelemetry.ext.jaeger.gen.agent import Agent as agent +from opentelemetry.ext.jaeger.gen.jaeger import Collector as jaeger +from opentelemetry.sdk.trace.export import Span, SpanExporter, SpanExportResult + +DEFAULT_AGENT_HOST_NAME = "localhost" +DEFAULT_AGENT_PORT = 6831 +DEFAULT_COLLECTOR_ENDPOINT = "/api/traces?format=jaeger.thrift" + +UDP_PACKET_MAX_LENGTH = 65000 + +logger = logging.getLogger(__name__) + + +class JaegerSpanExporter(SpanExporter): + """Jaeger span exporter for OpenTelemetry. + + Args: + service_name: Service that logged an annotation in a trace.Classifier + when query for spans. + agent_host_name: The host name of the Jaeger-Agent. + agent_port: The port of the Jaeger-Agent. + collector_host_name: The host name of the Jaeger-Collector HTTP + Thrift. + collector_port: The port of the Jaeger-Collector HTTP Thrift. + collector_endpoint: The endpoint of the Jaeger-Collector HTTP Thrift. + username: The user name of the Basic Auth if authentication is + required. + password: The password of the Basic Auth if authentication is + required. + """ + + def __init__( + self, + service_name, + agent_host_name=DEFAULT_AGENT_HOST_NAME, + agent_port=DEFAULT_AGENT_PORT, + collector_host_name=None, + collector_port=None, + collector_endpoint=DEFAULT_COLLECTOR_ENDPOINT, + username=None, + password=None, + ): + self.service_name = service_name + self.agent_host_name = agent_host_name + self.agent_port = agent_port + self._agent_client = None + self.collector_host_name = collector_host_name + self.collector_port = collector_port + self.collector_endpoint = collector_endpoint + self.username = username + self.password = password + self._collector = None + + @property + def agent_client(self): + if self._agent_client is None: + self._agent_client = AgentClientUDP( + host_name=self.agent_host_name, port=self.agent_port + ) + return self._agent_client + + @property + def collector(self): + if self._collector is not None: + return self._collector + + if self.collector_host_name is None or self.collector_port is None: + return None + + thrift_url = "http://{}:{}{}".format( + self.collector_host_name, + self.collector_port, + self.collector_endpoint, + ) + + auth = None + if self.username is not None and self.password is not None: + auth = (self.username, self.password) + + self._collector = Collector(thrift_url=thrift_url, auth=auth) + return self._collector + + def export(self, spans): + jaeger_spans = _translate_to_jaeger(spans) + + batch = jaeger.Batch( + spans=jaeger_spans, + process=jaeger.Process(serviceName=self.service_name), + ) + + if self.collector is not None: + self.collector.submit(batch) + self.agent_client.emit(batch) + + return SpanExportResult.SUCCESS + + def shutdown(self): + pass + + +def _translate_to_jaeger(spans: Span): + """Translate the spans to Jaeger format. + + Args: + spans: Tuple of spans to convert + """ + + jaeger_spans = [] + + for span in spans: + ctx = span.get_context() + trace_id = ctx.trace_id + span_id = ctx.span_id + + start_time_us = span.start_time // 1e3 + duration_us = (span.end_time - span.start_time) // 1e3 + + parent_id = 0 + if isinstance(span.parent, trace_api.Span): + parent_id = span.parent.get_context().span_id + elif isinstance(span.parent, trace_api.SpanContext): + parent_id = span.parent.span_id + + tags = _extract_tags(span.attributes) + + # TODO: status is missing: + # https://github.com/open-telemetry/opentelemetry-python/issues/98 + + refs = _extract_refs_from_span(span) + logs = _extract_logs_from_span(span) + + flags = int(ctx.trace_options) + + jaeger_span = jaeger.Span( + traceIdHigh=_get_trace_id_high(trace_id), + traceIdLow=_get_trace_id_low(trace_id), + # generated code expects i64 + spanId=_convert_int_to_i64(span_id), + operationName=span.name, + startTime=start_time_us, + duration=duration_us, + tags=tags, + logs=logs, + references=refs, + flags=flags, + parentSpanId=_convert_int_to_i64(parent_id), + ) + + jaeger_spans.append(jaeger_span) + + return jaeger_spans + + +def _extract_refs_from_span(span): + if not span.links: + return None + + refs = [] + for link in span.links: + trace_id = link.context.trace_id + span_id = link.context.span_id + refs.append( + jaeger.SpanRef( + refType=jaeger.SpanRefType.FOLLOWS_FROM, + traceIdHigh=_get_trace_id_high(trace_id), + traceIdLow=_get_trace_id_low(trace_id), + spanId=_convert_int_to_i64(span_id), + ) + ) + return refs + + +def _convert_int_to_i64(val): + """Convert integer to signed int64 (i64)""" + if val > 0x7FFFFFFFFFFFFFFF: + val -= 0x10000000000000000 + return val + + +def _get_trace_id_low(trace_id): + return _convert_int_to_i64(trace_id & 0xFFFFFFFFFFFFFFFF) + + +def _get_trace_id_high(trace_id): + return _convert_int_to_i64((trace_id >> 64) & 0xFFFFFFFFFFFFFFFF) + + +def _extract_logs_from_span(span): + if not span.events: + return None + + logs = [] + + for event in span.events: + fields = [] + if event.attributes is not None: + fields = _extract_tags(event.attributes) + + fields.append( + jaeger.Tag( + key="message", vType=jaeger.TagType.STRING, vStr=event.name + ) + ) + + event_timestamp_us = event.timestamp // 1e3 + logs.append( + jaeger.Log(timestamp=int(event_timestamp_us), fields=fields) + ) + return logs + + +def _extract_tags(attr): + if not attr: + return None + tags = [] + for attribute_key, attribute_value in attr.items(): + tag = _convert_attribute_to_tag(attribute_key, attribute_value) + if tag is None: + continue + tags.append(tag) + return tags + + +def _convert_attribute_to_tag(key, attr): + """Convert the attributes to jaeger tags.""" + if isinstance(attr, bool): + return jaeger.Tag(key=key, vBool=attr, vType=jaeger.TagType.BOOL) + if isinstance(attr, str): + return jaeger.Tag(key=key, vStr=attr, vType=jaeger.TagType.STRING) + if isinstance(attr, int): + return jaeger.Tag(key=key, vLong=attr, vType=jaeger.TagType.LONG) + if isinstance(attr, float): + return jaeger.Tag(key=key, vDouble=attr, vType=jaeger.TagType.DOUBLE) + logger.warning("Could not serialize attribute %s:%r to tag", key, attr) + return None + + +class AgentClientUDP: + """Implement a UDP client to agent. + + Args: + host_name: The host name of the Jaeger server. + port: The port of the Jaeger server. + max_packet_size: Maximum size of UDP packet. + client: Class for creating new client objects for agencies. + """ + + def __init__( + self, + host_name, + port, + max_packet_size=UDP_PACKET_MAX_LENGTH, + client=agent.Client, + ): + self.address = (host_name, port) + self.max_packet_size = max_packet_size + self.buffer = TTransport.TMemoryBuffer() + self.client = client( + iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer) + ) + + def emit(self, batch: jaeger.Batch): + """ + Args: + batch: Object to emit Jaeger spans. + """ + + # pylint: disable=protected-access + self.client._seqid = 0 + # truncate and reset the position of BytesIO object + self.buffer._buffer.truncate(0) + self.buffer._buffer.seek(0) + self.client.emitBatch(batch) + buff = self.buffer.getvalue() + if len(buff) > self.max_packet_size: + logger.warning( + "Data exceeds the max UDP packet size; size %r, max %r", + len(buff), + self.max_packet_size, + ) + return + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket: + udp_socket.sendto(buff, self.address) + + +class Collector: + """Submits collected spans to Thrift HTTP server. + + Args: + thrift_url: URL of the Jaeger HTTP Thrift. + auth: Auth tuple that contains username and password for Basic Auth. + client: Class for creating a Jaeger collector client. + http_transport: Class for creating new client for Thrift HTTP server. + """ + + def __init__( + self, + thrift_url="", + auth=None, + client=jaeger.Client, + http_transport=THttpClient.THttpClient, + ): + self.thrift_url = thrift_url + self.auth = auth + self.http_transport = http_transport(uri_or_host=thrift_url) + self.client = client( + iprot=TBinaryProtocol.TBinaryProtocol(trans=self.http_transport) + ) + + # set basic auth header + if auth is not None: + auth_header = "{}:{}".format(*auth) + decoded = base64.b64encode(auth_header.encode()).decode("ascii") + basic_auth = dict(Authorization="Basic {}".format(decoded)) + self.http_transport.setCustomHeaders(basic_auth) + + def submit(self, batch: jaeger.Batch): + """Submits batches to Thrift HTTP Server through Binary Protocol. + + Args: + batch: Object to emit Jaeger spans. + """ + try: + self.client.submitBatches([batch]) + # it will call http_transport.flush() and + # status code and message will be updated + code = self.http_transport.code + msg = self.http_transport.message + if code >= 300 or code < 200: + logger.error( + "Traces cannot be uploaded; HTTP status code: %s, message %s", + code, + msg, + ) + finally: + if self.http_transport.isOpen(): + self.http_transport.close() diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/__init__.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/__init__.py new file mode 100644 index 0000000000..52b3cfb3e9 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/__init__.py @@ -0,0 +1,4 @@ + +import sys +from os.path import dirname +sys.path.append(dirname(__file__)) diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/Agent-remote b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/Agent-remote new file mode 100755 index 0000000000..5db3d20804 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/Agent-remote @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +import sys +import pprint +if sys.version_info[0] > 2: + from urllib.parse import urlparse +else: + from urlparse import urlparse +from thrift.transport import TTransport, TSocket, TSSLSocket, THttpClient +from thrift.protocol.TBinaryProtocol import TBinaryProtocol + +from agent import Agent +from agent.ttypes import * + +if len(sys.argv) <= 1 or sys.argv[1] == '--help': + print('') + print('Usage: ' + sys.argv[0] + ' [-h host[:port]] [-u url] [-f[ramed]] [-s[sl]] [-novalidate] [-ca_certs certs] [-keyfile keyfile] [-certfile certfile] function [arg1 [arg2...]]') + print('') + print('Functions:') + print(' void emitZipkinBatch( spans)') + print(' void emitBatch(Batch batch)') + print('') + sys.exit(0) + +pp = pprint.PrettyPrinter(indent=2) +host = 'localhost' +port = 9090 +uri = '' +framed = False +ssl = False +validate = True +ca_certs = None +keyfile = None +certfile = None +http = False +argi = 1 + +if sys.argv[argi] == '-h': + parts = sys.argv[argi + 1].split(':') + host = parts[0] + if len(parts) > 1: + port = int(parts[1]) + argi += 2 + +if sys.argv[argi] == '-u': + url = urlparse(sys.argv[argi + 1]) + parts = url[1].split(':') + host = parts[0] + if len(parts) > 1: + port = int(parts[1]) + else: + port = 80 + uri = url[2] + if url[4]: + uri += '?%s' % url[4] + http = True + argi += 2 + +if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed': + framed = True + argi += 1 + +if sys.argv[argi] == '-s' or sys.argv[argi] == '-ssl': + ssl = True + argi += 1 + +if sys.argv[argi] == '-novalidate': + validate = False + argi += 1 + +if sys.argv[argi] == '-ca_certs': + ca_certs = sys.argv[argi+1] + argi += 2 + +if sys.argv[argi] == '-keyfile': + keyfile = sys.argv[argi+1] + argi += 2 + +if sys.argv[argi] == '-certfile': + certfile = sys.argv[argi+1] + argi += 2 + +cmd = sys.argv[argi] +args = sys.argv[argi + 1:] + +if http: + transport = THttpClient.THttpClient(host, port, uri) +else: + if ssl: + socket = TSSLSocket.TSSLSocket(host, port, validate=validate, ca_certs=ca_certs, keyfile=keyfile, certfile=certfile) + else: + socket = TSocket.TSocket(host, port) + if framed: + transport = TTransport.TFramedTransport(socket) + else: + transport = TTransport.TBufferedTransport(socket) +protocol = TBinaryProtocol(transport) +client = Agent.Client(protocol) +transport.open() + +if cmd == 'emitZipkinBatch': + if len(args) != 1: + print('emitZipkinBatch requires 1 args') + sys.exit(1) + pp.pprint(client.emitZipkinBatch(eval(args[0]),)) + +elif cmd == 'emitBatch': + if len(args) != 1: + print('emitBatch requires 1 args') + sys.exit(1) + pp.pprint(client.emitBatch(eval(args[0]),)) + +else: + print('Unrecognized method %s' % cmd) + sys.exit(1) + +transport.close() diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/Agent.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/Agent.py new file mode 100644 index 0000000000..e8e0fe662e --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/Agent.py @@ -0,0 +1,246 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys +import logging +from .ttypes import * +from thrift.Thrift import TProcessor +from thrift.transport import TTransport + + +class Iface(object): + def emitZipkinBatch(self, spans): + """ + Parameters: + - spans + """ + pass + + def emitBatch(self, batch): + """ + Parameters: + - batch + """ + pass + + +class Client(Iface): + def __init__(self, iprot, oprot=None): + self._iprot = self._oprot = iprot + if oprot is not None: + self._oprot = oprot + self._seqid = 0 + + def emitZipkinBatch(self, spans): + """ + Parameters: + - spans + """ + self.send_emitZipkinBatch(spans) + + def send_emitZipkinBatch(self, spans): + self._oprot.writeMessageBegin('emitZipkinBatch', TMessageType.ONEWAY, self._seqid) + args = emitZipkinBatch_args() + args.spans = spans + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def emitBatch(self, batch): + """ + Parameters: + - batch + """ + self.send_emitBatch(batch) + + def send_emitBatch(self, batch): + self._oprot.writeMessageBegin('emitBatch', TMessageType.ONEWAY, self._seqid) + args = emitBatch_args() + args.batch = batch + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + +class Processor(Iface, TProcessor): + def __init__(self, handler): + self._handler = handler + self._processMap = {} + self._processMap["emitZipkinBatch"] = Processor.process_emitZipkinBatch + self._processMap["emitBatch"] = Processor.process_emitBatch + + def process(self, iprot, oprot): + (name, type, seqid) = iprot.readMessageBegin() + if name not in self._processMap: + iprot.skip(TType.STRUCT) + iprot.readMessageEnd() + x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name)) + oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid) + x.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + return + else: + self._processMap[name](self, seqid, iprot, oprot) + return True + + def process_emitZipkinBatch(self, seqid, iprot, oprot): + args = emitZipkinBatch_args() + args.read(iprot) + iprot.readMessageEnd() + try: + self._handler.emitZipkinBatch(args.spans) + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except: + pass + + def process_emitBatch(self, seqid, iprot, oprot): + args = emitBatch_args() + args.read(iprot) + iprot.readMessageEnd() + try: + self._handler.emitBatch(args.batch) + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except: + pass + +# HELPER FUNCTIONS AND STRUCTURES + + +class emitZipkinBatch_args(object): + """ + Attributes: + - spans + """ + + thrift_spec = ( + None, # 0 + (1, TType.LIST, 'spans', (TType.STRUCT, (zipkincore.ttypes.Span, zipkincore.ttypes.Span.thrift_spec), False), None, ), # 1 + ) + + def __init__(self, spans=None,): + self.spans = spans + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.LIST: + self.spans = [] + (_etype3, _size0) = iprot.readListBegin() + for _i4 in range(_size0): + _elem5 = zipkincore.ttypes.Span() + _elem5.read(iprot) + self.spans.append(_elem5) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('emitZipkinBatch_args') + if self.spans is not None: + oprot.writeFieldBegin('spans', TType.LIST, 1) + oprot.writeListBegin(TType.STRUCT, len(self.spans)) + for iter6 in self.spans: + iter6.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class emitBatch_args(object): + """ + Attributes: + - batch + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'batch', (jaeger.ttypes.Batch, jaeger.ttypes.Batch.thrift_spec), None, ), # 1 + ) + + def __init__(self, batch=None,): + self.batch = batch + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.batch = jaeger.ttypes.Batch() + self.batch.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('emitBatch_args') + if self.batch is not None: + oprot.writeFieldBegin('batch', TType.STRUCT, 1) + self.batch.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/__init__.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/__init__.py new file mode 100644 index 0000000000..1059cfbc01 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/__init__.py @@ -0,0 +1 @@ +__all__ = ['ttypes', 'constants', 'Agent'] diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/constants.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/constants.py new file mode 100644 index 0000000000..eb0d35aa12 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/constants.py @@ -0,0 +1,12 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys +from .ttypes import * diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/ttypes.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/ttypes.py new file mode 100644 index 0000000000..fc8743cba9 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/agent/ttypes.py @@ -0,0 +1,15 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys +import jaeger.ttypes +import zipkincore.ttypes + +from thrift.transport import TTransport diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/Collector-remote b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/Collector-remote new file mode 100755 index 0000000000..5903f02360 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/Collector-remote @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +import sys +import pprint +if sys.version_info[0] > 2: + from urllib.parse import urlparse +else: + from urlparse import urlparse +from thrift.transport import TTransport, TSocket, TSSLSocket, THttpClient +from thrift.protocol.TBinaryProtocol import TBinaryProtocol + +from jaeger import Collector +from jaeger.ttypes import * + +if len(sys.argv) <= 1 or sys.argv[1] == '--help': + print('') + print('Usage: ' + sys.argv[0] + ' [-h host[:port]] [-u url] [-f[ramed]] [-s[sl]] [-novalidate] [-ca_certs certs] [-keyfile keyfile] [-certfile certfile] function [arg1 [arg2...]]') + print('') + print('Functions:') + print(' submitBatches( batches)') + print('') + sys.exit(0) + +pp = pprint.PrettyPrinter(indent=2) +host = 'localhost' +port = 9090 +uri = '' +framed = False +ssl = False +validate = True +ca_certs = None +keyfile = None +certfile = None +http = False +argi = 1 + +if sys.argv[argi] == '-h': + parts = sys.argv[argi + 1].split(':') + host = parts[0] + if len(parts) > 1: + port = int(parts[1]) + argi += 2 + +if sys.argv[argi] == '-u': + url = urlparse(sys.argv[argi + 1]) + parts = url[1].split(':') + host = parts[0] + if len(parts) > 1: + port = int(parts[1]) + else: + port = 80 + uri = url[2] + if url[4]: + uri += '?%s' % url[4] + http = True + argi += 2 + +if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed': + framed = True + argi += 1 + +if sys.argv[argi] == '-s' or sys.argv[argi] == '-ssl': + ssl = True + argi += 1 + +if sys.argv[argi] == '-novalidate': + validate = False + argi += 1 + +if sys.argv[argi] == '-ca_certs': + ca_certs = sys.argv[argi+1] + argi += 2 + +if sys.argv[argi] == '-keyfile': + keyfile = sys.argv[argi+1] + argi += 2 + +if sys.argv[argi] == '-certfile': + certfile = sys.argv[argi+1] + argi += 2 + +cmd = sys.argv[argi] +args = sys.argv[argi + 1:] + +if http: + transport = THttpClient.THttpClient(host, port, uri) +else: + if ssl: + socket = TSSLSocket.TSSLSocket(host, port, validate=validate, ca_certs=ca_certs, keyfile=keyfile, certfile=certfile) + else: + socket = TSocket.TSocket(host, port) + if framed: + transport = TTransport.TFramedTransport(socket) + else: + transport = TTransport.TBufferedTransport(socket) +protocol = TBinaryProtocol(transport) +client = Collector.Client(protocol) +transport.open() + +if cmd == 'submitBatches': + if len(args) != 1: + print('submitBatches requires 1 args') + sys.exit(1) + pp.pprint(client.submitBatches(eval(args[0]),)) + +else: + print('Unrecognized method %s' % cmd) + sys.exit(1) + +transport.close() diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/Collector.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/Collector.py new file mode 100644 index 0000000000..f6f809b089 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/Collector.py @@ -0,0 +1,243 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys +import logging +from .ttypes import * +from thrift.Thrift import TProcessor +from thrift.transport import TTransport + + +class Iface(object): + def submitBatches(self, batches): + """ + Parameters: + - batches + """ + pass + + +class Client(Iface): + def __init__(self, iprot, oprot=None): + self._iprot = self._oprot = iprot + if oprot is not None: + self._oprot = oprot + self._seqid = 0 + + def submitBatches(self, batches): + """ + Parameters: + - batches + """ + self.send_submitBatches(batches) + return self.recv_submitBatches() + + def send_submitBatches(self, batches): + self._oprot.writeMessageBegin('submitBatches', TMessageType.CALL, self._seqid) + args = submitBatches_args() + args.batches = batches + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_submitBatches(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = submitBatches_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "submitBatches failed: unknown result") + + +class Processor(Iface, TProcessor): + def __init__(self, handler): + self._handler = handler + self._processMap = {} + self._processMap["submitBatches"] = Processor.process_submitBatches + + def process(self, iprot, oprot): + (name, type, seqid) = iprot.readMessageBegin() + if name not in self._processMap: + iprot.skip(TType.STRUCT) + iprot.readMessageEnd() + x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name)) + oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid) + x.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + return + else: + self._processMap[name](self, seqid, iprot, oprot) + return True + + def process_submitBatches(self, seqid, iprot, oprot): + args = submitBatches_args() + args.read(iprot) + iprot.readMessageEnd() + result = submitBatches_result() + try: + result.success = self._handler.submitBatches(args.batches) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("submitBatches", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + +# HELPER FUNCTIONS AND STRUCTURES + + +class submitBatches_args(object): + """ + Attributes: + - batches + """ + + thrift_spec = ( + None, # 0 + (1, TType.LIST, 'batches', (TType.STRUCT, (Batch, Batch.thrift_spec), False), None, ), # 1 + ) + + def __init__(self, batches=None,): + self.batches = batches + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.LIST: + self.batches = [] + (_etype45, _size42) = iprot.readListBegin() + for _i46 in range(_size42): + _elem47 = Batch() + _elem47.read(iprot) + self.batches.append(_elem47) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('submitBatches_args') + if self.batches is not None: + oprot.writeFieldBegin('batches', TType.LIST, 1) + oprot.writeListBegin(TType.STRUCT, len(self.batches)) + for iter48 in self.batches: + iter48.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class submitBatches_result(object): + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.LIST, 'success', (TType.STRUCT, (BatchSubmitResponse, BatchSubmitResponse.thrift_spec), False), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.LIST: + self.success = [] + (_etype52, _size49) = iprot.readListBegin() + for _i53 in range(_size49): + _elem54 = BatchSubmitResponse() + _elem54.read(iprot) + self.success.append(_elem54) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('submitBatches_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.LIST, 0) + oprot.writeListBegin(TType.STRUCT, len(self.success)) + for iter55 in self.success: + iter55.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/__init__.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/__init__.py new file mode 100644 index 0000000000..515d97d672 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/__init__.py @@ -0,0 +1 @@ +__all__ = ['ttypes', 'constants', 'Collector'] diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/constants.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/constants.py new file mode 100644 index 0000000000..eb0d35aa12 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/constants.py @@ -0,0 +1,12 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys +from .ttypes import * diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/ttypes.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/ttypes.py new file mode 100644 index 0000000000..a43252b79d --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/jaeger/ttypes.py @@ -0,0 +1,831 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys + +from thrift.transport import TTransport + + +class TagType(object): + STRING = 0 + DOUBLE = 1 + BOOL = 2 + LONG = 3 + BINARY = 4 + + _VALUES_TO_NAMES = { + 0: "STRING", + 1: "DOUBLE", + 2: "BOOL", + 3: "LONG", + 4: "BINARY", + } + + _NAMES_TO_VALUES = { + "STRING": 0, + "DOUBLE": 1, + "BOOL": 2, + "LONG": 3, + "BINARY": 4, + } + + +class SpanRefType(object): + CHILD_OF = 0 + FOLLOWS_FROM = 1 + + _VALUES_TO_NAMES = { + 0: "CHILD_OF", + 1: "FOLLOWS_FROM", + } + + _NAMES_TO_VALUES = { + "CHILD_OF": 0, + "FOLLOWS_FROM": 1, + } + + +class Tag(object): + """ + Attributes: + - key + - vType + - vStr + - vDouble + - vBool + - vLong + - vBinary + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'key', 'UTF8', None, ), # 1 + (2, TType.I32, 'vType', None, None, ), # 2 + (3, TType.STRING, 'vStr', 'UTF8', None, ), # 3 + (4, TType.DOUBLE, 'vDouble', None, None, ), # 4 + (5, TType.BOOL, 'vBool', None, None, ), # 5 + (6, TType.I64, 'vLong', None, None, ), # 6 + (7, TType.STRING, 'vBinary', 'BINARY', None, ), # 7 + ) + + def __init__(self, key=None, vType=None, vStr=None, vDouble=None, vBool=None, vLong=None, vBinary=None,): + self.key = key + self.vType = vType + self.vStr = vStr + self.vDouble = vDouble + self.vBool = vBool + self.vLong = vLong + self.vBinary = vBinary + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.key = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I32: + self.vType = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.vStr = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.DOUBLE: + self.vDouble = iprot.readDouble() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.BOOL: + self.vBool = iprot.readBool() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.I64: + self.vLong = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.STRING: + self.vBinary = iprot.readBinary() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Tag') + if self.key is not None: + oprot.writeFieldBegin('key', TType.STRING, 1) + oprot.writeString(self.key.encode('utf-8') if sys.version_info[0] == 2 else self.key) + oprot.writeFieldEnd() + if self.vType is not None: + oprot.writeFieldBegin('vType', TType.I32, 2) + oprot.writeI32(self.vType) + oprot.writeFieldEnd() + if self.vStr is not None: + oprot.writeFieldBegin('vStr', TType.STRING, 3) + oprot.writeString(self.vStr.encode('utf-8') if sys.version_info[0] == 2 else self.vStr) + oprot.writeFieldEnd() + if self.vDouble is not None: + oprot.writeFieldBegin('vDouble', TType.DOUBLE, 4) + oprot.writeDouble(self.vDouble) + oprot.writeFieldEnd() + if self.vBool is not None: + oprot.writeFieldBegin('vBool', TType.BOOL, 5) + oprot.writeBool(self.vBool) + oprot.writeFieldEnd() + if self.vLong is not None: + oprot.writeFieldBegin('vLong', TType.I64, 6) + oprot.writeI64(self.vLong) + oprot.writeFieldEnd() + if self.vBinary is not None: + oprot.writeFieldBegin('vBinary', TType.STRING, 7) + oprot.writeBinary(self.vBinary) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.key is None: + raise TProtocolException(message='Required field key is unset!') + if self.vType is None: + raise TProtocolException(message='Required field vType is unset!') + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class Log(object): + """ + Attributes: + - timestamp + - fields + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'timestamp', None, None, ), # 1 + (2, TType.LIST, 'fields', (TType.STRUCT, (Tag, Tag.thrift_spec), False), None, ), # 2 + ) + + def __init__(self, timestamp=None, fields=None,): + self.timestamp = timestamp + self.fields = fields + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.timestamp = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.LIST: + self.fields = [] + (_etype3, _size0) = iprot.readListBegin() + for _i4 in range(_size0): + _elem5 = Tag() + _elem5.read(iprot) + self.fields.append(_elem5) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Log') + if self.timestamp is not None: + oprot.writeFieldBegin('timestamp', TType.I64, 1) + oprot.writeI64(self.timestamp) + oprot.writeFieldEnd() + if self.fields is not None: + oprot.writeFieldBegin('fields', TType.LIST, 2) + oprot.writeListBegin(TType.STRUCT, len(self.fields)) + for iter6 in self.fields: + iter6.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.timestamp is None: + raise TProtocolException(message='Required field timestamp is unset!') + if self.fields is None: + raise TProtocolException(message='Required field fields is unset!') + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class SpanRef(object): + """ + Attributes: + - refType + - traceIdLow + - traceIdHigh + - spanId + """ + + thrift_spec = ( + None, # 0 + (1, TType.I32, 'refType', None, None, ), # 1 + (2, TType.I64, 'traceIdLow', None, None, ), # 2 + (3, TType.I64, 'traceIdHigh', None, None, ), # 3 + (4, TType.I64, 'spanId', None, None, ), # 4 + ) + + def __init__(self, refType=None, traceIdLow=None, traceIdHigh=None, spanId=None,): + self.refType = refType + self.traceIdLow = traceIdLow + self.traceIdHigh = traceIdHigh + self.spanId = spanId + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I32: + self.refType = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.traceIdLow = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.traceIdHigh = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.spanId = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('SpanRef') + if self.refType is not None: + oprot.writeFieldBegin('refType', TType.I32, 1) + oprot.writeI32(self.refType) + oprot.writeFieldEnd() + if self.traceIdLow is not None: + oprot.writeFieldBegin('traceIdLow', TType.I64, 2) + oprot.writeI64(self.traceIdLow) + oprot.writeFieldEnd() + if self.traceIdHigh is not None: + oprot.writeFieldBegin('traceIdHigh', TType.I64, 3) + oprot.writeI64(self.traceIdHigh) + oprot.writeFieldEnd() + if self.spanId is not None: + oprot.writeFieldBegin('spanId', TType.I64, 4) + oprot.writeI64(self.spanId) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.refType is None: + raise TProtocolException(message='Required field refType is unset!') + if self.traceIdLow is None: + raise TProtocolException(message='Required field traceIdLow is unset!') + if self.traceIdHigh is None: + raise TProtocolException(message='Required field traceIdHigh is unset!') + if self.spanId is None: + raise TProtocolException(message='Required field spanId is unset!') + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class Span(object): + """ + Attributes: + - traceIdLow + - traceIdHigh + - spanId + - parentSpanId + - operationName + - references + - flags + - startTime + - duration + - tags + - logs + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'traceIdLow', None, None, ), # 1 + (2, TType.I64, 'traceIdHigh', None, None, ), # 2 + (3, TType.I64, 'spanId', None, None, ), # 3 + (4, TType.I64, 'parentSpanId', None, None, ), # 4 + (5, TType.STRING, 'operationName', 'UTF8', None, ), # 5 + (6, TType.LIST, 'references', (TType.STRUCT, (SpanRef, SpanRef.thrift_spec), False), None, ), # 6 + (7, TType.I32, 'flags', None, None, ), # 7 + (8, TType.I64, 'startTime', None, None, ), # 8 + (9, TType.I64, 'duration', None, None, ), # 9 + (10, TType.LIST, 'tags', (TType.STRUCT, (Tag, Tag.thrift_spec), False), None, ), # 10 + (11, TType.LIST, 'logs', (TType.STRUCT, (Log, Log.thrift_spec), False), None, ), # 11 + ) + + def __init__(self, traceIdLow=None, traceIdHigh=None, spanId=None, parentSpanId=None, operationName=None, references=None, flags=None, startTime=None, duration=None, tags=None, logs=None,): + self.traceIdLow = traceIdLow + self.traceIdHigh = traceIdHigh + self.spanId = spanId + self.parentSpanId = parentSpanId + self.operationName = operationName + self.references = references + self.flags = flags + self.startTime = startTime + self.duration = duration + self.tags = tags + self.logs = logs + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.traceIdLow = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.traceIdHigh = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.spanId = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.parentSpanId = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRING: + self.operationName = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.LIST: + self.references = [] + (_etype10, _size7) = iprot.readListBegin() + for _i11 in range(_size7): + _elem12 = SpanRef() + _elem12.read(iprot) + self.references.append(_elem12) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.I32: + self.flags = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.I64: + self.startTime = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 9: + if ftype == TType.I64: + self.duration = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 10: + if ftype == TType.LIST: + self.tags = [] + (_etype16, _size13) = iprot.readListBegin() + for _i17 in range(_size13): + _elem18 = Tag() + _elem18.read(iprot) + self.tags.append(_elem18) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 11: + if ftype == TType.LIST: + self.logs = [] + (_etype22, _size19) = iprot.readListBegin() + for _i23 in range(_size19): + _elem24 = Log() + _elem24.read(iprot) + self.logs.append(_elem24) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Span') + if self.traceIdLow is not None: + oprot.writeFieldBegin('traceIdLow', TType.I64, 1) + oprot.writeI64(self.traceIdLow) + oprot.writeFieldEnd() + if self.traceIdHigh is not None: + oprot.writeFieldBegin('traceIdHigh', TType.I64, 2) + oprot.writeI64(self.traceIdHigh) + oprot.writeFieldEnd() + if self.spanId is not None: + oprot.writeFieldBegin('spanId', TType.I64, 3) + oprot.writeI64(self.spanId) + oprot.writeFieldEnd() + if self.parentSpanId is not None: + oprot.writeFieldBegin('parentSpanId', TType.I64, 4) + oprot.writeI64(self.parentSpanId) + oprot.writeFieldEnd() + if self.operationName is not None: + oprot.writeFieldBegin('operationName', TType.STRING, 5) + oprot.writeString(self.operationName.encode('utf-8') if sys.version_info[0] == 2 else self.operationName) + oprot.writeFieldEnd() + if self.references is not None: + oprot.writeFieldBegin('references', TType.LIST, 6) + oprot.writeListBegin(TType.STRUCT, len(self.references)) + for iter25 in self.references: + iter25.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.flags is not None: + oprot.writeFieldBegin('flags', TType.I32, 7) + oprot.writeI32(self.flags) + oprot.writeFieldEnd() + if self.startTime is not None: + oprot.writeFieldBegin('startTime', TType.I64, 8) + oprot.writeI64(self.startTime) + oprot.writeFieldEnd() + if self.duration is not None: + oprot.writeFieldBegin('duration', TType.I64, 9) + oprot.writeI64(self.duration) + oprot.writeFieldEnd() + if self.tags is not None: + oprot.writeFieldBegin('tags', TType.LIST, 10) + oprot.writeListBegin(TType.STRUCT, len(self.tags)) + for iter26 in self.tags: + iter26.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.logs is not None: + oprot.writeFieldBegin('logs', TType.LIST, 11) + oprot.writeListBegin(TType.STRUCT, len(self.logs)) + for iter27 in self.logs: + iter27.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.traceIdLow is None: + raise TProtocolException(message='Required field traceIdLow is unset!') + if self.traceIdHigh is None: + raise TProtocolException(message='Required field traceIdHigh is unset!') + if self.spanId is None: + raise TProtocolException(message='Required field spanId is unset!') + if self.parentSpanId is None: + raise TProtocolException(message='Required field parentSpanId is unset!') + if self.operationName is None: + raise TProtocolException(message='Required field operationName is unset!') + if self.flags is None: + raise TProtocolException(message='Required field flags is unset!') + if self.startTime is None: + raise TProtocolException(message='Required field startTime is unset!') + if self.duration is None: + raise TProtocolException(message='Required field duration is unset!') + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class Process(object): + """ + Attributes: + - serviceName + - tags + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'serviceName', 'UTF8', None, ), # 1 + (2, TType.LIST, 'tags', (TType.STRUCT, (Tag, Tag.thrift_spec), False), None, ), # 2 + ) + + def __init__(self, serviceName=None, tags=None,): + self.serviceName = serviceName + self.tags = tags + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.serviceName = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.LIST: + self.tags = [] + (_etype31, _size28) = iprot.readListBegin() + for _i32 in range(_size28): + _elem33 = Tag() + _elem33.read(iprot) + self.tags.append(_elem33) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Process') + if self.serviceName is not None: + oprot.writeFieldBegin('serviceName', TType.STRING, 1) + oprot.writeString(self.serviceName.encode('utf-8') if sys.version_info[0] == 2 else self.serviceName) + oprot.writeFieldEnd() + if self.tags is not None: + oprot.writeFieldBegin('tags', TType.LIST, 2) + oprot.writeListBegin(TType.STRUCT, len(self.tags)) + for iter34 in self.tags: + iter34.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.serviceName is None: + raise TProtocolException(message='Required field serviceName is unset!') + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class Batch(object): + """ + Attributes: + - process + - spans + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'process', (Process, Process.thrift_spec), None, ), # 1 + (2, TType.LIST, 'spans', (TType.STRUCT, (Span, Span.thrift_spec), False), None, ), # 2 + ) + + def __init__(self, process=None, spans=None,): + self.process = process + self.spans = spans + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.process = Process() + self.process.read(iprot) + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.LIST: + self.spans = [] + (_etype38, _size35) = iprot.readListBegin() + for _i39 in range(_size35): + _elem40 = Span() + _elem40.read(iprot) + self.spans.append(_elem40) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Batch') + if self.process is not None: + oprot.writeFieldBegin('process', TType.STRUCT, 1) + self.process.write(oprot) + oprot.writeFieldEnd() + if self.spans is not None: + oprot.writeFieldBegin('spans', TType.LIST, 2) + oprot.writeListBegin(TType.STRUCT, len(self.spans)) + for iter41 in self.spans: + iter41.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.process is None: + raise TProtocolException(message='Required field process is unset!') + if self.spans is None: + raise TProtocolException(message='Required field spans is unset!') + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class BatchSubmitResponse(object): + """ + Attributes: + - ok + """ + + thrift_spec = ( + None, # 0 + (1, TType.BOOL, 'ok', None, None, ), # 1 + ) + + def __init__(self, ok=None,): + self.ok = ok + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BOOL: + self.ok = iprot.readBool() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('BatchSubmitResponse') + if self.ok is not None: + oprot.writeFieldBegin('ok', TType.BOOL, 1) + oprot.writeBool(self.ok) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.ok is None: + raise TProtocolException(message='Required field ok is unset!') + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ZipkinCollector-remote b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ZipkinCollector-remote new file mode 100755 index 0000000000..2b59c3275d --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ZipkinCollector-remote @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +import sys +import pprint +if sys.version_info[0] > 2: + from urllib.parse import urlparse +else: + from urlparse import urlparse +from thrift.transport import TTransport, TSocket, TSSLSocket, THttpClient +from thrift.protocol.TBinaryProtocol import TBinaryProtocol + +from zipkincore import ZipkinCollector +from zipkincore.ttypes import * + +if len(sys.argv) <= 1 or sys.argv[1] == '--help': + print('') + print('Usage: ' + sys.argv[0] + ' [-h host[:port]] [-u url] [-f[ramed]] [-s[sl]] [-novalidate] [-ca_certs certs] [-keyfile keyfile] [-certfile certfile] function [arg1 [arg2...]]') + print('') + print('Functions:') + print(' submitZipkinBatch( spans)') + print('') + sys.exit(0) + +pp = pprint.PrettyPrinter(indent=2) +host = 'localhost' +port = 9090 +uri = '' +framed = False +ssl = False +validate = True +ca_certs = None +keyfile = None +certfile = None +http = False +argi = 1 + +if sys.argv[argi] == '-h': + parts = sys.argv[argi + 1].split(':') + host = parts[0] + if len(parts) > 1: + port = int(parts[1]) + argi += 2 + +if sys.argv[argi] == '-u': + url = urlparse(sys.argv[argi + 1]) + parts = url[1].split(':') + host = parts[0] + if len(parts) > 1: + port = int(parts[1]) + else: + port = 80 + uri = url[2] + if url[4]: + uri += '?%s' % url[4] + http = True + argi += 2 + +if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed': + framed = True + argi += 1 + +if sys.argv[argi] == '-s' or sys.argv[argi] == '-ssl': + ssl = True + argi += 1 + +if sys.argv[argi] == '-novalidate': + validate = False + argi += 1 + +if sys.argv[argi] == '-ca_certs': + ca_certs = sys.argv[argi+1] + argi += 2 + +if sys.argv[argi] == '-keyfile': + keyfile = sys.argv[argi+1] + argi += 2 + +if sys.argv[argi] == '-certfile': + certfile = sys.argv[argi+1] + argi += 2 + +cmd = sys.argv[argi] +args = sys.argv[argi + 1:] + +if http: + transport = THttpClient.THttpClient(host, port, uri) +else: + if ssl: + socket = TSSLSocket.TSSLSocket(host, port, validate=validate, ca_certs=ca_certs, keyfile=keyfile, certfile=certfile) + else: + socket = TSocket.TSocket(host, port) + if framed: + transport = TTransport.TFramedTransport(socket) + else: + transport = TTransport.TBufferedTransport(socket) +protocol = TBinaryProtocol(transport) +client = ZipkinCollector.Client(protocol) +transport.open() + +if cmd == 'submitZipkinBatch': + if len(args) != 1: + print('submitZipkinBatch requires 1 args') + sys.exit(1) + pp.pprint(client.submitZipkinBatch(eval(args[0]),)) + +else: + print('Unrecognized method %s' % cmd) + sys.exit(1) + +transport.close() diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ZipkinCollector.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ZipkinCollector.py new file mode 100644 index 0000000000..6167a8e9f1 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ZipkinCollector.py @@ -0,0 +1,243 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys +import logging +from .ttypes import * +from thrift.Thrift import TProcessor +from thrift.transport import TTransport + + +class Iface(object): + def submitZipkinBatch(self, spans): + """ + Parameters: + - spans + """ + pass + + +class Client(Iface): + def __init__(self, iprot, oprot=None): + self._iprot = self._oprot = iprot + if oprot is not None: + self._oprot = oprot + self._seqid = 0 + + def submitZipkinBatch(self, spans): + """ + Parameters: + - spans + """ + self.send_submitZipkinBatch(spans) + return self.recv_submitZipkinBatch() + + def send_submitZipkinBatch(self, spans): + self._oprot.writeMessageBegin('submitZipkinBatch', TMessageType.CALL, self._seqid) + args = submitZipkinBatch_args() + args.spans = spans + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_submitZipkinBatch(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = submitZipkinBatch_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "submitZipkinBatch failed: unknown result") + + +class Processor(Iface, TProcessor): + def __init__(self, handler): + self._handler = handler + self._processMap = {} + self._processMap["submitZipkinBatch"] = Processor.process_submitZipkinBatch + + def process(self, iprot, oprot): + (name, type, seqid) = iprot.readMessageBegin() + if name not in self._processMap: + iprot.skip(TType.STRUCT) + iprot.readMessageEnd() + x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name)) + oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid) + x.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + return + else: + self._processMap[name](self, seqid, iprot, oprot) + return True + + def process_submitZipkinBatch(self, seqid, iprot, oprot): + args = submitZipkinBatch_args() + args.read(iprot) + iprot.readMessageEnd() + result = submitZipkinBatch_result() + try: + result.success = self._handler.submitZipkinBatch(args.spans) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("submitZipkinBatch", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + +# HELPER FUNCTIONS AND STRUCTURES + + +class submitZipkinBatch_args(object): + """ + Attributes: + - spans + """ + + thrift_spec = ( + None, # 0 + (1, TType.LIST, 'spans', (TType.STRUCT, (Span, Span.thrift_spec), False), None, ), # 1 + ) + + def __init__(self, spans=None,): + self.spans = spans + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.LIST: + self.spans = [] + (_etype17, _size14) = iprot.readListBegin() + for _i18 in range(_size14): + _elem19 = Span() + _elem19.read(iprot) + self.spans.append(_elem19) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('submitZipkinBatch_args') + if self.spans is not None: + oprot.writeFieldBegin('spans', TType.LIST, 1) + oprot.writeListBegin(TType.STRUCT, len(self.spans)) + for iter20 in self.spans: + iter20.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class submitZipkinBatch_result(object): + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.LIST, 'success', (TType.STRUCT, (Response, Response.thrift_spec), False), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.LIST: + self.success = [] + (_etype24, _size21) = iprot.readListBegin() + for _i25 in range(_size21): + _elem26 = Response() + _elem26.read(iprot) + self.success.append(_elem26) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('submitZipkinBatch_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.LIST, 0) + oprot.writeListBegin(TType.STRUCT, len(self.success)) + for iter27 in self.success: + iter27.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/__init__.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/__init__.py new file mode 100644 index 0000000000..90e4f9d9c7 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/__init__.py @@ -0,0 +1 @@ +__all__ = ['ttypes', 'constants', 'ZipkinCollector'] diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/constants.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/constants.py new file mode 100644 index 0000000000..d66961b02b --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/constants.py @@ -0,0 +1,28 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys +from .ttypes import * +CLIENT_SEND = "cs" +CLIENT_RECV = "cr" +SERVER_SEND = "ss" +SERVER_RECV = "sr" +MESSAGE_SEND = "ms" +MESSAGE_RECV = "mr" +WIRE_SEND = "ws" +WIRE_RECV = "wr" +CLIENT_SEND_FRAGMENT = "csf" +CLIENT_RECV_FRAGMENT = "crf" +SERVER_SEND_FRAGMENT = "ssf" +SERVER_RECV_FRAGMENT = "srf" +LOCAL_COMPONENT = "lc" +CLIENT_ADDR = "ca" +SERVER_ADDR = "sa" +MESSAGE_ADDR = "ma" diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ttypes.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ttypes.py new file mode 100644 index 0000000000..251c5a3694 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/zipkincore/ttypes.py @@ -0,0 +1,647 @@ +# +# Autogenerated by Thrift Compiler (0.10.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException +from thrift.protocol.TProtocol import TProtocolException +import sys + +from thrift.transport import TTransport + + +class AnnotationType(object): + BOOL = 0 + BYTES = 1 + I16 = 2 + I32 = 3 + I64 = 4 + DOUBLE = 5 + STRING = 6 + + _VALUES_TO_NAMES = { + 0: "BOOL", + 1: "BYTES", + 2: "I16", + 3: "I32", + 4: "I64", + 5: "DOUBLE", + 6: "STRING", + } + + _NAMES_TO_VALUES = { + "BOOL": 0, + "BYTES": 1, + "I16": 2, + "I32": 3, + "I64": 4, + "DOUBLE": 5, + "STRING": 6, + } + + +class Endpoint(object): + """ + Indicates the network context of a service recording an annotation with two + exceptions. + + When a BinaryAnnotation, and key is CLIENT_ADDR or SERVER_ADDR, + the endpoint indicates the source or destination of an RPC. This exception + allows zipkin to display network context of uninstrumented services, or + clients such as web browsers. + + Attributes: + - ipv4: IPv4 host address packed into 4 bytes. + + Ex for the ip 1.2.3.4, it would be (1 << 24) | (2 << 16) | (3 << 8) | 4 + - port: IPv4 port + + Note: this is to be treated as an unsigned integer, so watch for negatives. + + Conventionally, when the port isn't known, port = 0. + - service_name: Service name in lowercase, such as "memcache" or "zipkin-web" + + Conventionally, when the service name isn't known, service_name = "unknown". + - ipv6: IPv6 host address packed into 16 bytes. Ex Inet6Address.getBytes() + """ + + thrift_spec = ( + None, # 0 + (1, TType.I32, 'ipv4', None, None, ), # 1 + (2, TType.I16, 'port', None, None, ), # 2 + (3, TType.STRING, 'service_name', 'UTF8', None, ), # 3 + (4, TType.STRING, 'ipv6', 'BINARY', None, ), # 4 + ) + + def __init__(self, ipv4=None, port=None, service_name=None, ipv6=None,): + self.ipv4 = ipv4 + self.port = port + self.service_name = service_name + self.ipv6 = ipv6 + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I32: + self.ipv4 = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I16: + self.port = iprot.readI16() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.service_name = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRING: + self.ipv6 = iprot.readBinary() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Endpoint') + if self.ipv4 is not None: + oprot.writeFieldBegin('ipv4', TType.I32, 1) + oprot.writeI32(self.ipv4) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.I16, 2) + oprot.writeI16(self.port) + oprot.writeFieldEnd() + if self.service_name is not None: + oprot.writeFieldBegin('service_name', TType.STRING, 3) + oprot.writeString(self.service_name.encode('utf-8') if sys.version_info[0] == 2 else self.service_name) + oprot.writeFieldEnd() + if self.ipv6 is not None: + oprot.writeFieldBegin('ipv6', TType.STRING, 4) + oprot.writeBinary(self.ipv6) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class Annotation(object): + """ + An annotation is similar to a log statement. It includes a host field which + allows these events to be attributed properly, and also aggregatable. + + Attributes: + - timestamp: Microseconds from epoch. + + This value should use the most precise value possible. For example, + gettimeofday or syncing nanoTime against a tick of currentTimeMillis. + - value + - host: Always the host that recorded the event. By specifying the host you allow + rollup of all events (such as client requests to a service) by IP address. + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'timestamp', None, None, ), # 1 + (2, TType.STRING, 'value', 'UTF8', None, ), # 2 + (3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3 + ) + + def __init__(self, timestamp=None, value=None, host=None,): + self.timestamp = timestamp + self.value = value + self.host = host + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.timestamp = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.value = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRUCT: + self.host = Endpoint() + self.host.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Annotation') + if self.timestamp is not None: + oprot.writeFieldBegin('timestamp', TType.I64, 1) + oprot.writeI64(self.timestamp) + oprot.writeFieldEnd() + if self.value is not None: + oprot.writeFieldBegin('value', TType.STRING, 2) + oprot.writeString(self.value.encode('utf-8') if sys.version_info[0] == 2 else self.value) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRUCT, 3) + self.host.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class BinaryAnnotation(object): + """ + Binary annotations are tags applied to a Span to give it context. For + example, a binary annotation of "http.uri" could the path to a resource in a + RPC call. + + Binary annotations of type STRING are always queryable, though more a + historical implementation detail than a structural concern. + + Binary annotations can repeat, and vary on the host. Similar to Annotation, + the host indicates who logged the event. This allows you to tell the + difference between the client and server side of the same key. For example, + the key "http.uri" might be different on the client and server side due to + rewriting, like "/api/v1/myresource" vs "/myresource. Via the host field, + you can see the different points of view, which often help in debugging. + + Attributes: + - key + - value + - annotation_type + - host: The host that recorded tag, which allows you to differentiate between + multiple tags with the same key. There are two exceptions to this. + + When the key is CLIENT_ADDR or SERVER_ADDR, host indicates the source or + destination of an RPC. This exception allows zipkin to display network + context of uninstrumented services, or clients such as web browsers. + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'key', 'UTF8', None, ), # 1 + (2, TType.STRING, 'value', 'BINARY', None, ), # 2 + (3, TType.I32, 'annotation_type', None, None, ), # 3 + (4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4 + ) + + def __init__(self, key=None, value=None, annotation_type=None, host=None,): + self.key = key + self.value = value + self.annotation_type = annotation_type + self.host = host + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.key = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.value = iprot.readBinary() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.annotation_type = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRUCT: + self.host = Endpoint() + self.host.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('BinaryAnnotation') + if self.key is not None: + oprot.writeFieldBegin('key', TType.STRING, 1) + oprot.writeString(self.key.encode('utf-8') if sys.version_info[0] == 2 else self.key) + oprot.writeFieldEnd() + if self.value is not None: + oprot.writeFieldBegin('value', TType.STRING, 2) + oprot.writeBinary(self.value) + oprot.writeFieldEnd() + if self.annotation_type is not None: + oprot.writeFieldBegin('annotation_type', TType.I32, 3) + oprot.writeI32(self.annotation_type) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRUCT, 4) + self.host.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class Span(object): + """ + A trace is a series of spans (often RPC calls) which form a latency tree. + + The root span is where trace_id = id and parent_id = Nil. The root span is + usually the longest interval in the trace, starting with a SERVER_RECV + annotation and ending with a SERVER_SEND. + + Attributes: + - trace_id + - name: Span name in lowercase, rpc method for example + + Conventionally, when the span name isn't known, name = "unknown". + - id + - parent_id + - annotations + - binary_annotations + - debug + - timestamp: Microseconds from epoch of the creation of this span. + + This value should be set directly by instrumentation, using the most + precise value possible. For example, gettimeofday or syncing nanoTime + against a tick of currentTimeMillis. + + For compatibilty with instrumentation that precede this field, collectors + or span stores can derive this via Annotation.timestamp. + For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp. + + This field is optional for compatibility with old data: first-party span + stores are expected to support this at time of introduction. + - duration: Measurement of duration in microseconds, used to support queries. + + This value should be set directly, where possible. Doing so encourages + precise measurement decoupled from problems of clocks, such as skew or NTP + updates causing time to move backwards. + + For compatibilty with instrumentation that precede this field, collectors + or span stores can derive this by subtracting Annotation.timestamp. + For example, SERVER_SEND.timestamp - SERVER_RECV.timestamp. + + If this field is persisted as unset, zipkin will continue to work, except + duration query support will be implementation-specific. Similarly, setting + this field non-atomically is implementation-specific. + + This field is i64 vs i32 to support spans longer than 35 minutes. + - trace_id_high: Optional unique 8-byte additional identifier for a trace. If non zero, this + means the trace uses 128 bit traceIds instead of 64 bit. + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'trace_id', None, None, ), # 1 + None, # 2 + (3, TType.STRING, 'name', 'UTF8', None, ), # 3 + (4, TType.I64, 'id', None, None, ), # 4 + (5, TType.I64, 'parent_id', None, None, ), # 5 + (6, TType.LIST, 'annotations', (TType.STRUCT, (Annotation, Annotation.thrift_spec), False), None, ), # 6 + None, # 7 + (8, TType.LIST, 'binary_annotations', (TType.STRUCT, (BinaryAnnotation, BinaryAnnotation.thrift_spec), False), None, ), # 8 + (9, TType.BOOL, 'debug', None, False, ), # 9 + (10, TType.I64, 'timestamp', None, None, ), # 10 + (11, TType.I64, 'duration', None, None, ), # 11 + (12, TType.I64, 'trace_id_high', None, None, ), # 12 + ) + + def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None, debug=thrift_spec[9][4], timestamp=None, duration=None, trace_id_high=None,): + self.trace_id = trace_id + self.name = name + self.id = id + self.parent_id = parent_id + self.annotations = annotations + self.binary_annotations = binary_annotations + self.debug = debug + self.timestamp = timestamp + self.duration = duration + self.trace_id_high = trace_id_high + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.trace_id = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.name = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.id = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.parent_id = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.LIST: + self.annotations = [] + (_etype3, _size0) = iprot.readListBegin() + for _i4 in range(_size0): + _elem5 = Annotation() + _elem5.read(iprot) + self.annotations.append(_elem5) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.LIST: + self.binary_annotations = [] + (_etype9, _size6) = iprot.readListBegin() + for _i10 in range(_size6): + _elem11 = BinaryAnnotation() + _elem11.read(iprot) + self.binary_annotations.append(_elem11) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 9: + if ftype == TType.BOOL: + self.debug = iprot.readBool() + else: + iprot.skip(ftype) + elif fid == 10: + if ftype == TType.I64: + self.timestamp = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 11: + if ftype == TType.I64: + self.duration = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 12: + if ftype == TType.I64: + self.trace_id_high = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Span') + if self.trace_id is not None: + oprot.writeFieldBegin('trace_id', TType.I64, 1) + oprot.writeI64(self.trace_id) + oprot.writeFieldEnd() + if self.name is not None: + oprot.writeFieldBegin('name', TType.STRING, 3) + oprot.writeString(self.name.encode('utf-8') if sys.version_info[0] == 2 else self.name) + oprot.writeFieldEnd() + if self.id is not None: + oprot.writeFieldBegin('id', TType.I64, 4) + oprot.writeI64(self.id) + oprot.writeFieldEnd() + if self.parent_id is not None: + oprot.writeFieldBegin('parent_id', TType.I64, 5) + oprot.writeI64(self.parent_id) + oprot.writeFieldEnd() + if self.annotations is not None: + oprot.writeFieldBegin('annotations', TType.LIST, 6) + oprot.writeListBegin(TType.STRUCT, len(self.annotations)) + for iter12 in self.annotations: + iter12.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.binary_annotations is not None: + oprot.writeFieldBegin('binary_annotations', TType.LIST, 8) + oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations)) + for iter13 in self.binary_annotations: + iter13.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.debug is not None: + oprot.writeFieldBegin('debug', TType.BOOL, 9) + oprot.writeBool(self.debug) + oprot.writeFieldEnd() + if self.timestamp is not None: + oprot.writeFieldBegin('timestamp', TType.I64, 10) + oprot.writeI64(self.timestamp) + oprot.writeFieldEnd() + if self.duration is not None: + oprot.writeFieldBegin('duration', TType.I64, 11) + oprot.writeI64(self.duration) + oprot.writeFieldEnd() + if self.trace_id_high is not None: + oprot.writeFieldBegin('trace_id_high', TType.I64, 12) + oprot.writeI64(self.trace_id_high) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + +class Response(object): + """ + Attributes: + - ok + """ + + thrift_spec = ( + None, # 0 + (1, TType.BOOL, 'ok', None, None, ), # 1 + ) + + def __init__(self, ok=None,): + self.ok = ok + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BOOL: + self.ok = iprot.readBool() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Response') + if self.ok is not None: + oprot.writeFieldBegin('ok', TType.BOOL, 1) + oprot.writeBool(self.ok) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.ok is None: + raise TProtocolException(message='Required field ok is unset!') + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/version.py b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/version.py new file mode 100644 index 0000000000..262f246714 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/version.py @@ -0,0 +1,16 @@ +# Copyright 2019, OpenCensus Authors +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.1.dev0" diff --git a/ext/opentelemetry-ext-jaeger/tests/__init__.py b/ext/opentelemetry-ext-jaeger/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ext/opentelemetry-ext-jaeger/tests/test_jaeger_exporter.py b/ext/opentelemetry-ext-jaeger/tests/test_jaeger_exporter.py new file mode 100644 index 0000000000..3fb14cd354 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/tests/test_jaeger_exporter.py @@ -0,0 +1,254 @@ +# Copyright 2018, OpenCensus Authors +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +# pylint:disable=no-name-in-module +# pylint:disable=import-error +import opentelemetry.ext.jaeger as jaeger_exporter +from opentelemetry import trace as trace_api +from opentelemetry.ext.jaeger.gen.jaeger import ttypes as jaeger +from opentelemetry.sdk import trace + + +class TestJaegerSpanExporter(unittest.TestCase): + def test_constructor_default(self): + service_name = "my-service-name" + host_name = "localhost" + thrift_port = None + agent_port = 6831 + collector_endpoint = "/api/traces?format=jaeger.thrift" + exporter = jaeger_exporter.JaegerSpanExporter(service_name) + + self.assertEqual(exporter.service_name, service_name) + self.assertEqual(exporter.collector_host_name, None) + self.assertEqual(exporter.agent_host_name, host_name) + self.assertEqual(exporter.agent_port, agent_port) + self.assertEqual(exporter.collector_port, thrift_port) + self.assertEqual(exporter.collector_endpoint, collector_endpoint) + self.assertEqual(exporter.username, None) + self.assertEqual(exporter.password, None) + self.assertTrue(exporter.collector is None) + self.assertTrue(exporter.agent_client is not None) + + def test_constructor_explicit(self): + service = "my-opentelemetry-jaeger" + collector_host_name = "opentelemetry.io" + collector_port = 15875 + collector_endpoint = "/myapi/traces?format=jaeger.thrift" + + agent_port = 14268 + agent_host_name = "opentelemetry.com" + + username = "username" + password = "password" + auth = (username, password) + + exporter = jaeger_exporter.JaegerSpanExporter( + service_name=service, + collector_host_name=collector_host_name, + collector_port=collector_port, + collector_endpoint=collector_endpoint, + agent_host_name=agent_host_name, + agent_port=agent_port, + username=username, + password=password, + ) + self.assertEqual(exporter.service_name, service) + self.assertEqual(exporter.agent_host_name, agent_host_name) + self.assertEqual(exporter.agent_port, agent_port) + self.assertEqual(exporter.collector_host_name, collector_host_name) + self.assertEqual(exporter.collector_port, collector_port) + self.assertTrue(exporter.collector is not None) + self.assertEqual(exporter.collector.auth, auth) + # property should not construct new object + collector = exporter.collector + self.assertEqual(exporter.collector, collector) + # property should construct new object + # pylint: disable=protected-access + exporter._collector = None + exporter.username = None + exporter.password = None + self.assertNotEqual(exporter.collector, collector) + self.assertTrue(exporter.collector.auth is None) + + # pylint: disable=too-many-locals + def test_translate_to_jaeger(self): + # pylint: disable=invalid-name + self.maxDiff = None + + span_names = ("test1", "test2", "test3") + trace_id = 0x6E0C63257DE34C926F9EFCD03927272E + trace_id_high = 0x6E0C63257DE34C92 + trace_id_low = 0x6F9EFCD03927272E + span_id = 0x34BF92DEEFC58C92 + parent_id = 0x1111111111111111 + other_id = 0x2222222222222222 + + base_time = 683647322 * 1e9 # in ns + start_times = (base_time, base_time + 150 * 1e6, base_time + 300 * 1e6) + durations = (50 * 1e6, 100 * 1e6, 200 * 1e6) + end_times = ( + start_times[0] + durations[0], + start_times[1] + durations[1], + start_times[2] + durations[2], + ) + + span_context = trace_api.SpanContext(trace_id, span_id) + parent_context = trace_api.SpanContext(trace_id, parent_id) + other_context = trace_api.SpanContext(trace_id, other_id) + + event_attributes = { + "annotation_bool": True, + "annotation_string": "annotation_test", + "key_float": 0.3, + } + + event_timestamp = base_time + 50e6 + event = trace_api.Event( + name="event0", + timestamp=event_timestamp, + attributes=event_attributes, + ) + + link_attributes = {"key_bool": True} + + link = trace_api.Link( + context=other_context, attributes=link_attributes + ) + + otel_spans = [ + trace.Span( + name=span_names[0], + context=span_context, + parent=parent_context, + events=(event,), + links=(link,), + ), + trace.Span( + name=span_names[1], context=parent_context, parent=None + ), + trace.Span(name=span_names[2], context=other_context, parent=None), + ] + + otel_spans[0].start_time = start_times[0] + # added here to preserve order + otel_spans[0].set_attribute("key_bool", False) + otel_spans[0].set_attribute("key_string", "hello_world") + otel_spans[0].set_attribute("key_float", 111.22) + otel_spans[0].end_time = end_times[0] + + otel_spans[1].start_time = start_times[1] + otel_spans[1].end_time = end_times[1] + + otel_spans[2].start_time = start_times[2] + otel_spans[2].end_time = end_times[2] + + # pylint: disable=protected-access + spans = jaeger_exporter._translate_to_jaeger(otel_spans) + + expected_spans = [ + jaeger.Span( + operationName=span_names[0], + traceIdHigh=trace_id_high, + traceIdLow=trace_id_low, + spanId=span_id, + parentSpanId=parent_id, + startTime=start_times[0] / 1e3, + duration=durations[0] / 1e3, + flags=0, + tags=[ + jaeger.Tag( + key="key_bool", vType=jaeger.TagType.BOOL, vBool=False + ), + jaeger.Tag( + key="key_string", + vType=jaeger.TagType.STRING, + vStr="hello_world", + ), + jaeger.Tag( + key="key_float", + vType=jaeger.TagType.DOUBLE, + vDouble=111.22, + ), + ], + references=[ + jaeger.SpanRef( + refType=jaeger.SpanRefType.FOLLOWS_FROM, + traceIdHigh=trace_id_high, + traceIdLow=trace_id_low, + spanId=other_id, + ) + ], + logs=[ + jaeger.Log( + timestamp=event_timestamp / 1e3, + fields=[ + jaeger.Tag( + key="annotation_bool", + vType=jaeger.TagType.BOOL, + vBool=True, + ), + jaeger.Tag( + key="annotation_string", + vType=jaeger.TagType.STRING, + vStr="annotation_test", + ), + jaeger.Tag( + key="key_float", + vType=jaeger.TagType.DOUBLE, + vDouble=0.3, + ), + jaeger.Tag( + key="message", + vType=jaeger.TagType.STRING, + vStr="event0", + ), + ], + ) + ], + ), + jaeger.Span( + operationName=span_names[1], + traceIdHigh=trace_id_high, + traceIdLow=trace_id_low, + spanId=parent_id, + parentSpanId=0, + startTime=int(start_times[1] // 1e3), + duration=int(durations[1] // 1e3), + flags=0, + ), + jaeger.Span( + operationName=span_names[2], + traceIdHigh=trace_id_high, + traceIdLow=trace_id_low, + spanId=other_id, + parentSpanId=0, + startTime=int(start_times[2] // 1e3), + duration=int(durations[2] // 1e3), + flags=0, + ), + ] + + # events are complicated to compare because order of fields + # (attributes) is otel is not important but in jeager it is + self.assertCountEqual( + spans[0].logs[0].fields, expected_spans[0].logs[0].fields + ) + # get rid of fields to be able to compare the whole spans + spans[0].logs[0].fields = None + expected_spans[0].logs[0].fields = None + + self.assertEqual(spans, expected_spans) diff --git a/ext/opentelemetry-ext-jaeger/thrift/agent.thrift b/ext/opentelemetry-ext-jaeger/thrift/agent.thrift new file mode 100644 index 0000000000..5d3c9201b6 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/thrift/agent.thrift @@ -0,0 +1,27 @@ +# Copyright (c) 2016 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +include "jaeger.thrift" +include "zipkincore.thrift" + +namespace cpp jaegertracing.agent.thrift +namespace java io.jaegertracing.agent.thrift +namespace php Jaeger.Thrift.Agent +namespace netcore Jaeger.Thrift.Agent +namespace lua jaeger.thrift.agent + +service Agent { + oneway void emitZipkinBatch(1: list spans) + oneway void emitBatch(1: jaeger.Batch batch) +} diff --git a/ext/opentelemetry-ext-jaeger/thrift/jaeger.thrift b/ext/opentelemetry-ext-jaeger/thrift/jaeger.thrift new file mode 100644 index 0000000000..ae9fcaa014 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/thrift/jaeger.thrift @@ -0,0 +1,85 @@ +# Copyright (c) 2016 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +namespace cpp jaegertracing.thrift +namespace java io.jaegertracing.thriftjava +namespace php Jaeger.Thrift +namespace netcore Jaeger.Thrift +namespace lua jaeger.thrift + +# TagType denotes the type of a Tag's value. +enum TagType { STRING, DOUBLE, BOOL, LONG, BINARY } + +# Tag is a basic strongly typed key/value pair. It has been flattened to reduce the use of pointers in golang +struct Tag { + 1: required string key + 2: required TagType vType + 3: optional string vStr + 4: optional double vDouble + 5: optional bool vBool + 6: optional i64 vLong + 7: optional binary vBinary +} + +# Log is a timed even with an arbitrary set of tags. +struct Log { + 1: required i64 timestamp + 2: required list fields +} + +enum SpanRefType { CHILD_OF, FOLLOWS_FROM } + +# SpanRef describes causal relationship of the current span to another span (e.g. 'child-of') +struct SpanRef { + 1: required SpanRefType refType + 2: required i64 traceIdLow + 3: required i64 traceIdHigh + 4: required i64 spanId +} + +# Span represents a named unit of work performed by a service. +struct Span { + 1: required i64 traceIdLow # the least significant 64 bits of a traceID + 2: required i64 traceIdHigh # the most significant 64 bits of a traceID; 0 when only 64bit IDs are used + 3: required i64 spanId # unique span id (only unique within a given trace) + 4: required i64 parentSpanId # since nearly all spans will have parents spans, CHILD_OF refs do not have to be explicit + 5: required string operationName + 6: optional list references # causal references to other spans + 7: required i32 flags # a bit field used to propagate sampling decisions. 1 signifies a SAMPLED span, 2 signifies a DEBUG span. + 8: required i64 startTime + 9: required i64 duration + 10: optional list tags + 11: optional list logs +} + +# Process describes the traced process/service that emits spans. +struct Process { + 1: required string serviceName + 2: optional list tags +} + +# Batch is a collection of spans reported out of process. +struct Batch { + 1: required Process process + 2: required list spans +} + +# BatchSubmitResponse is the response on submitting a batch. +struct BatchSubmitResponse { + 1: required bool ok # The Collector's client is expected to only log (or emit a counter) when not ok equals false +} + +service Collector { + list submitBatches(1: list batches) +} diff --git a/ext/opentelemetry-ext-jaeger/thrift/zipkincore.thrift b/ext/opentelemetry-ext-jaeger/thrift/zipkincore.thrift new file mode 100644 index 0000000000..d5259e78b9 --- /dev/null +++ b/ext/opentelemetry-ext-jaeger/thrift/zipkincore.thrift @@ -0,0 +1,346 @@ +# Copyright 2012 Twitter Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +namespace cpp twitter.zipkin.thrift +namespace java com.twitter.zipkin.thriftjava +#@namespace scala com.twitter.zipkin.thriftscala +namespace rb Zipkin +namespace php Jaeger.Thrift.Agent.Zipkin +namespace netcore Jaeger.Thrift.Agent.Zipkin +namespace lua jaeger.thrift.agent + + +#************** Annotation.value ************** +/** + * The client sent ("cs") a request to a server. There is only one send per + * span. For example, if there's a transport error, each attempt can be logged + * as a WIRE_SEND annotation. + * + * If chunking is involved, each chunk could be logged as a separate + * CLIENT_SEND_FRAGMENT in the same span. + * + * Annotation.host is not the server. It is the host which logged the send + * event, almost always the client. When logging CLIENT_SEND, instrumentation + * should also log the SERVER_ADDR. + */ +const string CLIENT_SEND = "cs" +/** + * The client received ("cr") a response from a server. There is only one + * receive per span. For example, if duplicate responses were received, each + * can be logged as a WIRE_RECV annotation. + * + * If chunking is involved, each chunk could be logged as a separate + * CLIENT_RECV_FRAGMENT in the same span. + * + * Annotation.host is not the server. It is the host which logged the receive + * event, almost always the client. The actual endpoint of the server is + * recorded separately as SERVER_ADDR when CLIENT_SEND is logged. + */ +const string CLIENT_RECV = "cr" +/** + * The server sent ("ss") a response to a client. There is only one response + * per span. If there's a transport error, each attempt can be logged as a + * WIRE_SEND annotation. + * + * Typically, a trace ends with a server send, so the last timestamp of a trace + * is often the timestamp of the root span's server send. + * + * If chunking is involved, each chunk could be logged as a separate + * SERVER_SEND_FRAGMENT in the same span. + * + * Annotation.host is not the client. It is the host which logged the send + * event, almost always the server. The actual endpoint of the client is + * recorded separately as CLIENT_ADDR when SERVER_RECV is logged. + */ +const string SERVER_SEND = "ss" +/** + * The server received ("sr") a request from a client. There is only one + * request per span. For example, if duplicate responses were received, each + * can be logged as a WIRE_RECV annotation. + * + * Typically, a trace starts with a server receive, so the first timestamp of a + * trace is often the timestamp of the root span's server receive. + * + * If chunking is involved, each chunk could be logged as a separate + * SERVER_RECV_FRAGMENT in the same span. + * + * Annotation.host is not the client. It is the host which logged the receive + * event, almost always the server. When logging SERVER_RECV, instrumentation + * should also log the CLIENT_ADDR. + */ +const string SERVER_RECV = "sr" +/** + * Message send ("ms") is a request to send a message to a destination, usually + * a broker. This may be the only annotation in a messaging span. If WIRE_SEND + * exists in the same span, it follows this moment and clarifies delays sending + * the message, such as batching. + * + * Unlike RPC annotations like CLIENT_SEND, messaging spans never share a span + * ID. For example, "ms" should always be the parent of "mr". + * + * Annotation.host is not the destination, it is the host which logged the send + * event: the producer. When annotating MESSAGE_SEND, instrumentation should + * also tag the MESSAGE_ADDR. + */ +const string MESSAGE_SEND = "ms" +/** + * A consumer received ("mr") a message from a broker. This may be the only + * annotation in a messaging span. If WIRE_RECV exists in the same span, it + * precedes this moment and clarifies any local queuing delay. + * + * Unlike RPC annotations like SERVER_RECV, messaging spans never share a span + * ID. For example, "mr" should always be a child of "ms" unless it is a root + * span. + * + * Annotation.host is not the broker, it is the host which logged the receive + * event: the consumer. When annotating MESSAGE_RECV, instrumentation should + * also tag the MESSAGE_ADDR. + */ +const string MESSAGE_RECV = "mr" +/** + * Optionally logs an attempt to send a message on the wire. Multiple wire send + * events could indicate network retries. A lag between client or server send + * and wire send might indicate queuing or processing delay. + */ +const string WIRE_SEND = "ws" +/** + * Optionally logs an attempt to receive a message from the wire. Multiple wire + * receive events could indicate network retries. A lag between wire receive + * and client or server receive might indicate queuing or processing delay. + */ +const string WIRE_RECV = "wr" +/** + * Optionally logs progress of a (CLIENT_SEND, WIRE_SEND). For example, this + * could be one chunk in a chunked request. + */ +const string CLIENT_SEND_FRAGMENT = "csf" +/** + * Optionally logs progress of a (CLIENT_RECV, WIRE_RECV). For example, this + * could be one chunk in a chunked response. + */ +const string CLIENT_RECV_FRAGMENT = "crf" +/** + * Optionally logs progress of a (SERVER_SEND, WIRE_SEND). For example, this + * could be one chunk in a chunked response. + */ +const string SERVER_SEND_FRAGMENT = "ssf" +/** + * Optionally logs progress of a (SERVER_RECV, WIRE_RECV). For example, this + * could be one chunk in a chunked request. + */ +const string SERVER_RECV_FRAGMENT = "srf" + +#***** BinaryAnnotation.key ****** +/** + * The value of "lc" is the component or namespace of a local span. + * + * BinaryAnnotation.host adds service context needed to support queries. + * + * Local Component("lc") supports three key features: flagging, query by + * service and filtering Span.name by namespace. + * + * While structurally the same, local spans are fundamentally different than + * RPC spans in how they should be interpreted. For example, zipkin v1 tools + * center on RPC latency and service graphs. Root local-spans are neither + * indicative of critical path RPC latency, nor have impact on the shape of a + * service graph. By flagging with "lc", tools can special-case local spans. + * + * Zipkin v1 Spans are unqueryable unless they can be indexed by service name. + * The only path to a service name is by (Binary)?Annotation.host.serviceName. + * By logging "lc", a local span can be queried even if no other annotations + * are logged. + * + * The value of "lc" is the namespace of Span.name. For example, it might be + * "finatra2", for a span named "bootstrap". "lc" allows you to resolves + * conflicts for the same Span.name, for example "finatra/bootstrap" vs + * "finch/bootstrap". Using local component, you'd search for spans named + * "bootstrap" where "lc=finch" + */ +const string LOCAL_COMPONENT = "lc" + +#***** BinaryAnnotation.key where value = [1] and annotation_type = BOOL ****** +/** + * Indicates a client address ("ca") in a span. Most likely, there's only one. + * Multiple addresses are possible when a client changes its ip or port within + * a span. + */ +const string CLIENT_ADDR = "ca" +/** + * Indicates a server address ("sa") in a span. Most likely, there's only one. + * Multiple addresses are possible when a client is redirected, or fails to a + * different server ip or port. + */ +const string SERVER_ADDR = "sa" +/** + * Indicates the remote address of a messaging span, usually the broker. + */ +const string MESSAGE_ADDR = "ma" + +/** + * Indicates the network context of a service recording an annotation with two + * exceptions. + * + * When a BinaryAnnotation, and key is CLIENT_ADDR or SERVER_ADDR, + * the endpoint indicates the source or destination of an RPC. This exception + * allows zipkin to display network context of uninstrumented services, or + * clients such as web browsers. + */ +struct Endpoint { + /** + * IPv4 host address packed into 4 bytes. + * + * Ex for the ip 1.2.3.4, it would be (1 << 24) | (2 << 16) | (3 << 8) | 4 + */ + 1: i32 ipv4 + /** + * IPv4 port + * + * Note: this is to be treated as an unsigned integer, so watch for negatives. + * + * Conventionally, when the port isn't known, port = 0. + */ + 2: i16 port + /** + * Service name in lowercase, such as "memcache" or "zipkin-web" + * + * Conventionally, when the service name isn't known, service_name = "unknown". + */ + 3: string service_name + /** + * IPv6 host address packed into 16 bytes. Ex Inet6Address.getBytes() + */ + 4: optional binary ipv6 +} + +/** + * An annotation is similar to a log statement. It includes a host field which + * allows these events to be attributed properly, and also aggregatable. + */ +struct Annotation { + /** + * Microseconds from epoch. + * + * This value should use the most precise value possible. For example, + * gettimeofday or syncing nanoTime against a tick of currentTimeMillis. + */ + 1: i64 timestamp + 2: string value // what happened at the timestamp? + /** + * Always the host that recorded the event. By specifying the host you allow + * rollup of all events (such as client requests to a service) by IP address. + */ + 3: optional Endpoint host + // don't reuse 4: optional i32 OBSOLETE_duration // how long did the operation take? microseconds +} + +enum AnnotationType { BOOL, BYTES, I16, I32, I64, DOUBLE, STRING } + +/** + * Binary annotations are tags applied to a Span to give it context. For + * example, a binary annotation of "http.uri" could the path to a resource in a + * RPC call. + * + * Binary annotations of type STRING are always queryable, though more a + * historical implementation detail than a structural concern. + * + * Binary annotations can repeat, and vary on the host. Similar to Annotation, + * the host indicates who logged the event. This allows you to tell the + * difference between the client and server side of the same key. For example, + * the key "http.uri" might be different on the client and server side due to + * rewriting, like "/api/v1/myresource" vs "/myresource. Via the host field, + * you can see the different points of view, which often help in debugging. + */ +struct BinaryAnnotation { + 1: string key, + 2: binary value, + 3: AnnotationType annotation_type, + /** + * The host that recorded tag, which allows you to differentiate between + * multiple tags with the same key. There are two exceptions to this. + * + * When the key is CLIENT_ADDR or SERVER_ADDR, host indicates the source or + * destination of an RPC. This exception allows zipkin to display network + * context of uninstrumented services, or clients such as web browsers. + */ + 4: optional Endpoint host +} + +/** + * A trace is a series of spans (often RPC calls) which form a latency tree. + * + * The root span is where trace_id = id and parent_id = Nil. The root span is + * usually the longest interval in the trace, starting with a SERVER_RECV + * annotation and ending with a SERVER_SEND. + */ +struct Span { + 1: i64 trace_id # unique trace id, use for all spans in trace + /** + * Span name in lowercase, rpc method for example + * + * Conventionally, when the span name isn't known, name = "unknown". + */ + 3: string name, + 4: i64 id, # unique span id, only used for this span + 5: optional i64 parent_id, # parent span id + 6: list annotations, # all annotations/events that occured, sorted by timestamp + 8: list binary_annotations # any binary annotations + 9: optional bool debug = 0 # if true, we DEMAND that this span passes all samplers + /** + * Microseconds from epoch of the creation of this span. + * + * This value should be set directly by instrumentation, using the most + * precise value possible. For example, gettimeofday or syncing nanoTime + * against a tick of currentTimeMillis. + * + * For compatibilty with instrumentation that precede this field, collectors + * or span stores can derive this via Annotation.timestamp. + * For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp. + * + * This field is optional for compatibility with old data: first-party span + * stores are expected to support this at time of introduction. + */ + 10: optional i64 timestamp, + /** + * Measurement of duration in microseconds, used to support queries. + * + * This value should be set directly, where possible. Doing so encourages + * precise measurement decoupled from problems of clocks, such as skew or NTP + * updates causing time to move backwards. + * + * For compatibilty with instrumentation that precede this field, collectors + * or span stores can derive this by subtracting Annotation.timestamp. + * For example, SERVER_SEND.timestamp - SERVER_RECV.timestamp. + * + * If this field is persisted as unset, zipkin will continue to work, except + * duration query support will be implementation-specific. Similarly, setting + * this field non-atomically is implementation-specific. + * + * This field is i64 vs i32 to support spans longer than 35 minutes. + */ + 11: optional i64 duration + /** + * Optional unique 8-byte additional identifier for a trace. If non zero, this + * means the trace uses 128 bit traceIds instead of 64 bit. + */ + 12: optional i64 trace_id_high +} + +# define TChannel service + +struct Response { + 1: required bool ok +} + +service ZipkinCollector { + list submitZipkinBatch(1: list spans) +} diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util.py b/opentelemetry-sdk/src/opentelemetry/sdk/util.py index ede52e8307..c047435370 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/util.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/util.py @@ -62,7 +62,7 @@ def __len__(self): def __iter__(self): with self._lock: - return iter(self._dq.copy()) + return iter(deque(self._dq)) def append(self, item): with self._lock: diff --git a/pyproject.toml b/pyproject.toml index a8f43fefdf..eff7e2e3ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,2 +1,21 @@ [tool.black] line-length = 79 +exclude = ''' +( + /( + \.eggs # exclude a few common directories in the + | \.git # root of the project + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist + | ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen # generated files + )/ + | foo.py # also separately exclude a file named foo.py in + # the root of the project +) +''' diff --git a/tox.ini b/tox.ini index f8d2f55d28..0db2364f19 100644 --- a/tox.ini +++ b/tox.ini @@ -2,8 +2,8 @@ skipsdist = True skip_missing_interpreters = True envlist = - py3{4,5,6,7,8}-test-{api,sdk,example-app,ext-wsgi,ext-http-requests} - pypy3-test-{api,sdk,example-app,ext-wsgi,ext-http-requests} + py3{4,5,6,7,8}-test-{api,sdk,example-app,ext-wsgi,ext-http-requests,ext-jaeger} + pypy3-test-{api,sdk,example-app,ext-wsgi,ext-http-requests,ext-jaeger} lint py37-{mypy,mypyinstalled} docs @@ -23,6 +23,7 @@ changedir = test-api: opentelemetry-api/tests test-sdk: opentelemetry-sdk/tests test-ext-http-requests: ext/opentelemetry-ext-http-requests/tests + test-ext-jaeger: ext/opentelemetry-ext-jaeger/tests test-ext-wsgi: ext/opentelemetry-ext-wsgi/tests test-example-app: examples/opentelemetry-example-app/tests @@ -38,6 +39,8 @@ commands_pre = ext: pip install {toxinidir}/opentelemetry-api wsgi: pip install {toxinidir}/ext/opentelemetry-ext-wsgi http-requests: pip install {toxinidir}/ext/opentelemetry-ext-http-requests + jaeger: pip install {toxinidir}/opentelemetry-sdk + jaeger: pip install {toxinidir}/ext/opentelemetry-ext-jaeger ; Using file:// here because otherwise tox invokes just "pip install ; opentelemetry-api", leading to an error @@ -68,6 +71,7 @@ commands_pre = pip install -e {toxinidir}/opentelemetry-sdk pip install -e {toxinidir}/ext/opentelemetry-ext-azure-monitor pip install -e {toxinidir}/ext/opentelemetry-ext-http-requests + pip install -e {toxinidir}/ext/opentelemetry-ext-jaeger pip install -e {toxinidir}/ext/opentelemetry-ext-wsgi pip install -e {toxinidir}/examples/opentelemetry-example-app @@ -84,6 +88,8 @@ commands = ext/opentelemetry-ext-azure-monitor/tests/ \ ext/opentelemetry-ext-http-requests/src/ \ ext/opentelemetry-ext-http-requests/tests/ \ + ext/opentelemetry-ext-jaeger/src/opentelemetry \ + ext/opentelemetry-ext-jaeger/tests/ \ ext/opentelemetry-ext-wsgi/tests/ \ examples/opentelemetry-example-app/src/opentelemetry_example_app/ \ examples/opentelemetry-example-app/tests/