Skip to content

Commit 5e69ac5

Browse files
authored
ByteBody: add optional operation tracking (#12087)
ByteBody: add optional operation tracking with suppressed context on invalid reuse; inline failClaim in InternalByteBody and use instance method; cache system property at class load; update http and http-netty implementations; style fixes
1 parent c97bfac commit 5e69ac5

File tree

8 files changed

+78
-24
lines changed

8 files changed

+78
-24
lines changed

http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public static CloseableByteBody createChecked(@NonNull EventLoop loop, @NonNull
8989
public ByteBuf peek() {
9090
ByteBuf b = buffer;
9191
if (b == null) {
92-
throw new IllegalStateException("Body already claimed.");
92+
failClaim();
9393
}
9494
return b;
9595
}
@@ -108,8 +108,9 @@ public long length() {
108108
private ByteBuf claim() {
109109
ByteBuf b = buffer;
110110
if (b == null) {
111-
BaseSharedBuffer.failClaim();
111+
failClaim();
112112
}
113+
recordPrimaryOp();
113114
this.buffer = null;
114115
BaseSharedBuffer.logClaim();
115116
return b;
@@ -125,6 +126,7 @@ public void close() {
125126
ByteBuf b = buffer;
126127
this.buffer = null;
127128
if (b != null) {
129+
recordClosed();
128130
b.release();
129131
}
130132
}
@@ -169,7 +171,7 @@ public void close() {
169171
public @NonNull CloseableAvailableByteBody split() {
170172
ByteBuf b = buffer;
171173
if (b == null) {
172-
BaseSharedBuffer.failClaim();
174+
failClaim();
173175
}
174176
return new AvailableNettyByteBody(b.retainedSlice());
175177
}

http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ public BufferConsumer.Upstream primary(BufferConsumer primary) {
7676
touch();
7777
BufferConsumer.Upstream upstream = this.upstream;
7878
if (upstream == null) {
79-
BaseSharedBuffer.failClaim();
79+
failClaim();
8080
}
81+
recordPrimaryOp();
8182
this.upstream = null;
8283
BaseSharedBuffer.logClaim();
8384
sharedBuffer.subscribe(primary, upstream, forceDelaySubscribe);
@@ -94,7 +95,7 @@ protected BaseStreamingByteBody<SharedBuffer> derive(BufferConsumer.Upstream ups
9495
touch();
9596
BufferConsumer.Upstream upstream = this.upstream;
9697
if (upstream == null) {
97-
BaseSharedBuffer.failClaim();
98+
failClaim();
9899
}
99100
UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(upstream, backpressureMode);
100101
this.upstream = pair.left();
@@ -106,8 +107,9 @@ protected BaseStreamingByteBody<SharedBuffer> derive(BufferConsumer.Upstream ups
106107
public @NonNull ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow() {
107108
BufferConsumer.Upstream upstream = this.upstream;
108109
if (upstream == null) {
109-
BaseSharedBuffer.failClaim();
110+
failClaim();
110111
}
112+
recordPrimaryOp();
111113
this.upstream = null;
112114
BaseSharedBuffer.logClaim();
113115
upstream.start();
@@ -122,6 +124,7 @@ public void close() {
122124
if (upstream == null) {
123125
return;
124126
}
127+
recordClosed();
125128
this.upstream = null;
126129
BaseSharedBuffer.logClaim();
127130
upstream.allowDiscard();

http/src/main/java/io/micronaut/http/body/InternalByteBody.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
import io.micronaut.core.annotation.Internal;
1919
import io.micronaut.core.annotation.NonNull;
20+
import io.micronaut.core.annotation.Nullable;
2021
import io.micronaut.core.execution.CompletableFutureExecutionFlow;
2122
import io.micronaut.core.execution.ExecutionFlow;
2223
import io.micronaut.core.io.buffer.ReadBuffer;
24+
import org.jetbrains.annotations.Contract;
2325
import org.reactivestreams.Publisher;
2426
import reactor.core.publisher.Flux;
2527

@@ -33,6 +35,30 @@
3335
*/
3436
@Internal
3537
public abstract class InternalByteBody implements ByteBody {
38+
private static final String TRACK_OPERATIONS_PROPERTY = ByteBody.class.getName() + ".trackOperations";
39+
private static final boolean TRACK_OPERATIONS = Boolean.getBoolean(TRACK_OPERATIONS_PROPERTY);
40+
41+
private @Nullable Throwable primaryOpTrace;
42+
private @Nullable Throwable closeTrace;
43+
44+
/**
45+
* Record the first primary operation location, if tracking is enabled.
46+
*/
47+
protected final void recordPrimaryOp() {
48+
if (TRACK_OPERATIONS && primaryOpTrace == null) {
49+
primaryOpTrace = new Exception("First ByteBody primary operation performed here");
50+
}
51+
}
52+
53+
/**
54+
* Record the first close location, if tracking is enabled.
55+
*/
56+
protected final void recordClosed() {
57+
if (TRACK_OPERATIONS && closeTrace == null) {
58+
closeTrace = new Exception("ByteBody closed here");
59+
}
60+
}
61+
3662
/**
3763
* Variant of {@link #buffer()} that uses the {@link ExecutionFlow} API for extra efficiency.
3864
*
@@ -56,6 +82,27 @@ public final CompletableFuture<? extends CloseableAvailableByteBody> buffer() {
5682
@Override
5783
public abstract @NonNull Publisher<ReadBuffer> toReadBufferPublisher();
5884

85+
/**
86+
* Throw the standard "already claimed" error and attach stored traces when tracking is enabled.
87+
*/
88+
@Contract("-> fail")
89+
protected final void failClaim() {
90+
IllegalStateException e = new IllegalStateException(
91+
"Request body has already been claimed: Two conflicting sites are trying to access the request body. " +
92+
"If this is intentional, the first user must ByteBody#split the body. " +
93+
"To find out where the body was claimed, enable the -D" + TRACK_OPERATIONS_PROPERTY + "=true system property."
94+
);
95+
if (TRACK_OPERATIONS) {
96+
if (primaryOpTrace != null) {
97+
e.addSuppressed(primaryOpTrace);
98+
}
99+
if (closeTrace != null) {
100+
e.addSuppressed(closeTrace);
101+
}
102+
}
103+
throw e;
104+
}
105+
59106
public static ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow(ByteBody body) {
60107
if (body instanceof InternalByteBody internal) {
61108
return internal.bufferFlow();

http/src/main/java/io/micronaut/http/body/ReactiveByteBufferByteBody.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ private ReactiveByteBufferByteBody(SharedBuffer sharedBuffer, BufferConsumer.Ups
5555
public BufferConsumer.Upstream primary(BufferConsumer primary) {
5656
BufferConsumer.Upstream upstream = this.upstream;
5757
if (upstream == null) {
58-
BaseSharedBuffer.failClaim();
58+
failClaim();
5959
}
60+
recordPrimaryOp();
6061
this.upstream = null;
6162
BaseSharedBuffer.logClaim();
6263
sharedBuffer.subscribe(primary, upstream);
@@ -72,7 +73,7 @@ protected ReactiveByteBufferByteBody derive(BufferConsumer.Upstream upstream) {
7273
public @NonNull CloseableByteBody split(@NonNull SplitBackpressureMode backpressureMode) {
7374
BufferConsumer.Upstream upstream = this.upstream;
7475
if (upstream == null) {
75-
BaseSharedBuffer.failClaim();
76+
failClaim();
7677
}
7778
UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(upstream, backpressureMode);
7879
this.upstream = pair.left();
@@ -84,8 +85,9 @@ protected ReactiveByteBufferByteBody derive(BufferConsumer.Upstream upstream) {
8485
public @NonNull ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow() {
8586
BufferConsumer.Upstream upstream = this.upstream;
8687
if (upstream == null) {
87-
BaseSharedBuffer.failClaim();
88+
failClaim();
8889
}
90+
recordPrimaryOp();
8991
this.upstream = null;
9092
BaseSharedBuffer.logClaim();
9193
upstream.start();
@@ -99,6 +101,7 @@ public void close() {
99101
if (upstream == null) {
100102
return;
101103
}
104+
recordClosed();
102105
this.upstream = null;
103106
BaseSharedBuffer.logClaim();
104107
upstream.allowDiscard();

http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,15 @@ public static AvailableByteArrayBody create(@NonNull ReadBuffer readBuffer) {
6868
@Override
6969
public @NonNull CloseableAvailableByteBody split() {
7070
if (readBuffer == null) {
71-
BaseSharedBuffer.failClaim();
71+
failClaim();
7272
}
7373
return new AvailableByteArrayBody(readBuffer.duplicate());
7474
}
7575

7676
@Override
7777
public long length() {
7878
if (readBuffer == null) {
79-
BaseSharedBuffer.failClaim();
79+
failClaim();
8080
}
8181
return readBuffer.readable();
8282
}
@@ -92,8 +92,9 @@ public long length() {
9292
public @NonNull ReadBuffer toReadBuffer() {
9393
ReadBuffer a = readBuffer;
9494
if (a == null) {
95-
BaseSharedBuffer.failClaim();
95+
failClaim();
9696
}
97+
recordPrimaryOp();
9798
readBuffer = null;
9899
BaseSharedBuffer.logClaim();
99100
return a;
@@ -114,6 +115,7 @@ public long length() {
114115
public void close() {
115116
ReadBuffer rb = readBuffer;
116117
if (rb != null) {
118+
recordClosed();
117119
rb.close();
118120
readBuffer = null;
119121
}
@@ -128,7 +130,7 @@ public void close() {
128130
public ReadBuffer peek() {
129131
ReadBuffer b = readBuffer;
130132
if (b == null) {
131-
BaseSharedBuffer.failClaim();
133+
failClaim();
132134
}
133135
return b;
134136
}

http/src/main/java/io/micronaut/http/body/stream/BaseSharedBuffer.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.micronaut.http.body.ByteBody;
2626
import io.micronaut.http.exceptions.BufferLengthExceededException;
2727
import io.micronaut.http.exceptions.ContentLengthExceededException;
28-
import org.jetbrains.annotations.Contract;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130
import reactor.core.publisher.Flux;
@@ -104,11 +103,6 @@ public BaseSharedBuffer(ReadBufferFactory readBufferFactory, BodySizeLimits limi
104103
this.rootUpstream = rootUpstream;
105104
}
106105

107-
@Contract("-> fail")
108-
public static void failClaim() {
109-
throw new IllegalStateException("Request body has already been claimed: Two conflicting sites are trying to access the request body. If this is intentional, the first user must ByteBody#split the body. To find out where the body was claimed, turn on TRACE logging for " + SPLIT_LOG_CLASS.getName() + ".");
110-
}
111-
112106
public static void logClaim() {
113107
if (SPLIT_LOG.isTraceEnabled()) {
114108
SPLIT_LOG.trace("Body split at this location. This is not an error, but may aid in debugging other errors", new Exception());

http/src/main/java/io/micronaut/http/body/stream/BaseStreamingByteBody.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ protected BaseStreamingByteBody(SB sharedBuffer, BufferConsumer.Upstream upstrea
8686
public final @NonNull CloseableByteBody move() {
8787
BufferConsumer.Upstream upstream = this.upstream;
8888
if (upstream == null) {
89-
BaseSharedBuffer.failClaim();
89+
failClaim();
9090
}
91+
recordPrimaryOp();
9192
this.upstream = null;
9293
return derive(upstream);
9394
}
@@ -96,7 +97,7 @@ protected BaseStreamingByteBody(SB sharedBuffer, BufferConsumer.Upstream upstrea
9697
public final @NonNull CloseableByteBody allowDiscard() {
9798
BufferConsumer.Upstream upstream = this.upstream;
9899
if (upstream == null) {
99-
BaseSharedBuffer.failClaim();
100+
failClaim();
100101
}
101102
upstream.allowDiscard();
102103
return this;

http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public static CloseableByteBody create(@NonNull InputStream stream, @NonNull Opt
9494
@Override
9595
public @NonNull CloseableByteBody allowDiscard() {
9696
if (stream == null) {
97-
BaseSharedBuffer.failClaim();
97+
failClaim();
9898
}
9999
stream.allowDiscard();
100100
return this;
@@ -103,6 +103,7 @@ public static CloseableByteBody create(@NonNull InputStream stream, @NonNull Opt
103103
@Override
104104
public void close() {
105105
if (stream != null) {
106+
recordClosed();
106107
stream.close();
107108
stream = null;
108109
}
@@ -111,7 +112,7 @@ public void close() {
111112
@Override
112113
public @NonNull CloseableByteBody split(SplitBackpressureMode backpressureMode) {
113114
if (stream == null) {
114-
BaseSharedBuffer.failClaim();
115+
failClaim();
115116
}
116117
StreamPair.Pair pair = StreamPair.createStreamPair(stream, backpressureMode);
117118
stream = pair.left();
@@ -127,8 +128,9 @@ public void close() {
127128
public @NonNull ExtendedInputStream toInputStream() {
128129
ExtendedInputStream s = stream;
129130
if (s == null) {
130-
BaseSharedBuffer.failClaim();
131+
failClaim();
131132
}
133+
recordPrimaryOp();
132134
stream = null;
133135
BaseSharedBuffer.logClaim();
134136
return s;

0 commit comments

Comments
 (0)