
Conversation

@tillrohrmann
Contributor

What is the purpose of the change

Port CoLocationConstraintITCase to new code base.

This PR is based on #7689.

Brief change log

  • "support colocation constraints and slot sharing" --> JobExecutionITCase#testCoLocationConstraintJobExecution

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Feb 12, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ✅ 1. The [description] looks good.
  • ✅ 2. There is [consensus] that the contribution should go into Flink.
  • ❔ 3. Needs [attention] from.
  • ✅ 4. The change fits into the overall [architecture].
  • ❌ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot approve description to approve the 1st aspect (similarly, it also supports the consensus, architecture and quality keywords)
  • @flinkbot approve all to approve all aspects
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval

receiver.setSlotSharingGroup(slotSharingGroup);
sender.setSlotSharingGroup(slotSharingGroup);

receiver.setStrictlyCoLocatedWith(sender);
Member

Even without L82-86 the test can pass. The reason is that some Sender/Receiver subtasks start and finish quickly. We could make sure that all Senders don't exit until all Receivers are running, maybe by using a CountDownLatch as in #6883.
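
A rough sketch of that idea (illustrative only; the latch wiring in #6883 and the invokable/constant names here are assumptions):

// Hypothetical latch shared by all subtasks inside the single MiniCluster JVM:
// every Receiver counts down once it is running, and every Sender waits on the
// latch before emitting, so no Sender can finish (and free its slot) before
// all Receivers are up.
static final CountDownLatch RECEIVERS_RUNNING = new CountDownLatch(PARALLELISM);

// In the Receiver invokable, right after it starts running:
RECEIVERS_RUNNING.countDown();

// In the Sender invokable, before emitting any records:
RECEIVERS_RUNNING.await();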

Contributor Author

This is true. We don't have real assertions making sure that the tasks are being co-located. The CountDownLatch would enforce that both tasks are online at the same time; I think this is not what we want to guarantee here. Instead we should test that the tasks are deployed in the same slot and, thus, use local channels for communication. Maybe a non-serializable record could do the trick here. I'll try it out.

Contributor Author

Hmm, this doesn't work because we always serialize records into a buffer, independent of the channel type. The only difference is whether the buffer goes through Netty or not, I think.

Contributor Author

I think this should be the solution: we can start the MiniCluster with only local communication enabled. That way we won't start Netty and the communication has to happen strictly locally :-).
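
Roughly what that setup could look like (the builder calls are the standard MiniClusterConfiguration ones; this thread only refers to the switch as localCommunication, so how exactly the cluster gets restricted to local data exchange is an assumption here):

final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
    .setNumTaskManagers(2)
    .setNumSlotsPerTaskManager(1)
    .build();

// Assumption: the cluster is started so that only local (in-process) data exchange
// is allowed, i.e. Netty is never started. Under that restriction the job can only
// succeed if the co-located Sender/Receiver tasks end up on the same TaskExecutor.
final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
miniCluster.start();
try {
    miniCluster.executeJobBlocking(jobGraph);
} finally {
    miniCluster.close();
}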

Member

makes sense :-)

Contributor

@tillrohrmann The test still succeeds even if local communication is set to false.

Contributor Author

@tillrohrmann Feb 13, 2019

It's expected that the test succeeds if localCommunication is set to false, because that is the less restrictive case. If localCommunication is true, the TaskManagers cannot talk to each other.

What you should try is to comment out the co-location constraint to see that the test fails, because that's what we are testing here.

Contributor

Of course, that makes more sense 🤦‍♂️. Unfortunately, the test still passes if the co-location constraint is removed. Based on the logs, the sender tasks finish before the receivers are even started, so we never run out of slots, which, as I understand it, is the failure condition here.

Contributor Author

As @zentol and I discussed offline, the test actually exercises not only the co-location constraints but also the input preferences of normal scheduling. Thus, one needs to remove the slot sharing as well in order to make this test fail.
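
For reference, a sketch of the topology the test builds (based on the diff hunk above; the invokable classes and the exact wiring are illustrative assumptions, not the verbatim test code):

final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(parallelism);
sender.setInvokableClass(TestingAbstractInvokables.Sender.class);

final JobVertex receiver = new JobVertex("Receiver");
receiver.setParallelism(parallelism);
receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);

// Pointwise, pipelined connection: each Receiver reads from exactly one Sender.
receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

// Slot sharing plus the input-locality preference of normal scheduling already
// place each Sender/Receiver pair together, so dropping only the co-location
// constraint is not enough to make the test fail; the shared slot sharing group
// has to be removed as well.
final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sender.setSlotSharingGroup(slotSharingGroup);
receiver.setSlotSharingGroup(slotSharingGroup);
receiver.setStrictlyCoLocatedWith(sender);

final JobGraph jobGraph = new JobGraph("Co-location constraint test", sender, receiver);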

@zentol
Contributor

zentol commented Feb 13, 2019

@flinkbot approve description
@flinkbot approve consensus
@flinkbot approve architecture


Contributor

@zentol left a comment

@flinkbot approve quality

@tillrohrmann
Contributor Author

Thanks for the review @zentol. Merging once #7683 has been merged.

…ception, Deadline, long)

Properly pass the retryIntervalMillis to the sleep call.
- "recover a task manager failure" --> TaskExecutorITCase#testJobRecoveryWithFailingTaskExecutor
- "recover once failing forward job" --> JobRecoveryITCase#testTaskFailureRecovery
- "recover once failing forward job with slot sharing" --> JobRecoveryITCase#testTaskFailureWithSlotSharingRecovery

This closes apache#7683.
Increase the retry interval for TaskExecutorITCase and ZooKeeperLeaderElectionITCase
since they take so long that the low retry interval won't have a big effect apart
from a higher CPU load.
- "support colocation constraints and slot sharing" --> JobExecutionITCase#testCoLocationConstraintJobExecution

This closes apache#7690.