-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Part 12] Calling Source::close, Sink::close as part of stage execution #163
[Part 12] Calling Source::close, Sink::close as part of stage execution #163
Conversation
Only the last commit in this diff needs to be reviewed. It's stacked on top of Part 11. |
this.propService = ServiceRegistry.INSTANCE.getPropertiesService(); | ||
this.jobName = jobName; | ||
} | ||
|
||
@SuppressWarnings( {"rawtypes", "unchecked"}) | ||
@Override | ||
public void start(final StageConfig<T, R> stage, Observable<Observable<R>> toServe) { | ||
public void start(final StageConfig<?, T> stage, Observable<Observable<T>> toServe) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an interesting change, wasn't expecting it.
LGTM but would appreciate a closer look from one more person here. perhaps @codyrioux or @Andyz26
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just took a look through and this appears to be fine. I like that it is more explicit about the fact that we are not using the first type parameter in the StageConfig
in these locations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I appreciate that you took the time to leverage functional features and get rid of the ad-hoc interface implementations.
Before both the APIs were not available, we would get rid of the container as part of shutting down the workload. In the new Titus world, we want to gracefully shut down all the resources associated with the stage execution.
9d9819e
to
5d380c1
Compare
public class SinkPublisher<T, R> implements WorkerPublisher<T, R> { | ||
/** | ||
* Implementation that publishes the results of a stage to a sink such as an SSE port. | ||
* @param <T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
private final Action0 onUnsubscribeAction; | ||
private final Action0 observableOnCompleteCallback; | ||
private final Action1<Throwable> observableOnErrorCallback; | ||
private Subscription eagerSubscription; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: separate the state from the rest of the fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
} finally { | ||
if (eagerSubscription != null) { | ||
eagerSubscription.unsubscribe(); | ||
eagerSubscription = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should sink also be set to null eventually?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@@ -100,6 +100,7 @@ private void registerMetrics(Metrics metrics) { | |||
} | |||
|
|||
@Override | |||
public void stop() {} | |||
public void close() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is there nothing to close for this executor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing the Observable handles most of the behavior under the hood when the unsubscribe occurs. Or just depends on the whole JVM being blown away.
private String name; | ||
private int serverPort; | ||
private final String name; | ||
private final int serverPort; | ||
private RemoteRxServer server; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate state from the fields
|
||
@Override | ||
public void close() throws IOException { | ||
if (future != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reset the state in the finally block to make close() idempotent
|
||
@Override | ||
public void close() throws IOException { | ||
executor.shutdownNow(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set executor to null
@@ -129,6 +136,7 @@ public WorkerExecutionOperationsNetworkStage( | |||
String locateSpectatorRegistry = | |||
ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.locate.spectator.registry", "true"); | |||
lookupSpectatorRegistry = Boolean.valueOf(locateSpectatorRegistry); | |||
scheduledExecutorService = new ScheduledThreadPoolExecutor(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name the thread pool factory please.
@@ -640,7 +653,7 @@ private WorkerConsumer connectToObservableAtPreviousStages(Observable<JobSchedul | |||
} | |||
|
|||
@Override | |||
public void shutdownStage() { | |||
public void shutdownStage() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sure shutdownStage cannot be called when executeStage is being executed.
@@ -650,6 +663,7 @@ public void shutdownStage() { | |||
} | |||
} | |||
|
|||
System.exit(0); | |||
Closeables.combine(closeables).close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clean the closeables to make this idempotent.
@@ -100,6 +100,7 @@ private void registerMetrics(Metrics metrics) { | |||
} | |||
|
|||
@Override | |||
public void stop() {} | |||
public void close() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing the Observable handles most of the behavior under the hood when the unsubscribe occurs. Or just depends on the whole JVM being blown away.
Context
Before both the APIs were not available, we would get rid of the container as part of shutting down the workload. In the new Titus world, we want to gracefully shut down all the resources associated with the stage execution.
Checklist
./gradlew build
compiles code correctly./gradlew test
passes all testsCONTRIBUTING.md