Skip to content

Commit

Permalink
improves traceQueue cleaning logic (#2241)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegDokuka authored Jan 3, 2023
1 parent 29eddf1 commit 6a3612c
Show file tree
Hide file tree
Showing 4 changed files with 420 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2021 the original author or authors.
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,9 +18,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.Queue;
import java.util.function.Function;

import org.apache.commons.logging.Log;
Expand All @@ -42,8 +39,6 @@
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.context.scope.refresh.RefreshScope;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.autoconfig.brave.BraveAutoConfiguration;
import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth;
Expand Down Expand Up @@ -260,7 +255,7 @@ static void addQueueWrapper(ConfigurableApplicationContext springContext) {
if (log.isTraceEnabled()) {
log.trace("Decorating queues");
}
Hooks.addQueueWrapper(SLEUTH_TRACE_REACTOR_KEY, queue -> traceQueue(springContext, queue));
Hooks.addQueueWrapper(SLEUTH_TRACE_REACTOR_KEY, queue -> ReactorSleuth.traceQueue(springContext, queue));
}

@Override
Expand All @@ -274,84 +269,6 @@ public void close() throws IOException {
Schedulers.resetOnScheduleHook(TraceReactorAutoConfiguration.SLEUTH_REACTOR_EXECUTOR_SERVICE_KEY);
}

private static Queue<?> traceQueue(ConfigurableApplicationContext springContext, Queue<?> queue) {
if (!springContext.isActive()) {
return queue;
}
CurrentTraceContext currentTraceContext = springContext.getBean(CurrentTraceContext.class);
@SuppressWarnings("unchecked")
Queue envelopeQueue = queue;
return new AbstractQueue<Object>() {

@Override
public int size() {
return envelopeQueue.size();
}

@Override
public boolean offer(Object o) {
TraceContext traceContext = currentTraceContext.context();
return envelopeQueue.offer(new Envelope(o, traceContext));
}

@Override
public Object poll() {
Object object = envelopeQueue.poll();
if (object == null) {
// to clear thread-local
currentTraceContext.maybeScope(null);
return null;
}
else if (object instanceof Envelope) {
Envelope envelope = (Envelope) object;
restoreTheContext(envelope);
return envelope.body;
}
return object;
}

private void restoreTheContext(Envelope envelope) {
if (envelope.traceContext != null) {
currentTraceContext.maybeScope(envelope.traceContext);
}
}

@Override
public Object peek() {
Object peek = queue.peek();
if (peek instanceof Envelope) {
Envelope envelope = (Envelope) peek;
restoreTheContext(envelope);
return (envelope).body;
}
return peek;
}

@Override
@SuppressWarnings("unchecked")
public Iterator<Object> iterator() {
Iterator<?> iterator = queue.iterator();
return new Iterator<Object>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Object next() {
Object next = iterator.next();
if (next instanceof Envelope) {
Envelope envelope = (Envelope) next;
restoreTheContext(envelope);
return (envelope).body;
}
return next;
}
};
}
};
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (log.isTraceEnabled()) {
Expand All @@ -367,17 +284,4 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
springContext = (ConfigurableApplicationContext) applicationContext;
}

static class Envelope {

final Object body;

final TraceContext traceContext;

Envelope(Object body, TraceContext traceContext) {
this.body = body;
this.traceContext = traceContext;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2021 the original author or authors.
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,9 @@

package org.springframework.cloud.sleuth.instrument.reactor;

import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.Queue;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -323,6 +326,9 @@ public static Function<Runnable, Runnable> scopePassingOnScheduleHook(
try (CurrentTraceContext.Scope scope = currentTraceContext.maybeScope(traceContext)) {
delegate.run();
}
// extra step to ensure context is cleared when publishOn or similar
// operators leaks different context and leaves it uncleared
currentTraceContext.maybeScope(null);
};
}
return delegate;
Expand Down Expand Up @@ -632,6 +638,125 @@ public static Span spanFromContext(Tracer tracer, CurrentTraceContext currentTra
return newSpan;
}

public static Queue<?> traceQueue(ConfigurableApplicationContext springContext, Queue<?> queue) {
if (!springContext.isActive()) {
return queue;
}
CurrentTraceContext currentTraceContext = springContext.getBean(CurrentTraceContext.class);
@SuppressWarnings("unchecked")
Queue envelopeQueue = queue;
return new AbstractQueue<Object>() {

boolean cleanOnNull;

boolean hasPrevious = false;

Thread lastReader;

@Override
public int size() {
return envelopeQueue.size();
}

@Override
public boolean offer(Object o) {
TraceContext traceContext = currentTraceContext.context();
return envelopeQueue.offer(new Envelope(o, traceContext));
}

@Override
public Object poll() {
Object object = envelopeQueue.poll();
if (object == null) {
if (cleanOnNull) {
// to clear thread-local if was just restored
currentTraceContext.maybeScope(null);
}
cleanOnNull = true;
lastReader = Thread.currentThread();
hasPrevious = false;
return null;
}
else if (object instanceof Envelope) {
Envelope envelope = (Envelope) object;
restoreTheContext(envelope);
hasPrevious = true;
return envelope.body;
}
hasPrevious = true;
return object;
}

private void restoreTheContext(Envelope envelope) {
TraceContext traceContext = envelope.traceContext;
if (traceContext != null) {
if (!traceContext.equals(currentTraceContext.context())) {
if (!hasPrevious || !Thread.currentThread().equals(this.lastReader)) {
// means context was restored form the envelope, thus it has
// to be cleared
cleanOnNull = true;
lastReader = Thread.currentThread();
}
currentTraceContext.maybeScope(traceContext);
}
else if (!hasPrevious || !Thread.currentThread().equals(this.lastReader)) {
// means same context was already available, no need to clean
// anything
cleanOnNull = false;
lastReader = Thread.currentThread();
}
}
}

@Override
public Object peek() {
Object peek = queue.peek();
if (peek instanceof Envelope) {
Envelope envelope = (Envelope) peek;
restoreTheContext(envelope);
return (envelope).body;
}
return peek;
}

@Override
@SuppressWarnings("unchecked")
public Iterator<Object> iterator() {
Iterator<?> iterator = queue.iterator();
return new Iterator<Object>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Object next() {
Object next = iterator.next();
if (next instanceof Envelope) {
Envelope envelope = (Envelope) next;
restoreTheContext(envelope);
return (envelope).body;
}
return next;
}
};
}
};
}

static class Envelope {

final Object body;

final TraceContext traceContext;

Envelope(Object body, TraceContext traceContext) {
this.body = body;
this.traceContext = traceContext;
}

}

}

class SleuthContextOperator<T> implements Subscription, CoreSubscriber<T>, Scannable {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.sleuth.brave.instrument.reactor;

import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.cloud.sleuth.brave.bridge.BraveAccessor;

/**
* @author Oleh Dokuka
*/
public class QueueWrapperTests extends org.springframework.cloud.sleuth.instrument.reactor.QueueWrapperTests {

brave.propagation.CurrentTraceContext traceContext = brave.propagation.CurrentTraceContext.Default.create();

CurrentTraceContext currentTraceContext = BraveAccessor.currentTraceContext(traceContext);

TraceContext context = BraveAccessor
.traceContext(brave.propagation.TraceContext.newBuilder().traceId(1).spanId(1).sampled(true).build());

@Override
protected CurrentTraceContext currentTraceContext() {
return this.currentTraceContext;
}

@Override
protected TraceContext context() {
return this.context;
}

}
Loading

0 comments on commit 6a3612c

Please sign in to comment.