Skip to content

Commit 3490077

Browse files
authored
Recycle via reference counting (#587)
1 parent 0d9ce57 commit 3490077

File tree

26 files changed

+217
-96
lines changed

26 files changed

+217
-96
lines changed

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public <T> Transaction startTransaction(TraceContext.ChildContextCreator<T> chil
235235
if (!coreConfiguration.isActive()) {
236236
transaction = noopTransaction();
237237
} else {
238-
transaction = transactionPool.createInstance().start(childContextCreator, parent, epochMicros, sampler);
238+
transaction = createTransaction().start(childContextCreator, parent, epochMicros, sampler);
239239
}
240240
if (logger.isDebugEnabled()) {
241241
logger.debug("startTransaction {} {", transaction);
@@ -252,7 +252,16 @@ public <T> Transaction startTransaction(TraceContext.ChildContextCreator<T> chil
252252
}
253253

254254
public Transaction noopTransaction() {
255-
return transactionPool.createInstance().startNoop();
255+
return createTransaction().startNoop();
256+
}
257+
258+
private Transaction createTransaction() {
259+
Transaction transaction = transactionPool.createInstance();
260+
while (transaction.getReferenceCount() != 0) {
261+
logger.warn("Tried to start a transaction with a non-zero reference count {} {}", transaction.getReferenceCount(), transaction);
262+
transaction = transactionPool.createInstance();
263+
}
264+
return transaction;
256265
}
257266

258267
@Nullable
@@ -295,7 +304,7 @@ public Span startSpan(AbstractSpan<?> parent, long epochMicros) {
295304
* @see #startSpan(TraceContext.ChildContextCreator, Object)
296305
*/
297306
public <T> Span startSpan(TraceContext.ChildContextCreator<T> childContextCreator, T parentContext, long epochMicros) {
298-
Span span = spanPool.createInstance();
307+
Span span = createSpan();
299308
final boolean dropped;
300309
Transaction transaction = currentTransaction();
301310
if (transaction != null) {
@@ -313,6 +322,15 @@ public <T> Span startSpan(TraceContext.ChildContextCreator<T> childContextCreato
313322
return span;
314323
}
315324

325+
private Span createSpan() {
326+
Span span = spanPool.createInstance();
327+
while (span.getReferenceCount() != 0) {
328+
logger.warn("Tried to start a span with a non-zero reference count {} {}", span.getReferenceCount(), span);
329+
span = spanPool.createInstance();
330+
}
331+
return span;
332+
}
333+
316334
private boolean isTransactionSpanLimitReached(Transaction transaction) {
317335
return coreConfiguration.getTransactionMaxSpans() <= transaction.getSpanCount().getStarted().get();
318336
}
@@ -372,7 +390,7 @@ public void endTransaction(Transaction transaction) {
372390
// we do report non-sampled transactions (without the context)
373391
reporter.report(transaction);
374392
} else {
375-
transaction.recycle();
393+
transaction.decrementReferences();
376394
}
377395
}
378396

@@ -387,7 +405,7 @@ public void endSpan(Span span) {
387405
}
388406
reporter.report(span);
389407
} else {
390-
span.recycle();
408+
span.decrementReferences();
391409
}
392410
}
393411

@@ -489,14 +507,14 @@ public List<ActivationListener> getActivationListeners() {
489507

490508
public void activate(TraceContextHolder<?> holder) {
491509
if (logger.isDebugEnabled()) {
492-
logger.debug("Activating {} on thread {}", holder.getTraceContext(), Thread.currentThread().getId());
510+
logger.debug("Activating {} on thread {}", holder, Thread.currentThread().getId());
493511
}
494512
activeStack.get().push(holder);
495513
}
496514

497515
public void deactivate(TraceContextHolder<?> holder) {
498516
if (logger.isDebugEnabled()) {
499-
logger.debug("Deactivating {} on thread {}", holder.getTraceContext(), Thread.currentThread().getId());
517+
logger.debug("Deactivating {} on thread {}", holder, Thread.currentThread().getId());
500518
}
501519
final Deque<TraceContextHolder<?>> stack = activeStack.get();
502520
assertIsActive(holder, stack.poll());

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeBaseWrapper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,11 @@ protected boolean beforeDelegation(final AbstractSpan<?> localSpan) {
5959

6060
protected void afterDelegation(final AbstractSpan<?> localSpan, boolean activated) {
6161
try {
62-
if (localSpan != null && activated) {
63-
localSpan.deactivate();
62+
if (localSpan != null) {
63+
if (activated) {
64+
localSpan.deactivate();
65+
}
66+
localSpan.decrementReferences();
6467
}
6568
doRecycle();
6669
} catch (Throwable t) {

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public SpanInScopeCallableWrapper(ElasticApmTracer tracer) {
4848
public SpanInScopeCallableWrapper<V> wrap(Callable<V> delegate, AbstractSpan<?> span) {
4949
this.delegate = delegate;
5050
this.span = span;
51+
span.incrementReferences();
5152
return this;
5253
}
5354

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public SpanInScopeRunnableWrapper(ElasticApmTracer tracer) {
4747
public SpanInScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan<?> span) {
4848
this.delegate = delegate;
4949
this.span = span;
50+
span.incrementReferences();
5051
return this;
5152
}
5253

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,6 @@ public int getPayloadSize() {
6868
return errors.size();
6969
}
7070

71-
@Override
72-
public void recycle() {
73-
for (ErrorCapture error : errors) {
74-
error.recycle();
75-
}
76-
}
77-
7871
@Override
7972
public void resetState() {
8073
errors.clear();

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,4 @@ public SystemInfo getSystem() {
8181

8282
public abstract int getPayloadSize();
8383

84-
public abstract void recycle();
8584
}

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,4 @@ public int getPayloadSize() {
7878
return transactions.size() + spans.size();
7979
}
8080

81-
@Override
82-
public void recycle() {
83-
for (int i = 0; i < transactions.size(); i++) {
84-
transactions.get(i).recycle();
85-
}
86-
for (int i = 0; i < spans.size(); i++) {
87-
spans.get(i).recycle();
88-
}
89-
}
9081
}

apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,33 +32,40 @@
3232
import javax.annotation.Nullable;
3333
import java.util.concurrent.Callable;
3434
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicInteger;
3536

3637
public abstract class AbstractSpan<T extends AbstractSpan> extends TraceContextHolder<T> {
3738
private static final Logger logger = LoggerFactory.getLogger(AbstractSpan.class);
3839
protected static final double MS_IN_MICROS = TimeUnit.MILLISECONDS.toMicros(1);
3940
protected final TraceContext traceContext;
4041

41-
// used to mark this span as expected to switch lifecycle-managing-thread, eg span created by one thread and ended by another
42-
private volatile boolean isLifecycleManagingThreadSwitch;
43-
4442
/**
4543
* Generic designation of a transaction in the scope of a single service (eg: 'GET /users/:id')
4644
*/
4745
protected final StringBuilder name = new StringBuilder();
4846
private long timestamp;
47+
4948
/**
5049
* How long the transaction took to complete, in ms with 3 decimal points
5150
* (Required)
5251
*/
5352
protected double duration;
53+
protected AtomicInteger references = new AtomicInteger();
54+
protected volatile boolean finished = true;
5455

55-
private volatile boolean finished = true;
56+
public int getReferenceCount() {
57+
return references.get();
58+
}
5659

5760
public AbstractSpan(ElasticApmTracer tracer) {
5861
super(tracer);
5962
traceContext = TraceContext.with64BitId(this.tracer);
6063
}
6164

65+
public boolean isReferenced() {
66+
return references.get() > 0;
67+
}
68+
6269
/**
6370
* How long the transaction took to complete, in ms with 3 decimal points
6471
* (Required)
@@ -116,8 +123,8 @@ public void resetState() {
116123
name.setLength(0);
117124
timestamp = 0;
118125
duration = 0;
119-
isLifecycleManagingThreadSwitch = false;
120126
traceContext.resetState();
127+
references.set(0);
121128
}
122129

123130
public boolean isChildOf(AbstractSpan<?> parent) {
@@ -129,6 +136,7 @@ public Span createSpan() {
129136
return createSpan(traceContext.getClock().getEpochMicros());
130137
}
131138

139+
@Override
132140
public Span createSpan(long epochMicros) {
133141
return tracer.startSpan(this, epochMicros);
134142
}
@@ -153,8 +161,14 @@ public void addLabel(String key, Boolean value) {
153161

154162
public abstract AbstractContext getContext();
155163

156-
protected void onStart() {
164+
/**
165+
* Called after the span has been started and its parent references are set
166+
*/
167+
protected void onAfterStart() {
157168
this.finished = false;
169+
// this final reference is decremented when the span is reported
170+
// or even after its reported and the last child span is ended
171+
incrementReferences();
158172
}
159173

160174
public void end() {
@@ -163,12 +177,13 @@ public void end() {
163177

164178
public final void end(long epochMicros) {
165179
if (!finished) {
166-
this.finished = true;
167-
this.duration = (epochMicros - timestamp) / AbstractSpan.MS_IN_MICROS;
180+
this.duration = (epochMicros - timestamp) / AbstractSpan.MS_IN_MICROS;
168181
if (name.length() == 0) {
169182
name.append("unnamed");
170183
}
171184
doEnd(epochMicros);
185+
// has to be set last so doEnd callbacks don't think it has already been finished
186+
this.finished = true;
172187
} else {
173188
logger.warn("End has already been called: {}", this);
174189
assert false;
@@ -182,20 +197,19 @@ public boolean isChildOf(TraceContextHolder other) {
182197
return getTraceContext().isChildOf(other);
183198
}
184199

185-
public void markLifecycleManagingThreadSwitchExpected() {
186-
isLifecycleManagingThreadSwitch = true;
200+
@Override
201+
public T activate() {
202+
incrementReferences();
203+
return super.activate();
187204
}
188205

189206
@Override
190-
public T activate() {
191-
if (isLifecycleManagingThreadSwitch) {
192-
// This serves two goals:
193-
// 1. resets the lifecycle management flag, so that the executing thread will remain in charge until set otherwise
194-
// by setting this flag once more
195-
// 2. reading this volatile field when span is activated on a new thread ensures proper visibility of other span data
196-
isLifecycleManagingThreadSwitch = false;
207+
public T deactivate() {
208+
try {
209+
return super.deactivate();
210+
} finally {
211+
decrementReferences();
197212
}
198-
return super.activate();
199213
}
200214

201215
/**
@@ -208,11 +222,7 @@ public T activate() {
208222
*/
209223
@Override
210224
public Runnable withActive(Runnable runnable) {
211-
if (isLifecycleManagingThreadSwitch) {
212-
return tracer.wrapRunnable(runnable, this);
213-
} else {
214-
return tracer.wrapRunnable(runnable, traceContext);
215-
}
225+
return tracer.wrapRunnable(runnable, this);
216226
}
217227

218228
/**
@@ -225,15 +235,32 @@ public Runnable withActive(Runnable runnable) {
225235
*/
226236
@Override
227237
public <V> Callable<V> withActive(Callable<V> callable) {
228-
if (isLifecycleManagingThreadSwitch) {
229-
return tracer.wrapCallable(callable, this);
230-
} else {
231-
return tracer.wrapCallable(callable, traceContext);
232-
}
238+
return tracer.wrapCallable(callable, this);
233239
}
234240

235241
public void setStartTimestamp(long epochMicros) {
236242
timestamp = epochMicros;
237243
}
238244

245+
public void incrementReferences() {
246+
references.incrementAndGet();
247+
if (logger.isDebugEnabled()) {
248+
logger.debug("increment references to {} ({})", this, references);
249+
if (logger.isTraceEnabled()) {
250+
logger.trace("incrementing references at",
251+
new RuntimeException("This is an expected exception. Is just used to record where the reference count has been incremented."));
252+
}
253+
}
254+
}
255+
256+
public void decrementReferences() {
257+
if (logger.isDebugEnabled()) {
258+
logger.debug("decrement references to {} ({})", this, references);
259+
if (logger.isTraceEnabled()) {
260+
logger.trace("decrementing references at",
261+
new RuntimeException("This is an expected exception. Is just used to record where the reference count has been decremented."));
262+
}
263+
}
264+
}
265+
239266
}

0 commit comments

Comments
 (0)