Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export resources from Jaeger #348

Merged
merged 7 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions exporters/jaeger/lib/opentelemetry/exporters/jaeger/exporter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
require 'agent'
require 'opentelemetry/sdk'
require 'socket'
require 'opentelemetry/exporters/jaeger/exporter/encoding_utils'
require 'opentelemetry/exporters/jaeger/exporter/span_encoder'

module OpenTelemetry
module Exporters
module Jaeger
# An OpenTelemetry trace exporter that sends spans over UDP as Thrift Compact encoded Jaeger spans.
class Exporter
include EncodingUtils

SUCCESS = OpenTelemetry::SDK::Trace::Export::SUCCESS
FAILURE = OpenTelemetry::SDK::Trace::Export::FAILURE
private_constant(:SUCCESS, :FAILURE)
Expand Down Expand Up @@ -70,12 +73,20 @@ def batcher
# and the remaining batches will be discarded. Returns SUCCESS after all batches
# have been successfully yielded.
def encoded_batches(span_data)
encoded_spans = span_data.map(&method(:encoded_span))
encoded_span_sizes = encoded_spans.map(&method(:encoded_span_size))
return FAILURE if encoded_span_sizes.any? { |size| size > @max_packet_size }
grouped_encoded_spans = \
span_data.each_with_object(Hash.new { |h, k| h[k] = [] }) do |span, memo|
encoded_data = encoded_span(span)
encoded_size = encoded_span_size(encoded_data)
return FAILURE if encoded_size > @max_packet_size

memo[span.resource] << [encoded_data, encoded_size]
end

encoded_spans.zip(encoded_span_sizes).chunk(&batcher).each do |batch_and_spans_with_size|
yield Thrift::Batch.new('process' => encoded_process, 'spans' => batch_and_spans_with_size.last.map(&:first))
grouped_encoded_spans.each_pair do |resource, encoded_spans|
process = encoded_process(resource)
encoded_spans.chunk(&batcher).each do |batch_and_spans_with_size|
yield Thrift::Batch.new('process' => process, 'spans' => batch_and_spans_with_size.last.map(&:first))
end
end
SUCCESS
end
Expand Down Expand Up @@ -122,14 +133,9 @@ def encoded_span_size(encoded_span)
@transport.size
end

def encoded_process
@encoded_process ||= begin
tags = [] # TODO: figure this out.
# tags = OpenTelemetry.tracer.resource.label_enumerator.map do |key, value|
# Thrift::Tag.new('key' => key, 'vType' => Thrift::TagType::STRING, 'vStr' => value)
# end
Thrift::Process.new('serviceName' => @service_name, 'tags' => tags)
end
def encoded_process(resource)
tags = resource ? encoded_tags(resource.label_enumerator) : EMPTY_ARRAY
Thrift::Process.new('serviceName' => @service_name, 'tags' => tags)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# frozen_string_literal: true

# Copyright 2020 OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Exporters
module Jaeger
class Exporter
# @api private
module EncodingUtils
def encoded_tags(attributes)
attributes&.map do |key, value|
encoded_tag(key, value)
end || EMPTY_ARRAY
end

def encoded_tag(key, value)
@type_map ||= {
LONG => Thrift::TagType::LONG,
DOUBLE => Thrift::TagType::DOUBLE,
STRING => Thrift::TagType::STRING,
BOOL => Thrift::TagType::BOOL
}.freeze

value_key = case value
when Integer then LONG
when Float then DOUBLE
when String, Array then STRING
when false, true then BOOL
end
value = value.to_json if value.is_a?(Array)
Thrift::Tag.new(
KEY => key,
TYPE => @type_map[value_key],
value_key => value
)
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module Jaeger
class Exporter
# @api private
class SpanEncoder
include EncodingUtils

def encoded_span(span_data) # rubocop:disable Metrics/AbcSize
start_time = (span_data.start_timestamp.to_f * 1_000_000).to_i
duration = (span_data.end_timestamp.to_f * 1_000_000).to_i - start_time
Expand Down Expand Up @@ -90,34 +92,6 @@ def encoded_instrumentation_library(instrumentation_library)
tags
end

def encoded_tags(attributes)
attributes&.map do |key, value|
encoded_tag(key, value)
end || EMPTY_ARRAY
end

def encoded_tag(key, value)
@type_map ||= {
LONG => Thrift::TagType::LONG,
DOUBLE => Thrift::TagType::DOUBLE,
STRING => Thrift::TagType::STRING,
BOOL => Thrift::TagType::BOOL
}.freeze

