@zhijiangW
Contributor

@zhijiangW zhijiangW commented Apr 10, 2019

What is the purpose of the change

NetworkEnvironment#unregisterTask is used to close partitions/gates and to release partitions from the ResultPartitionManager. Closing partitions/gates can be done in the task, which already maintains arrays of them. Further, a partition can be released from the ResultPartitionManager inside ResultPartition by introducing ResultPartition#fail(Throwable). With both in place, NetworkEnvironment#unregisterTask is no longer needed and can be removed entirely. The benefit is a simpler NetworkEnvironment interface, which is to be regarded as the default ShuffleService implementation.

Brief change log

  • Remove unregisterTask from NetworkEnvironment
  • Introduce fail(Throwable) in ResultPartition for releasing the partition from ResultPartitionManager
  • Fix the related logic in task class
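
As a rough illustration of the change described above, the sketch below shows a partition that releases itself from its manager when it is failed, so the caller no longer needs a separate unregisterTask step. The classes PartitionManager and Partition are simplified, hypothetical stand-ins and not Flink's actual ResultPartitionManager or ResultPartition; only the method name fail(Throwable) comes from the description.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only: simplified stand-ins, not Flink's real classes. */
public class PartitionReleaseSketch {

    /** Hypothetical registry playing the role of ResultPartitionManager. */
    static class PartitionManager {
        private final Map<String, Partition> registered = new ConcurrentHashMap<>();

        void register(Partition partition) {
            registered.put(partition.id, partition);
        }

        /** Drops the partition from the registry; the cause is only reported here. */
        void release(String partitionId, Throwable cause) {
            if (registered.remove(partitionId) != null) {
                System.out.println("Released " + partitionId + " due to " + cause);
            }
        }

        boolean isRegistered(String partitionId) {
            return registered.containsKey(partitionId);
        }
    }

    /** Hypothetical partition that knows its manager and can release itself. */
    static class Partition {
        final String id;
        private final PartitionManager manager;

        Partition(String id, PartitionManager manager) {
            this.id = id;
            this.manager = manager;
        }

        /** Assumed behavior: failing a partition releases it from the manager. */
        void fail(Throwable cause) {
            manager.release(id, cause);
        }
    }

    public static void main(String[] args) {
        PartitionManager manager = new PartitionManager();
        Partition partition = new Partition("partition-1", manager);
        manager.register(partition);

        partition.fail(new RuntimeException("task failed"));
        System.out.println("still registered: " + manager.isRegistered("partition-1"));
    }
}
```

With this shape the task only needs its own array of partitions; the manager lookup stays inside the partition.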

Verifying this change

This change is already covered by existing tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Apr 10, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ✅ 1. The [description] looks good.
  • ✅ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ✅ 4. The change fits into the overall [architecture].
  • ✅ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@zhijiangW
Contributor Author

@azagrebin I once considered unifying the closing of partitions/gates with the canceler process, but the exception handling differs somewhat between the two, so I kept them independent for now.

In addition, I am not sure why the previous implementation only catches exceptions when closing gates, since closing a partition might also throw. So I only refactored the process and kept the previous behavior unchanged.

@azagrebin
Contributor

@flinkbot approve all

@zhijiangW zhijiangW force-pushed the FLINK-12146 branch 2 times, most recently from 8587a06 to a4e3c7e Compare April 10, 2019 14:53
Contributor

@azagrebin azagrebin left a comment

Thanks for opening the PR @zhijiangW ! I have left some comments.


for (ResultPartition partition : producedPartitions) {
taskEventDispatcher.unregisterPartition(partition.getPartitionId());
partition.close();
Contributor

I guess:

if (isCanceledOrFailed()) {
  partition.fail(getFailureCause());
} else {
  partition.close();
}

Contributor

But I think it would be OK if we add one more method, closeNetworkResources, move the partition/gate closing from TaskCanceler.run into it, and call it here in releaseNetworkResources after the taskEventDispatcher.unregisterPartition and conditional partition.fail loops. That would eliminate code duplication and improve the log/exception handling of the former unregisterTask.
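
The consolidation suggested here could look roughly like the sketch below: one helper closes all partitions and gates, catching and logging per-resource failures so that a single failing close does not skip the rest. Treating partitions and gates as AutoCloseable, logging to System.err, the taskName parameter, and returning a failure count are all assumptions made for illustration, not what the PR implements.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: a shared close helper, assuming resources implement AutoCloseable. */
public class CloseNetworkResourcesSketch {

    /**
     * Closes all partitions, then all gates. A failure in one close is logged
     * and counted, but does not prevent the remaining resources from closing.
     */
    static int closeNetworkResources(
            List<? extends AutoCloseable> producedPartitions,
            List<? extends AutoCloseable> inputGates,
            String taskName) {
        int failures = 0;
        failures += closeAll(producedPartitions, "partition", taskName);
        failures += closeAll(inputGates, "input gate", taskName);
        return failures;
    }

    private static int closeAll(
            List<? extends AutoCloseable> resources, String kind, String taskName) {
        int failures = 0;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Throwable t) {
                failures++;
                System.err.println("Failed to close " + kind + " for task " + taskName + ": " + t);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        AutoCloseable healthy = () -> System.out.println("closed");
        AutoCloseable broken = () -> { throw new IllegalStateException("boom"); };

        int failures = closeNetworkResources(
                Arrays.asList(healthy, broken), Arrays.asList(healthy), "example-task (1/1)");
        System.out.println("failures: " + failures);
    }
}
```

Both the canceler path and the task-exit path could then call the same helper instead of duplicating the loops.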

Contributor Author

If you also think so, we could integrate the network release with the process in task canceler. :)

@zhijiangW
Contributor Author

@azagrebin thanks for reviews and the good inline suggestions!

I intended to submit a separate fixup commit to address your comments. But when I amended the first commit message to add more description, the new code changes were squashed into the previous commit automatically. Since the overall change is small and should not complicate your further review, I am keeping the current state. :)

Contributor

@azagrebin azagrebin left a comment

Thanks @zhijiangW ! I left one suggestion.

* There are two scenarios to release the network resources. One is from {@link TaskCanceler} to early
* release partitions and gates. Another is from task thread during task exiting.
*/
private void closeNetworkResources(boolean isCanceling) {
Contributor

I would break this method down:

private void releaseNetworkResources() {
  // for all partitions: taskEventDispatcher.unregisterPartition(partition.getPartitionId())
  Throwable cause = isCanceledOrFailed() ? getFailureCause() : null;
  closeOrFailNetworkResources(producedPartitions, inputGates, cause);
}

static void closeOrFailNetworkResources(producedPartitions, inputGates, cause) {
  // closing loops, or fail for partition if cause is not null
}

TaskCanceler.run() {
  closeOrFailNetworkResources(producedPartitions, inputGates, null); // to preserve what we have
}

I would also keep TaskCanceler class static to simplify refactoring in future.

Contributor Author

I agree with keeping the previous static TaskCanceler class.
But the static closeNetworkResources might need two more parameters: taskNameWithSubtask, used for logging, and the boolean isCanceledOrFailed, because in the previous behavior the result of isCanceledOrFailed is not always equal to cause != null. In the case of Task#cancelExecution, the cause is null but isCanceledOrFailed returns true. What do you think?
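
To make the distinction concrete, the sketch below models a task whose state and failure cause are tracked separately. The class TaskStateSketch, its State enum, and the method bodies are simplified assumptions and not Flink's Task; only the names isCanceledOrFailed, getFailureCause, and cancelExecution are taken from the discussion.

```java
/** Sketch only: a simplified model, not Flink's Task class. */
public class TaskStateSketch {

    enum State { RUNNING, CANCELING, FAILED }

    private State state = State.RUNNING;
    private Throwable failureCause;

    /** Cancellation changes the state but records no failure cause. */
    void cancelExecution() {
        state = State.CANCELING;
    }

    /** Hypothetical failure path: changes the state and records the cause. */
    void failWith(Throwable cause) {
        state = State.FAILED;
        failureCause = cause;
    }

    boolean isCanceledOrFailed() {
        return state == State.CANCELING || state == State.FAILED;
    }

    Throwable getFailureCause() {
        return failureCause;
    }

    public static void main(String[] args) {
        TaskStateSketch canceled = new TaskStateSketch();
        canceled.cancelExecution();
        // Canceled task: the flag is true although no cause was recorded.
        System.out.println(canceled.isCanceledOrFailed() + " / " + canceled.getFailureCause());

        TaskStateSketch failed = new TaskStateSketch();
        failed.failWith(new RuntimeException("boom"));
        // Failed task: the flag is true and a cause is available.
        System.out.println(failed.isCanceledOrFailed() + " / " + failed.getFailureCause());
    }
}
```

In this model a helper that branches only on cause != null would treat a canceled task like a normally finished one, which is why a separate boolean is proposed.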

private static void closeNetworkResources(
ResultPartition[] producedPartitions,
InputGate[] inputGates,
boolean isCanceledOrFailed,
Contributor

Could we rename it to just isFailed? Also, rename the method to closeOrFailNetworkResources.

Contributor

Or we could actually pass Task to the TaskCanceler constructor and make this method non-static.

Contributor Author

Yes, we prefer this way.

closeNetworkResources(producedPartitions, inputGates, isCanceledOrFailed(), getFailureCause(), taskNameWithSubtask);
}

private static void closeNetworkResources(
Contributor

This seems to be used only in one place, I would remove it.

@zhijiangW
Contributor Author

@azagrebin thanks for review again! I submitted a new commit for addressing above comments.

Contributor

@azagrebin azagrebin left a comment

Thanks @zhijiangW ! LGTM 👍

@zhijiangW zhijiangW force-pushed the FLINK-12146 branch 3 times, most recently from 8b43ec3 to cc1173c Compare April 16, 2019 02:36
Contributor

@pnowojski pnowojski left a comment

I have left one question and one suggestion.

ResultPartition[] producedPartitions,
InputGate[] inputGates) {

TaskCanceler(Logger logger, Task task, AbstractInvokable invokable, Thread executer) {
Contributor

Why are you introducing a circular dependency here between Task and TaskCanceler? There are various reasons why this is bad, including: is it necessary to expose 27 public methods (including things like startTaskThread() or run()) to the TaskCanceler?

In various different places we are trying to get away from this pattern of passing StreamingTask everywhere.

Contributor Author

The motivation is to reuse closeOrFailNetworkResources while avoiding a static method and avoiding passing the arrays of ResultPartition and InputGate explicitly. The previous AbstractInvokable parameter could also be dropped and obtained from the new Task parameter.

I think the previously introduced AbstractInvokable parameter is also not ideal, since it exposes more public methods than just AbstractInvokable#cancel(). Compared with that, a Task parameter might expose even more public methods. I agree with reverting this change and passing the previous three specific parameters here, even though closeOrFailNetworkResources might then look ugly.

* @param isFailed true if the task has failed.
* @param cause the exception that caused the task to fail, or null, if the task has not failed.
*/
private void closeOrFailNetworkResources(boolean isFailed, @Nullable Throwable cause) {
Contributor

Thanks for de-duplicating this logic already ( :) ), but you could also go one step further and maybe extract this logic into something like a TaskCloser class and de-duplicate/re-use it in TaskCanceler as well?

Contributor Author

Introducing TaskCloser seems better; I agree with it.

Contributor

I would then suggest passing a Runnable closeNetworkResources to the TaskCanceler constructor, because managing partitions/gates is still the task's concern and we can avoid methods with exploding parameter lists. TaskCanceler does not really close the whole task; it just interrupts the network resources as well.

Also, I just thought that TaskCanceler actually does not need the isFailed case.
@zhijiangW maybe we could simplify closeOrFailNetworkResources and do only closing there. If we remove close from partition.fail, we could move the conditional partition.fail into the loop in releaseNetworkResources.
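
One way to picture the Runnable suggestion is the sketch below, where the canceler receives only the actions it has to trigger and never sees the partitions, the gates, or the task itself. The Task and TaskCanceler classes here are hypothetical miniatures (including the events list used to record what happened), not the classes from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: a canceler that receives its close action as a Runnable. */
public class TaskCancelerRunnableSketch {

    /** Hypothetical canceler; it knows nothing about partitions or gates. */
    static class TaskCanceler implements Runnable {
        private final Runnable cancelInvokable;
        private final Runnable closeNetworkResources;

        TaskCanceler(Runnable cancelInvokable, Runnable closeNetworkResources) {
            this.cancelInvokable = cancelInvokable;
            this.closeNetworkResources = closeNetworkResources;
        }

        @Override
        public void run() {
            try {
                cancelInvokable.run();
            } catch (Throwable t) {
                System.err.println("Error while canceling: " + t);
            }
            // Release network resources even if canceling threw.
            closeNetworkResources.run();
        }
    }

    /** Hypothetical task that owns the resources and hands out only the actions. */
    static class Task {
        final List<String> events = new ArrayList<>();

        private void cancelInvokable() {
            events.add("canceled invokable");
        }

        private void closeNetworkResources() {
            events.add("closed network resources");
        }

        TaskCanceler createCanceler() {
            return new TaskCanceler(this::cancelInvokable, this::closeNetworkResources);
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        task.createCanceler().run();
        System.out.println(task.events);
    }
}
```

The close logic stays private to the task, and the canceler's constructor does not grow when the set of network resources changes.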

Contributor

Oh... ok, sorry, by doing a quick review I missed that you had already deduplicated the TaskCanceler code 😳 I thought I was commenting on two separate issues, but clearly this is just one thing, so I am responding to both threads here in a single comment:

Yes, passing invokable is also not the prettiest thing, but it looks like this issue is beyond the scope of this PR, right?

I haven't thought it through, but passing Runnable closeNetworkResources seems fine to me as well. It has the drawback of being more or less the same thing, but with a vaguer name/type in exchange for less overhead code. With a simple TaskCloser class we could better specify concurrency contracts (@ThreadSafe) etc., but I think I would be fine either way.
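
For comparison, a dedicated TaskCloser could state its concurrency contract in the type itself. The sketch below is one possible reading, in which the close action runs at most once even if the canceler thread and the task thread both call it; that at-most-once guarantee, the constructor shape, and the boolean return value are assumptions for illustration, and the @ThreadSafe annotation is only mentioned in a comment to keep the example free of extra dependencies.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch only: a hypothetical TaskCloser with an explicit concurrency contract. */
public class TaskCloserSketch {

    /**
     * Thread-safe by construction (would carry @ThreadSafe in a real code base):
     * the close action runs at most once, no matter how many threads call close().
     */
    static final class TaskCloser {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final Runnable closeAction;

        TaskCloser(Runnable closeAction) {
            this.closeAction = closeAction;
        }

        /** Returns true only for the caller that actually performed the close. */
        boolean close() {
            if (closed.compareAndSet(false, true)) {
                closeAction.run();
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger closeCount = new AtomicInteger();
        TaskCloser closer = new TaskCloser(closeCount::incrementAndGet);

        // Simulate the canceler thread and the task thread racing to close.
        Thread cancelerThread = new Thread(closer::close, "canceler");
        cancelerThread.start();
        closer.close();
        cancelerThread.join();

        System.out.println("close action ran " + closeCount.get() + " time(s)");
    }
}
```

Compared with a bare Runnable, the named class gives the contract a place to live, at the cost of one more type.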

…to simplify the interface of ShuffleService

NetworkEnvironment#unregisterTask is used to close partitions/gates and to release partitions from the ResultPartitionManager. Closing partitions/gates can be done in the task, which already maintains arrays of them.

Further, a partition can be released from the ResultPartitionManager inside ResultPartition by introducing ResultPartition#fail(Throwable).

With both in place, NetworkEnvironment#unregisterTask is no longer needed and can be removed entirely. The benefit is a simpler NetworkEnvironment interface, which is to be regarded as the default ShuffleService implementation.
@zhijiangW
Contributor Author

Thanks for further reviews again @azagrebin @pnowojski
I have squashed the commits that address the above comments, because the earlier hotfix commit is no longer valid.

Contributor

@pnowojski pnowojski left a comment

LGTM, I will merge it once it's green and @azagrebin has no further comments.

@azagrebin
Contributor

LGTM 👍
