
Conversation

@GJL (Member) commented Jun 20, 2019

What is the purpose of the change

WIP

Brief change log

  • See commits

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests, such as RegionPartitionReleaseStrategyTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@GJL requested a review from tillrohrmann June 20, 2019 09:34
@flinkbot (Collaborator)

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@zentol (Contributor) left a comment

I think we're on the right track.

// by filtering out late failure calls, we can save some work in
// avoiding redundant local failover
if (execution.getGlobalModVersion() == globalModVersion) {
partitionReleaseStrategy.vertexUnfinished(execution.getVertex().getID());
Contributor

I don't believe this to be necessary. This branch is only executed if the new state is FAILED, but this operation here only has an effect if the execution in question previously arrived in a FINISHED state. Since we don't allow state transitions from FINISHED to FAILED, this operation should always be a no-op.

Member Author

I think you are right.


/**
* Calling this method informs the strategy that a vertex is no longer in finished state, e.g.,
* when an execution is restarted.
Contributor

the use of "execution" makes this javadoc a bit ambiguous, as it could mean either that we re-execute a vertex or that we restart an Execution.

Member Author

I can change it to "[...]e.g., when a vertex is re-executed"

for (HashSet<FailoverVertex> regionVertices : vertexToRegion.values()) {
distinctRegions.put(regionVertices, null);
}
final Set<Set<FailoverVertex>> distinctRegions = PipelinedRegionComputeUtil.computePipelinedRegions(topology);
Contributor

we should modify the constructor to accept the pipelined regions instead, similarly to the RegionPartitionReleaseStrategy

Member Author

That's not so obvious to me. RestartPipelinedRegionStrategy still needs FailoverVertex at the end of the day. Mapping from PipelinedRegion to Set<FailoverVertex> is tedious.

To me, it's also not clear whether it is the right abstraction that PipelinedRegionComputeUtil#computePipelinedRegions() receives FailoverTopology as input.

Lastly, I think that this comment competes with that one.

return new PipelinedRegionConsumedBlockingPartitions(pipelinedRegion, resultPartitionsOutsideOfRegion);
}

private Set<IntermediateResultPartitionID> findResultPartitionsOutsideOfRegion(final PipelinedRegion pipelinedRegion) {
Contributor

findResultPartitionsInsideOfRegion; could be static

Member Author

This method calls .map(schedulingTopology::getVertexOrThrow) so it cannot be static.

return filterResultPartitionsOutsideOfRegion(allConsumedPartitionsInRegion, pipelinedRegion);
}

private Set<IntermediateResultPartitionID> filterResultPartitionsOutsideOfRegion(
Contributor

could be static

}
}

private void initRegionExecutionViewByVertex(final Set<PipelinedRegion> pipelinedRegions) {
Contributor

I'd modify these to return a collection instead and make them static.

this.schedulingTopology = checkNotNull(schedulingTopology);

checkNotNull(pipelinedRegions);
initConsumedBlockingPartitionsByRegion(pipelinedRegions);
Contributor

I'm wondering whether we shouldn't use the Failover classes here, purely because similar code (restart strategy, pipelined region computation) also works against that. I'm just worried that we end up re-inventing the wheel; everything we do here the RestartPipelinedRegionStrategy also has to do at some point.

An example is determining the set of input vertices. If you'd rely on FailoverRegion instead of your new PipelinedRegion, you could extend the FailoverRegion with the below method (just maybe without streams), allowing us to share the logic between the release and failover strategies.

public Set<FailoverVertex> getInputVertices() {
	return getAllExecutionVertices().stream()
		.map(FailoverVertex::getInputEdges)
		.flatMap(edges -> StreamSupport.stream(edges.spliterator(), false))
		.map(FailoverEdge::getSourceVertex)
		.filter(vertex -> !executionVertices.contains(vertex))
		.collect(Collectors.toSet());
}

final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions);

regionPartitionReleaseStrategy.vertexFinished(onlyConsumerVertexId);
regionPartitionReleaseStrategy.vertexUnfinished(onlyConsumerVertexId);
Contributor

This test would pass even if vertexUnfinished is a no-op.

Also, shouldn't we at some point mark the producer vertex as finished? A scenario where a consumer is finished without the producer being finished sounds a bit like undefined behavior.

Member Author
@GJL Jun 20, 2019

> This test would pass even if vertexUnfinished is a no-op.

Good catch.

> Also, shouldn't we at some point mark the producer vertex as finished? A scenario where a consumer is finished without the producer being finished sounds a bit like undefined behavior.

We can, but:

  • calls to ExecutionGraph#updateState(TaskExecutionState state) can overtake each other. Therefore it is to be expected that a consumer can be finished (in our bookkeeping) before the producer.
  • I don't think it adds to the readability of the test.

*/
class PipelinedRegionExecutionView {

private final PipelinedRegion pipelinedRegion;
Contributor

Do we only have this field to ensure we don't unfinish arbitrary vertices? You could have the same behavior by maintaining a Map<EVID, Boolean>. You would then only pass in a set of vertices through the constructor.

Member Author

There is a getter for the pipelinedRegion which is actually used. Also, I don't see anything wrong with object composition here. Using a Map would require additional processing of the input.

@zentol self-assigned this Jun 20, 2019

private static Set<Set<FailoverVertex>> uniqueRegions(final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion) {
// find out all the distinct regions
final IdentityHashMap<Set<FailoverVertex>, Object> distinctRegions = new IdentityHashMap<>();
Member Author

Consider Collections.newSetFromMap(new IdentityHashMap<>())
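
For reference, a small self-contained sketch of that idiom (plain string sets stand in for the FailoverVertex sets here):

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class IdentityHashSetDemo {
	public static void main(String[] args) {
		// Collections.newSetFromMap provides Set semantics on top of any Map
		// implementation; backed by an IdentityHashMap, the set compares
		// elements by reference instead of equals()/hashCode().
		Set<Set<String>> distinctRegions = Collections.newSetFromMap(new IdentityHashMap<>());

		Set<String> regionA = Set.of("v1", "v2");
		Set<String> regionB = Set.of("v1", "v2"); // equal content, distinct instance

		distinctRegions.add(regionA);
		distinctRegions.add(regionB);

		System.out.println(distinctRegions.size()); // prints 2: both instances are kept
	}
}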

* Provides a virtual execution state of a {@link PipelinedRegion}.
*
* <p>A pipelined region can be either finished or unfinished. It is finished iff. all its
* execution have reached the finished state.
Member Author

executions

@GJL (Member Author) commented Jun 21, 2019

Rebased to master


if (attempt != null) {
try {
maybeReleasePartitions(attempt, state);
Contributor

this must be done after the attempt is marked as finished. If the finishing fails we might still need these partitions.

@zentol (Contributor) left a comment

The current implementation may leak partitions if a vertex that produces a blocking partition was finished and is being reset.
The reliance on the current execution attempt may make it difficult to handle this case, as this imposes a temporal constraint: the release strategy must be informed about this event, and handle it, before the vertex is reset and a new Execution is created.

@GJL (Member Author) commented Jun 24, 2019

@zentol If a blocking partition is finished and its producer is being reset, are there cases where we do not want to release that partition? I wonder if the release strategy should be responsible for releasing partitions in case of errors.

@zentol (Contributor) commented Jun 24, 2019

> If a blocking partition is finished and its producer is being reset, are there cases where we do not want to release that partition?

I don't think so.

> I wonder if the release strategy should be responsible for releasing partitions in case of errors.

I'm not sure either. We'd have to handle it in the ExecutionVertex otherwise, which would be unfortunate as it adds yet another place from which release calls can originate.

/**
* Calling this method informs the strategy that a vertex is no longer in finished state, e.g.,
* when a vertex is re-executed.
* @param executionVertexID Id of the vertex that is no longer in finished state.
Contributor

add an empty line before @param to keep consistency with vertexFinished above

Member Author

fixed

* Calling this method informs the strategy that a vertex finished.
*
* @param finishedVertex Id of the vertex that finished the execution
* @return A list of result partition that can be released
Contributor

partition->partitions

Member Author

good catch, fixed

unfinishedVertices.remove(executionVertexId);
}

public void unfinish(final ExecutionVertexID executionVertexId) {
Contributor

could be package private

unfinishedVertices.add(executionVertexId);
}

public PipelinedRegion getPipelinedRegion() {
Contributor

ditto: package private

* <p>A pipelined region can be either finished or unfinished. It is finished iff. all its
* executions have reached the finished state.
*/
class PipelinedRegionExecutionView {
Contributor
@zhijiangW Jun 24, 2019

PipelinedRegionExecutionView -> PipelinedRegionStateView? because we want to expose the region state here.

Member Author

PipelinedRegionExecutionStateView would be most exact. I'll think about it.

return unfinishedVertices.isEmpty();
}

public void finish(final ExecutionVertexID executionVertexId) {
Contributor

better to call it vertexFinished, matching PartitionReleaseStrategy#vertexFinished; the same applies to the following vertexUnfinished

Member Author

good point, changed it

return new PipelinedRegion(new HashSet<>(Arrays.asList(executionVertexIds)));
}

public Set<ExecutionVertexID> getExecutionVertexIds() {
Contributor

could be package private

}

public static PipelinedRegion from(final Set<ExecutionVertexID> executionVertexIds) {
return new PipelinedRegion(executionVertexIds);
Contributor

checkNotNull(executionVertexIds)

Member Author

check added

}

public static PipelinedRegion from(final ExecutionVertexID... executionVertexIds) {
return new PipelinedRegion(new HashSet<>(Arrays.asList(executionVertexIds)));
Contributor

ditto: checkNotNull(executionVertexIds)

Member Author

I think here it is ok to omit checkNotNull.

  • PipelinedRegion.from(null) is an ambiguous call, and does not compile currently
  • PipelinedRegion.from(null, null) cannot be caught by checkNotNull.
  • Arrays.asList() already eagerly checks for a null argument. From the stack trace it should be obvious where the NPE originates (see the sketch below).
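
A tiny standalone demo of the last point; this is plain JDK behavior, independent of the PipelinedRegion class:

import java.util.Arrays;

public class EagerNullCheckDemo {
	public static void main(String[] args) {
		try {
			// Arrays.asList wraps the varargs array directly and rejects a null
			// array right away, so the NPE's stack trace points at the caller.
			Arrays.asList((Object[]) null);
		} catch (NullPointerException e) {
			System.out.println("NPE thrown eagerly: " + e);
		}
	}
}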

this.consumedBlockingPartitions = checkNotNull(consumedBlockingPartitions);
}

public Set<IntermediateResultPartitionID> getConsumedBlockingPartitions() {
Contributor

package private method

Member Author
@GJL Jun 25, 2019

The class already has default visibility. We could reduce visibility for all its methods, but that makes it harder to move the class to a different package if that is needed in the future. Moreover, if a class can be extended, public makes it clear what the interface of this class is:

https://stackoverflow.com/questions/5260467/public-methods-in-package-private-classes

return consumedBlockingPartitions;
}

public PipelinedRegion getPipelinedRegion() {
Contributor

ditto: package private


checkNotNull(pipelinedRegions);
initConsumedBlockingPartitionsByRegion(pipelinedRegions);
initRegionExecutionViewByVertex(pipelinedRegions);
Contributor

Could loop pipelinedRegions only once for both inits.

Member Author

True, but I think a method should do only one thing. For performance reasons alone, it's not worth it imo.

Contributor

We could still have two individual methods by changing the parameter to PipelinedRegion instead of Set in initConsumedBlockingPartitionsByRegion, and doing the loop in the constructor.
I believe there is no performance issue here, so you could keep it the way it is now. :)


private final Map<PipelinedRegion, PipelinedRegionConsumedBlockingPartitions> consumedBlockingPartitionsByRegion = new IdentityHashMap<>();

private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap<>();
Contributor

might give an initial size for the above maps

Member Author

Size needs to be numberOfExecutionVertexIds / loadFactor to avoid rehashing. I don't think it's worth it here.
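
For illustration, the arithmetic GJL refers to, in standalone form (the numbers are made up):

import java.util.HashMap;
import java.util.Map;

public class PreSizedMapDemo {
	public static void main(String[] args) {
		final int numberOfExecutionVertexIds = 1000; // example value
		final float loadFactor = 0.75f;              // HashMap default

		// The capacity must be at least n / loadFactor, otherwise the map
		// rehashes at some point while the n entries are inserted.
		final int initialCapacity = (int) Math.ceil(numberOfExecutionVertexIds / loadFactor);
		final Map<Integer, String> viewByVertex = new HashMap<>(initialCapacity, loadFactor);

		for (int i = 0; i < numberOfExecutionVertexIds; i++) {
			viewByVertex.put(i, "view-" + i);
		}
		System.out.println(viewByVertex.size()); // 1000 entries, inserted without rehashing
	}
}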

@zhijiangW (Contributor) left a comment

Thanks for opening this PR, and I like the overall implementation @GJL!

I have not finished the whole review yet and have left some minor comments atm. I will continue with it later.

public interface PartitionReleaseStrategy {

/**
* Calling this method informs the strategy that a vertex finished.
Contributor

I think that -> when may be better.

Member Author

Current Javadoc is comprehensible imo.

List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID finishedVertex);

/**
* Calling this method informs the strategy that a vertex is no longer in finished state, e.g.,
Contributor

Similarly, I think that -> when may be better.

// TODO: finish implementation
}

private ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartition) {
Contributor

add Id suffix for the parameter naming

Member Author

done

final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);

pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID);
pipelinedRegionExecutionView.vertexUnfinished(TEST_EXECUTION_VERTEX_ID);
Contributor

Just to confirm: does it happen in a real process that a finished vertex becomes unfinished again?
If not, I wonder whether we need some additional checks in the detailed implementations of vertexUnfinished/vertexFinished.

Contributor

yes, it can happen in production that a finished vertex is reset. The simplest example is a global failover where some vertex was already finished.

There are also other cases connected to the new failover strategy, which may reset finished vertices if an upstream partition is required for a failover but has disappeared. Then this partition's producer is restarted, along with all downstream consumers, finished or not.

assertThat(partitionsToRelease, is(empty()));
}

private static Set<PipelinedRegion> pipelinedRegionsSet(final PipelinedRegion... pipelinedRegions) {
Contributor

there might be further deduplication possible via
pipelinedRegionsSet(ExecutionVertexID producerIds, ExecutionVertexID... consumerIds)

Member Author
@GJL Jun 25, 2019

I prefer the current abstraction.

In your signature it is unclear where the pipelined region boundaries are:

pipelinedRegionsSet(ExecutionVertexID producerIds, ExecutionVertexID... consumerIds)
pipelinedRegionsSet(onlyProducerVertexId, consumerVertex1, consumerVertex2);

@zhijiangW (Contributor)

I have finished the whole review @GJL !

}

// we use the map (list -> null) to imitate an IdentityHashSet (which does not exist)
// this helps to optimize the building performance as it uses reference equality
Contributor

This comment seems to be outdated since the result is now simply saved in a set backed by an IdentityHashMap.

Member Author

good catch, fixed

@GJL (Member Author) commented Jun 27, 2019

rebased to master

@GJL force-pushed the FLINK-12883 branch 2 times, most recently from 066a269 to 8f954d0, June 28, 2019 07:14
@GJL (Member Author) commented Jun 28, 2019

Squashed commits

@zentol (Contributor) left a comment

As a reminder, once #8928 is merged we have to extend it in this PR to also inform the release strategy of reset vertices, otherwise the scenario below could result in wrong behavior:

Assume a failover region consisting of 3 operators (o1, o2, o3), where o1 consumes some blocking partition, o2/o3 consume o1 (pipelined) and each have a blocking output.

The job executes; o1 and o2 are finished, o3 fails.

At this point, the strategy knows that o1 and o2 are finished, and that o3 is not finished.

The job is restarted, and the entire region is re-deployed. The strategy is not informed of any of these state changes, and still believes o1 and o2 to be finished.

On this run, the deployment of o2 takes a long time, so much that o1 and o3 finish before o2 was ever deployed (and thus, no external state update has been received).

At this point the strategy would release the partition that o1 is consuming, because it knows that o1 and o3 just finished, and o2 was never unfinished.
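
To make the sequence concrete, here is a self-contained toy model of that bookkeeping (none of these names are the actual Flink classes); it shows how the region looks finished on the second run even though o2 was never redeployed:

import java.util.HashSet;
import java.util.Set;

public class StaleFinishedStateDemo {
	public static void main(String[] args) {
		// The region {o1, o2, o3} is tracked via the vertices that have not
		// finished yet; the region counts as finished once this set is empty.
		Set<String> unfinished = new HashSet<>(Set.of("o1", "o2", "o3"));

		// First run: o1 and o2 finish, o3 fails.
		unfinished.remove("o1");
		unfinished.remove("o2");

		// Global restart of the region: without a reset/vertexUnfinished
		// notification, o1 and o2 stay bookkept as finished.

		// Second run: o1 and o3 finish before o2 is even deployed.
		unfinished.remove("o1");
		unfinished.remove("o3");

		// The region now looks finished, so the blocking partition consumed by
		// o1 would be released although o2 still needs it.
		System.out.println("region looks finished: " + unfinished.isEmpty()); // true
	}
}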

GJL added 3 commits July 1, 2019 13:33
Replace instances where we use an IdentityHashMap as a set with
Collections.newSetFromMap() in RestartPipelinedRegionStrategy and
PipelineRegionComputeUtil.
@zentol (Contributor) left a comment

there's an edge case that wasn't considered

return false;
}
final boolean stateUpdated = updateStateInternal(state, attempt);
maybeReleasePartitions(state, attempt);
Contributor

We should either take stateUpdated into account here, or check the actual execution state as opposed to what is in the task state update.

If you have a vertex in a CANCELING state, which then receives a FINISHED state update, the vertex completes the canceling and switches to CANCELED. In this situation we will already issue release calls for all partitions this vertex has produced.
The strategy however would still consider this vertex as finished, and potentially release preceding partitions.
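
A simplified, self-contained sketch of the second option (checking the actual execution state); the names here are illustrative, not the real Flink types:

public final class ReleaseGuardDemo {
	enum ExecutionState { RUNNING, CANCELING, CANCELED, FINISHED }

	// Only notify the release strategy about a FINISHED update if the vertex
	// actually ended up FINISHED; a FINISHED report arriving while the vertex
	// is CANCELING merely completes the cancellation (actual state CANCELED).
	static boolean shouldNotifyVertexFinished(ExecutionState reportedState, ExecutionState actualState) {
		return reportedState == ExecutionState.FINISHED && actualState == ExecutionState.FINISHED;
	}

	public static void main(String[] args) {
		System.out.println(shouldNotifyVertexFinished(ExecutionState.FINISHED, ExecutionState.FINISHED)); // true
		System.out.println(shouldNotifyVertexFinished(ExecutionState.FINISHED, ExecutionState.CANCELED)); // false
	}
}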

Contributor

pushed a fix for this

Member Author

makes sense to me

@GJL (Member Author) commented Jul 2, 2019

LGTM

GJL added 2 commits July 2, 2019 17:07
- Introduce interface PartitionReleaseStrategy.
- Introduce RegionPartitionReleaseStrategy and
  NotReleasingPartitionReleaseStrategy implementations, which can be configured
  via a new config option.
- Add unit tests for new classes.
- Increase visibility of methods in TestingSchedulingTopology for unit tests
  outside of its package.
@zentol merged commit c9aa9a1 into apache:master Jul 2, 2019
@GJL deleted the FLINK-12883 branch September 3, 2019 09:55