Skip to content

Commit

Permalink
Merge pull request #39049 from codefromthecrypt
Browse files Browse the repository at this point in the history
* pr/39049:
  Polish "Migrate to Brave 6 and Zipkin Reporter 3"
  Migrate to Brave 6 and Zipkin Reporter 3

Closes gh-39049
  • Loading branch information
mhalbritter committed Feb 20, 2024
2 parents 3a565e4 + 7555f6c commit 52648d9
Show file tree
Hide file tree
Showing 23 changed files with 657 additions and 594 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import brave.propagation.CurrentTraceContext.ScopeDecorator;
import brave.propagation.Propagation;
import brave.propagation.Propagation.Factory;
import brave.propagation.Propagation.KeyFactory;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;

import org.springframework.beans.factory.ObjectProvider;
Expand Down Expand Up @@ -99,12 +98,11 @@ BaggagePropagation.FactoryBuilder propagationFactoryBuilder(
return builder;
}

@SuppressWarnings("deprecation")
private Factory createThrowAwayFactory() {
return new Factory() {

@Override
public <K> Propagation<K> create(KeyFactory<K> keyFactory) {
public Propagation<String> get() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.function.Predicate;
import java.util.stream.Stream;

import brave.internal.propagation.StringPropagationAdapter;
import brave.propagation.B3Propagation;
import brave.propagation.Propagation;
import brave.propagation.Propagation.Factory;
Expand Down Expand Up @@ -71,9 +70,8 @@ public boolean requires128BitTraceId() {
}

@Override
@SuppressWarnings("deprecation")
public <K> Propagation<K> create(Propagation.KeyFactory<K> keyFactory) {
return StringPropagationAdapter.create(this.propagation, keyFactory);
public Propagation<String> get() {
return this.propagation;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,135 +18,81 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.net.URI;
import java.util.List;
import java.util.zip.GZIPOutputStream;

import zipkin2.Call;
import zipkin2.CheckResult;
import zipkin2.codec.Encoding;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Sender;
import zipkin2.reporter.BaseHttpSender;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.HttpEndpointSupplier.Factory;

import org.springframework.http.HttpHeaders;
import org.springframework.util.unit.DataSize;

/**
* A Zipkin {@link Sender} that uses an HTTP client to send JSON spans. Supports automatic
* compression with gzip.
* A Zipkin {@link BytesMessageSender} that uses an HTTP client to send JSON spans.
* Supports automatic compression with gzip.
*
* @author Moritz Halbritter
* @author Stefan Bratanov
*/
abstract class HttpSender extends Sender {
abstract class HttpSender extends BaseHttpSender<URI, byte[]> {

private static final DataSize MESSAGE_MAX_SIZE = DataSize.ofKilobytes(512);

private volatile boolean closed;

@Override
public Encoding encoding() {
return Encoding.JSON;
}
/**
* Only use gzip compression on data which is bigger than this in bytes.
*/
private static final DataSize COMPRESSION_THRESHOLD = DataSize.ofKilobytes(1);

@Override
public int messageMaxBytes() {
return (int) MESSAGE_MAX_SIZE.toBytes();
HttpSender(Encoding encoding, Factory endpointSupplierFactory, String endpoint) {
super(encoding, endpointSupplierFactory, endpoint);
}

@Override
public int messageSizeInBytes(List<byte[]> encodedSpans) {
return encoding().listSizeInBytes(encodedSpans);
protected URI newEndpoint(String endpoint) {
return URI.create(endpoint);
}

@Override
public int messageSizeInBytes(int encodedSizeInBytes) {
return encoding().listSizeInBytes(encodedSizeInBytes);
protected byte[] newBody(List<byte[]> list) {
return this.encoding.encode(list);
}

@Override
public CheckResult check() {
try {
sendSpans(Collections.emptyList()).execute();
return CheckResult.OK;
protected void postSpans(URI endpoint, byte[] body) throws IOException {
HttpHeaders headers = getDefaultHeaders();
if (needsCompression(body)) {
body = compress(body);
headers.set("Content-Encoding", "gzip");
}
catch (IOException | RuntimeException ex) {
return CheckResult.failed(ex);
}
}

@Override
public void close() throws IOException {
this.closed = true;
postSpans(endpoint, headers, body);
}

/**
* The returned {@link HttpPostCall} will send span(s) as a POST to a zipkin endpoint
* when executed.
* @param batchedEncodedSpans list of encoded spans as a byte array
* @return an instance of a Zipkin {@link Call} which can be executed
* This will send span(s) as a POST to a zipkin endpoint.
* @param endpoint the POST endpoint. For example, http://localhost:9411/api/v2/spans.
* @param headers headers for the POST request
* @param body list of possibly gzipped, encoded spans.
*/
protected abstract HttpPostCall sendSpans(byte[] batchedEncodedSpans);
abstract void postSpans(URI endpoint, HttpHeaders headers, byte[] body);

@Override
public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (this.closed) {
throw new ClosedSenderException();
}
return sendSpans(BytesMessageEncoder.JSON.encode(encodedSpans));
HttpHeaders getDefaultHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.set("b3", "0");
headers.set("Content-Type", this.encoding.mediaType());
return headers;
}

abstract static class HttpPostCall extends Call.Base<Void> {

/**
* Only use gzip compression on data which is bigger than this in bytes.
*/
private static final DataSize COMPRESSION_THRESHOLD = DataSize.ofKilobytes(1);

private final byte[] body;

HttpPostCall(byte[] body) {
this.body = body;
}

protected byte[] getBody() {
if (needsCompression()) {
return compress(this.body);
}
return this.body;
}

protected byte[] getUncompressedBody() {
return this.body;
}

protected HttpHeaders getDefaultHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.set("b3", "0");
headers.set("Content-Type", "application/json");
if (needsCompression()) {
headers.set("Content-Encoding", "gzip");
}
return headers;
}

private boolean needsCompression() {
return this.body.length > COMPRESSION_THRESHOLD.toBytes();
}
private boolean needsCompression(byte[] body) {
return body.length > COMPRESSION_THRESHOLD.toBytes();
}

private byte[] compress(byte[] input) {
ByteArrayOutputStream result = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(result)) {
gzip.write(input);
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
return result.toByteArray();
private byte[] compress(byte[] input) throws IOException {
ByteArrayOutputStream result = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(result)) {
gzip.write(input);
}

return result.toByteArray();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,13 +17,12 @@
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;

import zipkin2.Span;
import zipkin2.codec.BytesEncoder;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.Sender;
import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.SpanBytesEncoder;

import org.springframework.boot.actuate.autoconfigure.tracing.zipkin.ZipkinConfigurations.BraveConfiguration;
import org.springframework.boot.actuate.autoconfigure.tracing.zipkin.ZipkinConfigurations.OpenTelemetryConfiguration;
import org.springframework.boot.actuate.autoconfigure.tracing.zipkin.ZipkinConfigurations.ReporterConfiguration;
import org.springframework.boot.actuate.autoconfigure.tracing.zipkin.ZipkinConfigurations.SenderConfiguration;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
Expand All @@ -44,9 +43,8 @@
* @since 3.0.0
*/
@AutoConfiguration(after = RestTemplateAutoConfiguration.class)
@ConditionalOnClass(Sender.class)
@Import({ SenderConfiguration.class, ReporterConfiguration.class, BraveConfiguration.class,
OpenTelemetryConfiguration.class })
@ConditionalOnClass(Encoding.class)
@Import({ SenderConfiguration.class, BraveConfiguration.class, OpenTelemetryConfiguration.class })
@EnableConfigurationProperties(ZipkinProperties.class)
public class ZipkinAutoConfiguration {

Expand All @@ -58,8 +56,17 @@ PropertiesZipkinConnectionDetails zipkinConnectionDetails(ZipkinProperties prope

@Bean
@ConditionalOnMissingBean
public BytesEncoder<Span> spanBytesEncoder() {
return SpanBytesEncoder.JSON_V2;
Encoding encoding(ZipkinProperties properties) {
return switch (properties.getEncoding()) {
case JSON -> Encoding.JSON;
case PROTO3 -> Encoding.PROTO3;
};
}

@Bean
@ConditionalOnMissingBean(value = Span.class, parameterizedContainer = BytesEncoder.class)
BytesEncoder<Span> zipkinSpanEncoder(Encoding encoding) {
return SpanBytesEncoder.forEncoding(encoding);
}

}
Loading

0 comments on commit 52648d9

Please sign in to comment.