Skip to content

Commit 0c3bfe2

Browse files
committed
Prototyping ratelimited request publisher
replacement for coordinated request publisher Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
1 parent 3a5896b commit 0c3bfe2

File tree

3 files changed

+68
-28
lines changed

3 files changed

+68
-28
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.0.0-RC2-SNAPSHOT-TEST
14+
version=1.0.0-RC2-SNAPSHOT

rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,6 @@ public static <T> RateLimitableRequestPublisher<T> wrap(Publisher<T> source, lon
6060
return new RateLimitableRequestPublisher<>(source, prefetch);
6161
}
6262

63-
// public static <T> RateLimitableRequestPublisher<T> wrap(Publisher<T> source) {
64-
// return wrap(source, Long.MAX_VALUE);
65-
// }
66-
6763
@Override
6864
public void subscribe(CoreSubscriber<? super T> destination) {
6965
synchronized (this) {
@@ -103,15 +99,19 @@ private void requestN() {
10399
}
104100

105101
final long er = externalRequested;
102+
final long p = prefetch;
103+
final int pendingFulfil = pendingToFulfil;
106104

107-
if (er != Long.MAX_VALUE && prefetch != Integer.MAX_VALUE) {
105+
if (er != Long.MAX_VALUE || p != Integer.MAX_VALUE) {
108106
// shortcut
109-
if (pendingToFulfil == prefetch) {
107+
if (pendingFulfil == p) {
110108
return;
111109
}
112110

113-
r = Math.min(prefetch - pendingToFulfil, er);
114-
externalRequested -= r;
111+
r = Math.min(p - pendingFulfil, er);
112+
if (er != Long.MAX_VALUE) {
113+
externalRequested -= r;
114+
}
115115
pendingToFulfil += r;
116116
} else {
117117
r = Long.MAX_VALUE;
@@ -169,49 +169,51 @@ public void onSubscribe(Subscription s) {
169169
public void onNext(T t) {
170170
try {
171171
destination.onNext(t);
172-
deliveredElements++;
173172

174-
if (deliveredElements == limit) {
175-
deliveredElements = 0;
173+
if (prefetch == Integer.MAX_VALUE) {
174+
return;
175+
}
176+
177+
final long l = limit;
178+
int d = deliveredElements + 1;
179+
180+
if (d == l) {
181+
d = 0;
176182
final long r;
177183
final Subscription s;
178184

179185
synchronized (RateLimitableRequestPublisher.this) {
186+
long er = externalRequested;
180187
s = internalSubscription;
181188

182189
if (s == null) {
183190
return;
184191
}
185192

186-
if (externalRequested >= limit) {
187-
externalRequested -= limit;
193+
if (er >= l) {
194+
er -= l;
188195
// keep pendingToFulfil as is since it is eq to prefetch
189-
r = limit;
196+
r = l;
190197
} else {
191-
pendingToFulfil -= limit;
192-
if (externalRequested > 0) {
193-
r = externalRequested;
194-
externalRequested = 0;
198+
pendingToFulfil -= l;
199+
if (er > 0) {
200+
r = er;
201+
er = 0;
195202
pendingToFulfil += r;
196203
} else {
197204
r = 0;
198205
}
199206
}
207+
208+
externalRequested = er;
200209
}
201210

202211
if (r > 0) {
203212
s.request(r);
204213
}
205214
}
206-
// else if (deliveredElements == pendingToFulfil) {
207-
// deliveredElements = 0;
208-
// synchronized (RateLimitableRequestPublisher.this) {
209-
// pendingToFulfil -= deliveredElements;
210-
// if (deliveredElements == pendingToFulfil) {
211-
// return;
212-
// }
213-
// }
214-
// }
215+
216+
deliveredElements = d;
215217
} catch (Throwable e) {
216218
onError(e);
217219
}

rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,42 @@ public void accept(Long __) {
103103
.expectComplete()
104104
.verify(Duration.ofMillis(30000));
105105
}
106+
107+
@Test
108+
public void testThatRequestLongMaxValueWillBeDeliveredInSeparateChunks() {
109+
Flux<Integer> source =
110+
Flux.range(0, 10000000)
111+
.subscribeOn(Schedulers.parallel())
112+
.doOnRequest(r -> Assertions.assertThat(r).isLessThanOrEqualTo(128));
113+
114+
RateLimitableRequestPublisher<Integer> rateLimitableRequestPublisher =
115+
RateLimitableRequestPublisher.wrap(source, 128);
116+
117+
StepVerifier.create(rateLimitableRequestPublisher)
118+
.then(
119+
() -> rateLimitableRequestPublisher.request(Long.MAX_VALUE)
120+
)
121+
.expectNextCount(10000000)
122+
.expectComplete()
123+
.verify(Duration.ofMillis(30000));
124+
}
125+
126+
@Test
127+
public void testThatRequestLongMaxWithIntegerMaxValuePrefetchWillBeDeliveredAsLongMaxValue() {
128+
Flux<Integer> source =
129+
Flux.range(0, 10000000)
130+
.subscribeOn(Schedulers.parallel())
131+
.doOnRequest(r -> Assertions.assertThat(r).isEqualTo(Long.MAX_VALUE));
132+
133+
RateLimitableRequestPublisher<Integer> rateLimitableRequestPublisher =
134+
RateLimitableRequestPublisher.wrap(source, Integer.MAX_VALUE);
135+
136+
StepVerifier.create(rateLimitableRequestPublisher)
137+
.then(
138+
() -> rateLimitableRequestPublisher.request(Long.MAX_VALUE)
139+
)
140+
.expectNextCount(10000000)
141+
.expectComplete()
142+
.verify(Duration.ofMillis(30000));
143+
}
106144
}

0 commit comments

Comments
 (0)