-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[test-only] Simulate dialogue clients against a DeterministicScheduler #348
Changes from 101 commits
4bdc207
6b57fd5
b38159d
187af81
2c0cc5c
56a9e6d
df76f65
1f42a68
14e395e
fdd9b49
38d06ea
1ae2078
0e8e6f3
5785c19
6fb6ba3
94b7a78
347d67a
fdcc3b5
f6212e7
fee44df
6d8bc0f
448873a
233c91a
95b533e
e1e0967
118cf8e
70b04dc
5e41eee
1b39887
a44444c
ea9c18c
6e072d6
257e703
7dafc33
b4d6a79
392461c
b4d987b
6880f42
db4d4a3
7c5e843
577e9da
d655eb8
13af858
12961a6
e90a29a
369aa85
5dcd96a
481d3e8
d9c94ce
72f7e9e
543e201
84a56c5
264be81
b580e81
6e0b7b2
d1171c4
7b256ca
a009747
36b0974
3c949f0
c418523
4220edf
ff87fbc
2364460
8cafb90
2ad087d
31f7a9e
49cf31c
367bf60
a9c0547
42f6339
128051b
c164cfa
6cb43b3
5ce1e84
be42176
67a7d75
7a18393
d1fa9ad
f3c94dd
11d1162
32f77fe
5571cc0
821c637
aeda3ed
1821e6d
c72ac47
994340d
941bc58
31eed09
2cabe21
de5fa01
dde229a
199e585
2565396
562f7b2
f323f14
a2955fa
2f954ff
54c5646
5ffefea
dc43ad4
12a24f1
3468b5a
90889ee
cf3acae
888242f
765b6b5
28d9b4b
c445cd5
19ee73a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,5 +56,3 @@ __init__.pyc | |
.cache/ | ||
.ipynb_checkpoints/ | ||
.vscode/ | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.dialogue.core; | ||
|
||
import com.google.common.util.concurrent.Futures; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock; | ||
import com.palantir.dialogue.Channel; | ||
import com.palantir.dialogue.Endpoint; | ||
import com.palantir.dialogue.Request; | ||
import com.palantir.dialogue.Response; | ||
import com.palantir.logsafe.exceptions.SafeIllegalStateException; | ||
import java.util.Optional; | ||
import java.util.function.Supplier; | ||
|
||
final class GenericRefreshingChannel implements Channel { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. split this change to a separate PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pulled out here: #352 |
||
private final Supplier<Channel> channelSupplier; | ||
|
||
private GenericRefreshingChannel(Supplier<Channel> channelSupplier) { | ||
this.channelSupplier = channelSupplier; | ||
} | ||
|
||
/** | ||
* Returns a refreshing {@link Channel} for a service identified by {@code service} in | ||
* {@link ServicesConfigBlock#services()}. | ||
*/ | ||
public static <T> Channel create(Supplier<T> confSupplier, ChannelFactory<T> channelFactory) { | ||
Supplier<Channel> channelSupplier = new MemoizingComposingSupplier<>(confSupplier, conf -> { | ||
return channelFactory.create(conf).orElse(AlwaysThrowingChannel.INSTANCE); | ||
}); | ||
return new GenericRefreshingChannel(channelSupplier); | ||
} | ||
|
||
@Override | ||
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) { | ||
return channelSupplier.get().execute(endpoint, request); | ||
} | ||
|
||
private enum AlwaysThrowingChannel implements Channel { | ||
INSTANCE; | ||
|
||
@Override | ||
public ListenableFuture<Response> execute(Endpoint _endpoint, Request _request) { | ||
return Futures.immediateFailedFuture(new SafeIllegalStateException("Service not configured")); | ||
} | ||
} | ||
|
||
public interface ChannelFactory<T> { | ||
Optional<Channel> create(T conf); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -27,6 +27,7 @@ | |||||
import com.palantir.dialogue.Endpoint; | ||||||
import com.palantir.dialogue.Request; | ||||||
import com.palantir.dialogue.Response; | ||||||
import com.palantir.logsafe.SafeArg; | ||||||
import java.io.ByteArrayInputStream; | ||||||
import java.io.InputStream; | ||||||
import java.util.List; | ||||||
|
@@ -37,6 +38,8 @@ | |||||
import java.util.concurrent.LinkedBlockingDeque; | ||||||
import java.util.concurrent.atomic.AtomicInteger; | ||||||
import org.immutables.value.Value; | ||||||
import org.slf4j.Logger; | ||||||
import org.slf4j.LoggerFactory; | ||||||
|
||||||
/** | ||||||
* A {@link Channel} that queues requests while the underlying {@link LimitedChannel} is unable to accept any new | ||||||
|
@@ -57,7 +60,7 @@ | |||||
* TODO(jellis): record metrics for queue sizes, num requests in flight, time spent in queue, etc. | ||||||
*/ | ||||||
final class QueuedChannel implements Channel { | ||||||
|
||||||
private static final Logger log = LoggerFactory.getLogger(QueuedChannel.class); | ||||||
private static final Executor DIRECT = MoreExecutors.directExecutor(); | ||||||
|
||||||
private final BlockingDeque<DeferredCall> queuedCalls; | ||||||
|
@@ -114,8 +117,12 @@ private void onCompletion() { | |||||
* Try to schedule as many tasks as possible. Called when requests are submitted and when they complete. | ||||||
*/ | ||||||
private void schedule() { | ||||||
int numScheduled = 0; | ||||||
while (scheduleNextTask()) { | ||||||
// Do nothing | ||||||
numScheduled++; | ||||||
} | ||||||
if (numScheduled > 1) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
zero and one are also helpful data points imo. |
||||||
log.debug("Scheduled {} at the same time", SafeArg.of("numScheduled", numScheduled)); | ||||||
} | ||||||
} | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
versionsLock { | ||
testProject() | ||
} | ||
|
||
dependencies { | ||
implementation project(':dialogue-core') | ||
implementation project(':dialogue-target') | ||
implementation 'org.jmock:jmock' | ||
implementation 'org.knowm.xchart:xchart' | ||
|
||
testImplementation 'com.palantir.safe-logging:preconditions-assertj' | ||
testImplementation 'junit:junit' | ||
testImplementation 'org.assertj:assertj-core' | ||
testImplementation 'org.mockito:mockito-core' | ||
|
||
testRuntimeOnly 'org.slf4j:slf4j-simple' | ||
|
||
annotationProcessor 'org.immutables:value' | ||
compile 'org.immutables:value::annotations' | ||
} | ||
|
||
tasks.withType(JavaCompile) { | ||
options.errorprone.errorproneArgs += '-Xep:Slf4jLogsafeArgs:OFF' | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LongSupplier or a custom interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out the netflix builder requires a
Supplier<Long>
, so I've used the same caffeine Ticker interface we use everywhere else.