Skip to content

Commit 3b25e68

Browse files
committed
Prototyping ratelimited request publisher
replacement for coordinated request publisher Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
1 parent 3a5896b commit 3b25e68

File tree

3 files changed

+67
-29
lines changed

3 files changed

+67
-29
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.0.0-RC2-SNAPSHOT-TEST
14+
version=1.0.0-RC2-SNAPSHOT

rsocket-core/src/main/java/io/rsocket/internal/RateLimitableRequestPublisher.java

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,6 @@ public static <T> RateLimitableRequestPublisher<T> wrap(Publisher<T> source, lon
6060
return new RateLimitableRequestPublisher<>(source, prefetch);
6161
}
6262

63-
// public static <T> RateLimitableRequestPublisher<T> wrap(Publisher<T> source) {
64-
// return wrap(source, Long.MAX_VALUE);
65-
// }
66-
6763
@Override
6864
public void subscribe(CoreSubscriber<? super T> destination) {
6965
synchronized (this) {
@@ -103,16 +99,22 @@ private void requestN() {
10399
}
104100

105101
final long er = externalRequested;
102+
final long p = prefetch;
103+
final int pendingFulfil = pendingToFulfil;
106104

107-
if (er != Long.MAX_VALUE && prefetch != Integer.MAX_VALUE) {
105+
if (er != Long.MAX_VALUE || p != Integer.MAX_VALUE) {
108106
// shortcut
109-
if (pendingToFulfil == prefetch) {
107+
if (pendingFulfil == p) {
110108
return;
111109
}
112110

113-
r = Math.min(prefetch - pendingToFulfil, er);
114-
externalRequested -= r;
115-
pendingToFulfil += r;
111+
r = Math.min(p - pendingFulfil, er);
112+
if (er != Long.MAX_VALUE) {
113+
externalRequested -= r;
114+
}
115+
if (p != Integer.MAX_VALUE) {
116+
pendingToFulfil += r;
117+
}
116118
} else {
117119
r = Long.MAX_VALUE;
118120
}
@@ -169,49 +171,51 @@ public void onSubscribe(Subscription s) {
169171
public void onNext(T t) {
170172
try {
171173
destination.onNext(t);
172-
deliveredElements++;
173174

174-
if (deliveredElements == limit) {
175-
deliveredElements = 0;
175+
if (prefetch == Integer.MAX_VALUE) {
176+
return;
177+
}
178+
179+
final long l = limit;
180+
int d = deliveredElements + 1;
181+
182+
if (d == l) {
183+
d = 0;
176184
final long r;
177185
final Subscription s;
178186

179187
synchronized (RateLimitableRequestPublisher.this) {
188+
long er = externalRequested;
180189
s = internalSubscription;
181190

182191
if (s == null) {
183192
return;
184193
}
185194

186-
if (externalRequested >= limit) {
187-
externalRequested -= limit;
195+
if (er >= l) {
196+
er -= l;
188197
// keep pendingToFulfil as is since it is eq to prefetch
189-
r = limit;
198+
r = l;
190199
} else {
191-
pendingToFulfil -= limit;
192-
if (externalRequested > 0) {
193-
r = externalRequested;
194-
externalRequested = 0;
200+
pendingToFulfil -= l;
201+
if (er > 0) {
202+
r = er;
203+
er = 0;
195204
pendingToFulfil += r;
196205
} else {
197206
r = 0;
198207
}
199208
}
209+
210+
externalRequested = er;
200211
}
201212

202213
if (r > 0) {
203214
s.request(r);
204215
}
205216
}
206-
// else if (deliveredElements == pendingToFulfil) {
207-
// deliveredElements = 0;
208-
// synchronized (RateLimitableRequestPublisher.this) {
209-
// pendingToFulfil -= deliveredElements;
210-
// if (deliveredElements == pendingToFulfil) {
211-
// return;
212-
// }
213-
// }
214-
// }
217+
218+
deliveredElements = d;
215219
} catch (Throwable e) {
216220
onError(e);
217221
}

rsocket-core/src/test/java/io/rsocket/internal/RateLimitableRequestPublisherTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,38 @@ public void accept(Long __) {
103103
.expectComplete()
104104
.verify(Duration.ofMillis(30000));
105105
}
106+
107+
@Test
108+
public void testThatRequestLongMaxValueWillBeDeliveredInSeparateChunks() {
109+
Flux<Integer> source =
110+
Flux.range(0, 10000000)
111+
.subscribeOn(Schedulers.parallel())
112+
.doOnRequest(r -> Assertions.assertThat(r).isLessThanOrEqualTo(128));
113+
114+
RateLimitableRequestPublisher<Integer> rateLimitableRequestPublisher =
115+
RateLimitableRequestPublisher.wrap(source, 128);
116+
117+
StepVerifier.create(rateLimitableRequestPublisher)
118+
.then(() -> rateLimitableRequestPublisher.request(Long.MAX_VALUE))
119+
.expectNextCount(10000000)
120+
.expectComplete()
121+
.verify(Duration.ofMillis(30000));
122+
}
123+
124+
@Test
125+
public void testThatRequestLongMaxWithIntegerMaxValuePrefetchWillBeDeliveredAsLongMaxValue() {
126+
Flux<Integer> source =
127+
Flux.range(0, 10000000)
128+
.subscribeOn(Schedulers.parallel())
129+
.doOnRequest(r -> Assertions.assertThat(r).isEqualTo(Long.MAX_VALUE));
130+
131+
RateLimitableRequestPublisher<Integer> rateLimitableRequestPublisher =
132+
RateLimitableRequestPublisher.wrap(source, Integer.MAX_VALUE);
133+
134+
StepVerifier.create(rateLimitableRequestPublisher)
135+
.then(() -> rateLimitableRequestPublisher.request(Long.MAX_VALUE))
136+
.expectNextCount(10000000)
137+
.expectComplete()
138+
.verify(Duration.ofMillis(30000));
139+
}
106140
}

0 commit comments

Comments
 (0)