-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Part 11] Introducing Source::close #162
Conversation
@Override | ||
public void close() throws IOException { | ||
try { | ||
Closeables.combine(jobs).close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this is how we intend to use it, I think it might be better to write this as a utility function.
Alternatively, we just make jobs
a Closeables
and keep on adding to it instead of the array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't Closeables::combine (https://github.com/Netflix/mantis/blob/master/mantis-common/src/main/java/com/mantisrx/common/utils/Closeables.java) a good enough utility function? What more do you want to move into a function?
Regarding the second comment, I prefer to keep it stored in the original DS because if you capture it as a closeable, you lose all other capabilities.
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse; | ||
|
||
@Value | ||
class ClientWithResponse<R, E> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an interesting class to exist. Could you please add javadoc for why it's needed OR how it's used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have moved this within HttpSourceImpl as an inner class because it's only used from within it. I think the JavaDoc is unnecessary, given that this is strictly an implementation detail.
@@ -708,7 +693,11 @@ public void serverRemoved(ServerInfo server) { | |||
} | |||
|
|||
public void reset() { | |||
connectedServers.clear(); | |||
Set<ServerInfo> connectedServerInfos = connectedServers.keySet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, only concern is that iterating over keyset may throw exception if map is changed. i'm not familiar enough with concurrent hashmap to know if that can happen or not. you can also just use getConnectedServers()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@calvin681 I can look into the behavior of keySet() under changes to the underlying concurrent map but do you think removeConnectedServer where we close the underlying HttpClient makes sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The streams API might provide a more compact / safe way to iterate over the elements in that list. I think the removeConnectedServer
function makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the keyset() doc the iterator is 'weakly consistent' and shall never emit modification exception.
In the meantime, I wonder if there is anything blocking/timeout inside the httpclient shutdown call that in extreme cases can cause this operation to hang/stuck for a long time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If so maybe this can be done as fire-and-forget with timeout in a different thread?
@@ -708,7 +693,11 @@ public void serverRemoved(ServerInfo server) { | |||
} | |||
|
|||
public void reset() { | |||
connectedServers.clear(); | |||
Set<ServerInfo> connectedServerInfos = connectedServers.keySet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The streams API might provide a more compact / safe way to iterate over the elements in that list. I think the removeConnectedServer
function makes sense.
Context
Every source needs to close any resources such as HTTP clients or thread pools associated with creating the Observable as part of the Source::call API. This diff adds a new method to the source API, which will eventually be called as part of stage execution.
Checklist
./gradlew build
compiles code correctly./gradlew test
passes all testsCONTRIBUTING.md