Skip to content

Commit

Permalink
wrap EmitBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
EGHornbostel committed Jul 11, 2019
1 parent fa5fbd8 commit 9aaa138
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
// <copyright file="JaegerUdpBatcher.cs" company="OpenTelemetry Authors">// Copyright 2018, OpenTelemetry Authors//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.// </copyright>namespace OpenTelemetry.Exporter.Jaeger.Implimentation{ using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks;#if NET46 using Thrift.Protocol;#else using Thrift.Protocols;#endif public class JaegerUdpBatcher : IDisposable { private const int DefaultMaxPacketSize = 65000; private readonly int? maxPacketSize; private readonly TCompactProtocol.Factory protocolFactory; private readonly JaegerThriftClientTransport clientTransport; private readonly JaegerThriftClient thriftClient; private readonly Process process; private readonly int processByteSize; private readonly List<JaegerSpan> currentBatch = new List<JaegerSpan>(); private int batchByteSize; private bool disposedValue = false; // To detect redundant calls public JaegerUdpBatcher(JaegerExporterOptions options) { this.maxPacketSize = options.MaxPacketSize == 0 ? DefaultMaxPacketSize : options.MaxPacketSize; this.protocolFactory = new TCompactProtocol.Factory(); this.clientTransport = new JaegerThriftClientTransport(options.AgentHost, options.AgentPort.Value); this.thriftClient = new JaegerThriftClient(this.protocolFactory.GetProtocol(this.clientTransport)); this.process = new Process(options.ServiceName); this.processByteSize = this.GetSize(this.process); this.batchByteSize = this.processByteSize; } public async Task<int> AppendAsync(JaegerSpan span, CancellationToken cancellationToken) { int spanSize = this.GetSize(span); if (spanSize > this.maxPacketSize) { throw new JaegerExporterException($"ThriftSender received a span that was too large, size = {spanSize}, max = {this.maxPacketSize}", null); } this.batchByteSize += spanSize; if (this.batchByteSize <= this.maxPacketSize) { this.currentBatch.Add(span); if (this.batchByteSize < this.maxPacketSize) { return 0; } return await this.FlushAsync(cancellationToken).ConfigureAwait(false); } int n; try { n = await this.FlushAsync(cancellationToken).ConfigureAwait(false); } catch (JaegerExporterException ex) { // +1 for the span not submitted in the buffer above throw new JaegerExporterException(ex.Message, ex); } this.currentBatch.Add(span); this.batchByteSize = this.processByteSize + spanSize; return n; } public async Task<int> FlushAsync(CancellationToken cancellationToken) { if (this.currentBatch.Count == 0) { return 0; } int n = this.currentBatch.Count; try { await this.SendAsync(this.process, this.currentBatch, cancellationToken).ConfigureAwait(false); } catch (JaegerExporterException ex) { throw new JaegerExporterException("Failed to flush spans.", ex); } finally { this.currentBatch.Clear(); this.batchByteSize = this.processByteSize; } return n; } public virtual Task<int> CloseAsync(CancellationToken cancellationToken) { return this.FlushAsync(cancellationToken); } public void Dispose() { // Do not change this code. Put cleanup code in Dispose(bool disposing). this.Dispose(true); }#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously protected async Task SendAsync(Process process, List<JaegerSpan> spans, CancellationToken cancellationToken)#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously { try { var batch = new Batch(process, spans);#if NET46 this.thriftClient.EmitBatch(batch);#else await this.thriftClient.EmitBatchAsync(batch, cancellationToken).ConfigureAwait(false);#endif } catch (Exception ex) { throw new JaegerExporterException($"Could not send {spans.Count} spans", ex); } } protected virtual void Dispose(bool disposing) { if (!this.disposedValue) { if (disposing) { this.thriftClient.Dispose(); this.clientTransport.Dispose(); } this.disposedValue = true; } } private int GetSize(TAbstractBase thriftBase) { using (var memoryTransport = new InMemoryTransport()) {#if NET46 thriftBase.Write(this.protocolFactory.GetProtocol(memoryTransport));#else thriftBase.WriteAsync(this.protocolFactory.GetProtocol(memoryTransport), CancellationToken.None).GetAwaiter().GetResult();#endif return memoryTransport.GetBuffer().Length; } } }}
// <copyright file="JaegerUdpBatcher.cs" company="OpenTelemetry Authors">// Copyright 2018, OpenTelemetry Authors//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.// </copyright>namespace OpenTelemetry.Exporter.Jaeger.Implimentation{ using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks;#if NET46 using Thrift.Protocol;#else using Thrift.Protocols;#endif public class JaegerUdpBatcher : IDisposable { private const int DefaultMaxPacketSize = 65000; private readonly int? maxPacketSize; private readonly TCompactProtocol.Factory protocolFactory; private readonly JaegerThriftClientTransport clientTransport; private readonly JaegerThriftClient thriftClient; private readonly Process process; private readonly int processByteSize; private readonly List<JaegerSpan> currentBatch = new List<JaegerSpan>(); private int batchByteSize; private bool disposedValue = false; // To detect redundant calls public JaegerUdpBatcher(JaegerExporterOptions options) { this.maxPacketSize = options.MaxPacketSize == 0 ? DefaultMaxPacketSize : options.MaxPacketSize; this.protocolFactory = new TCompactProtocol.Factory(); this.clientTransport = new JaegerThriftClientTransport(options.AgentHost, options.AgentPort.Value); this.thriftClient = new JaegerThriftClient(this.protocolFactory.GetProtocol(this.clientTransport)); this.process = new Process(options.ServiceName); this.processByteSize = this.GetSize(this.process); this.batchByteSize = this.processByteSize; } public async Task<int> AppendAsync(JaegerSpan span, CancellationToken cancellationToken) { int spanSize = this.GetSize(span); if (spanSize > this.maxPacketSize) { throw new JaegerExporterException($"ThriftSender received a span that was too large, size = {spanSize}, max = {this.maxPacketSize}", null); } this.batchByteSize += spanSize; if (this.batchByteSize <= this.maxPacketSize) { this.currentBatch.Add(span); if (this.batchByteSize < this.maxPacketSize) { return 0; } return await this.FlushAsync(cancellationToken).ConfigureAwait(false); } int n; try { n = await this.FlushAsync(cancellationToken).ConfigureAwait(false); } catch (JaegerExporterException ex) { // +1 for the span not submitted in the buffer above throw new JaegerExporterException(ex.Message, ex); } this.currentBatch.Add(span); this.batchByteSize = this.processByteSize + spanSize; return n; } public async Task<int> FlushAsync(CancellationToken cancellationToken) { if (this.currentBatch.Count == 0) { return 0; } int n = this.currentBatch.Count; try { await this.SendAsync(this.process, this.currentBatch, cancellationToken).ConfigureAwait(false); } catch (JaegerExporterException ex) { throw new JaegerExporterException("Failed to flush spans.", ex); } finally { this.currentBatch.Clear(); this.batchByteSize = this.processByteSize; } return n; } public virtual Task<int> CloseAsync(CancellationToken cancellationToken) { return this.FlushAsync(cancellationToken); } public void Dispose() { // Do not change this code. Put cleanup code in Dispose(bool disposing). this.Dispose(true); } protected async Task SendAsync(Process process, List<JaegerSpan> spans, CancellationToken cancellationToken) { try { var batch = new Batch(process, spans);#if NET46 await Task.Run(() => this.thriftClient.EmitBatch(batch));#else await this.thriftClient.EmitBatchAsync(batch, cancellationToken).ConfigureAwait(false);#endif } catch (Exception ex) { throw new JaegerExporterException($"Could not send {spans.Count} spans", ex); } } protected virtual void Dispose(bool disposing) { if (!this.disposedValue) { if (disposing) { this.thriftClient.Dispose(); this.clientTransport.Dispose(); } this.disposedValue = true; } } private int GetSize(TAbstractBase thriftBase) { using (var memoryTransport = new InMemoryTransport()) {#if NET46 thriftBase.Write(this.protocolFactory.GetProtocol(memoryTransport));#else thriftBase.WriteAsync(this.protocolFactory.GetProtocol(memoryTransport), CancellationToken.None).GetAwaiter().GetResult();#endif return memoryTransport.GetBuffer().Length; } } }}
Expand Down

0 comments on commit 9aaa138

Please sign in to comment.