value_key = case value
when Integer then LONG
when Float then DOUBLE
when String, Array then STRING
when false, true then BOOL
end
value = value.to_json if value.is_a?(Array)
Thrift::Tag.new(
KEY => key,
TYPE => @type_map[value_key],
value_key => value
)
end

def int64(byte_string)
int = byte_string.unpack1('Q>')
int < (1 << 63) ? int : int - (1 << 64)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,38 @@
packet1 = socket.recvfrom(65_000)
packet2 = socket.recvfrom(65_000)
socket.close

_(result).must_equal(SUCCESS)
_(packet1.size).must_be :<=, 128
_(packet2.size).must_be :<=, 128
end

it 'batches per resource' do
socket = UDPSocket.new
socket.bind('127.0.0.1', 0)
exporter = OpenTelemetry::Exporters::Jaeger::Exporter.new(service_name: 'test', host: '127.0.0.1', port: socket.addr[1], max_packet_size: 128)

span_data1 = create_span_data(resource: OpenTelemetry::SDK::Resources::Resource.create('k1' => 'v1'))
span_data2 = create_span_data(resource: OpenTelemetry::SDK::Resources::Resource.create('k2' => 'v2'))

result = exporter.export([span_data1, span_data2])
packet1 = socket.recvfrom(65_000)
packet2 = socket.recvfrom(65_000)
socket.close

_(result).must_equal(SUCCESS)
_(packet1).wont_be_nil
_(packet2).wont_be_nil
end
end

def create_span_data(name: '', kind: nil, status: nil, parent_span_id: OpenTelemetry::Trace::INVALID_SPAN_ID, child_count: 0,
total_recorded_attributes: 0, total_recorded_events: 0, total_recorded_links: 0, start_timestamp: Time.now,
end_timestamp: Time.now, attributes: nil, links: nil, events: nil, library_resource: nil, instrumentation_library: nil,
end_timestamp: Time.now, attributes: nil, links: nil, events: nil, resource: nil, instrumentation_library: nil,
span_id: OpenTelemetry::Trace.generate_span_id, trace_id: OpenTelemetry::Trace.generate_trace_id,
trace_flags: OpenTelemetry::Trace::TraceFlags::DEFAULT, tracestate: nil)
OpenTelemetry::SDK::Trace::SpanData.new(name, kind, status, parent_span_id, child_count, total_recorded_attributes,
total_recorded_events, total_recorded_links, start_timestamp, end_timestamp,
attributes, links, events, library_resource, instrumentation_library, span_id, trace_id, trace_flags, tracestate)
attributes, links, events, resource, instrumentation_library, span_id, trace_id, trace_flags, tracestate)
end
end
8 changes: 4 additions & 4 deletions sdk/lib/opentelemetry/sdk/trace/span.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module Trace
class Span < OpenTelemetry::Trace::Span
# The following readers are intended for the use of SpanProcessors and
# should not be considered part of the public interface for instrumentation.
attr_reader :name, :status, :kind, :parent_span_id, :start_timestamp, :end_timestamp, :links, :library_resource, :instrumentation_library
attr_reader :name, :status, :kind, :parent_span_id, :start_timestamp, :end_timestamp, :links, :resource, :instrumentation_library

# Return a frozen copy of the current attributes. This is intended for
# use of SpanProcesses and should not be considered part of the public
Expand Down Expand Up @@ -227,7 +227,7 @@ def to_span_data
@attributes,
@links,
@events,
@library_resource,
@resource,
@instrumentation_library,
context.span_id,
context.trace_id,
Expand All @@ -237,15 +237,15 @@ def to_span_data
end

# @api private
def initialize(context, name, kind, parent_span_id, trace_config, span_processor, attributes, links, start_timestamp, library_resource, instrumentation_library) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
def initialize(context, name, kind, parent_span_id, trace_config, span_processor, attributes, links, start_timestamp, resource, instrumentation_library) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
super(span_context: context)
@mutex = Mutex.new
@name = name
@kind = kind
@parent_span_id = parent_span_id.freeze || OpenTelemetry::Trace::INVALID_SPAN_ID
@trace_config = trace_config
@span_processor = span_processor
@library_resource = library_resource
@resource = resource
@instrumentation_library = instrumentation_library
@ended = false
@status = nil
Expand Down
2 changes: 1 addition & 1 deletion sdk/lib/opentelemetry/sdk/trace/span_data.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module Trace
:attributes,
:links,
:events,
:library_resource,
:resource,
:instrumentation_library,
:span_id,
:trace_id,
Expand Down