@azagrebin
Contributor

What is the purpose of the change

Introduce the ShuffleService interface with two factory methods, one for ShuffleMaster and one for ShuffleEnvironment, together with its configuration. Implement NettyShuffleService and configure it as the default.

Brief change log

  • Introduce the ShuffleService interface with two factory methods for ShuffleMaster and ShuffleEnvironment
  • Introduce a core configuration option holding the full name of the implementation class, and a ShuffleServiceLoader utility class to load the configured ShuffleService implementation from the class path
  • Implement NettyShuffleService and configure it as the default.
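
A minimal sketch of the idea in the change log, not the code from this PR: a service interface with two factory methods, a default implementation, and a loader that instantiates whichever class the configuration names. Only the config key and the notion of a Netty-based default come from the PR. The `Sketch*` names, the `Map`-based configuration, the `String` return types and the unchecked exception are assumptions made to keep the example self-contained.

```java
import java.util.Map;

/** Hypothetical stand-in for the ShuffleService interface: two factory methods. */
interface SketchShuffleService {
    String createShuffleMaster(Map<String, String> configuration);

    String createShuffleEnvironment(Map<String, String> configuration);
}

/** Hypothetical default implementation, playing the role of NettyShuffleService. */
class SketchNettyShuffleService implements SketchShuffleService {
    @Override
    public String createShuffleMaster(Map<String, String> configuration) {
        return "netty-master";
    }

    @Override
    public String createShuffleEnvironment(Map<String, String> configuration) {
        return "netty-environment";
    }
}

/** Hypothetical loader: reads the class name from the configuration and instantiates it. */
class SketchShuffleServiceLoader {
    static final String KEY = "shuffle.service.class.name";
    static final String DEFAULT_CLASS = SketchNettyShuffleService.class.getName();

    static SketchShuffleService load(Map<String, String> configuration) {
        // Fall back to the default implementation when the option is not set.
        String className = configuration.getOrDefault(KEY, DEFAULT_CLASS);
        try {
            return Class.forName(className)
                .asSubclass(SketchShuffleService.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException e) {
            // Assumption: an unchecked exception keeps the sketch short.
            throw new IllegalStateException(
                "Could not instantiate the ShuffleService '" + className + "'.", e);
        }
    }
}
```

With an empty configuration, `SketchShuffleServiceLoader.load` returns the default implementation; setting the key to another class name on the class path would switch the implementation without code changes.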

Verifying this change

Refactoring, existing unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (docs / JavaDocs)

@azagrebin
Contributor Author

The PR is currently based on #8608; only the last two commits need to be reviewed.

@flinkbot
Collaborator

flinkbot commented Jun 10, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@azagrebin
Contributor Author

@flinkbot attention @tillrohrmann @zhijiangW

.key("shuffle.service.class.name")
.defaultValue("org.apache.flink.runtime.io.network.NettyShuffleService")
.withDescription("The full name of shuffle service implementation class. " +
"The default implentation uses netty network communication, local memory and file system on task executor");
Contributor

implentation -> implementation

/**
* The shuffle service implementation class full name.
*/
public static final ConfigOption<String> SHUFFLE_SERVICE = ConfigOptions
Contributor

Why not put this option into the runtime module? Are there any considerations?

Contributor

On the one hand, it would not hurt to put it into the flink-runtime module because it should be a cluster only option and thus not needed in an API module. On the other hand we also put the NettyShuffleEnvironmentOptions and HistoryServerOptions into flink-core. Thus, one could argue that the current placement is in line with the other options.

TaskEventPublisher taskEventPublisher,
MetricGroup metricGroup,
IOManager ioManager) {
checkNotNull(ioManager);
Contributor

Better to follow the parameter order in the checkNotNull calls.
Should we also check the remaining taskExecutorLocation and metricGroup?
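
A small sketch of what this comment asks for: null checks in the same order as the parameter list, covering every parameter. It uses `java.util.Objects.requireNonNull` as a JDK stand-in for Flink's `checkNotNull`. The class name and the `Object` parameter types are placeholders; only the parameter names are taken from the snippet above.

```java
import java.util.Objects;

/** Hypothetical factory: checks every argument, in declaration order. */
class SketchEnvironmentFactory {
    static String create(
            Object config,
            Object taskExecutorLocation,
            Object taskEventPublisher,
            Object metricGroup,
            Object ioManager) {
        // Same order as the parameter list, so a missing check is easy to spot.
        Objects.requireNonNull(config, "config");
        Objects.requireNonNull(taskExecutorLocation, "taskExecutorLocation");
        Objects.requireNonNull(taskEventPublisher, "taskEventPublisher");
        Objects.requireNonNull(metricGroup, "metricGroup");
        Objects.requireNonNull(ioManager, "ioManager");
        return "environment";
    }
}
```

Passing the parameter name as the message means the resulting NullPointerException identifies which argument was missing.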

@VisibleForTesting
static NettyShuffleEnvironment createNettyShuffleEnvironment(
NettyShuffleEnvironmentConfiguration config,
ResourceID taskExecutorLocation,
Contributor

taskExecutorLocation -> resourceID, because we already have the TaskManagerLocation class, which might cause confusion

Contributor

Could also be called taskExecutorResourceId

}

private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
checkNotNull(metricGroup);
Contributor

This could be removed if the check is done earlier, in createNettyShuffleEnvironment.

/**
* Task executor local context of shuffle service used to create {@link ShuffleEnvironment}.
*/
class ShuffleLocalContext {
Contributor

This class does not seem to be used at the moment. What is the motivation for introducing it?

Contributor Author

True, it is a rebase leftover; it is now ShuffleEnvironmentContext.

* Factory method to create a specific {@link ShuffleMaster} implementation in job master.
*
* @param configuration Flink configuration
* @return shuffle manager implementation
Contributor

shuffle manager -> shuffle master

/**
* Utility to load the pluggable {@link ShuffleService} implementations.
*/
public class ShuffleServiceLoader {
Contributor

This class might need a unit test to cover it.

Contributor

@zhijiangW left a comment

Thanks for opening this PR, @azagrebin!

Overall it looks good to me; I left some nit comments inline.

@tillrohrmann tillrohrmann self-assigned this Jun 13, 2019
Contributor

@tillrohrmann left a comment

Thanks for opening this PR @azagrebin. I had some comments, mainly concerning the configuration options, which we should resolve before moving forward.

// ------------------------------------------------------------------------

/**
* The shuffle service implementation class full name.
Contributor

Class name of the shuffle service implementation to be used by the cluster.

/**
* The shuffle service implementation class full name.
*/
public static final ConfigOption<String> SHUFFLE_SERVICE = ConfigOptions
Contributor

Maybe better to call this option SHUFFLE_SERVICE_CLASS

* The shuffle service implementation class full name.
*/
public static final ConfigOption<String> SHUFFLE_SERVICE = ConfigOptions
.key("shuffle.service.class.name")
Contributor

I would suggest to use the following key: shuffle-service.class

public static final ConfigOption<String> SHUFFLE_SERVICE = ConfigOptions
.key("shuffle.service.class.name")
.defaultValue("org.apache.flink.runtime.io.network.NettyShuffleService")
.withDescription("The full name of shuffle service implementation class. " +
Contributor

The full class name of the shuffle service implementation to be used by the cluster.

.key("shuffle.service.class.name")
.defaultValue("org.apache.flink.runtime.io.network.NettyShuffleService")
.withDescription("The full name of shuffle service implementation class. " +
"The default implentation uses netty network communication, local memory and file system on task executor");
Contributor

The default implementation uses Netty for network communication and local memory as well as disk space to store results on a TaskExecutor.

ShuffleMaster<SD> createShuffleMaster(Configuration configuration);

/**
* Factory method to create a specific local {@link ShuffleEnvironment} implementation in task executor.
Contributor

Remove "in task executor".

return shuffleService.orElseThrow(() -> new FlinkException(
String.format(
"Could not instantiate the ShuffleService '%s'. " +
"Please make sure that this class is on your class path.",
Contributor

Indentation seems wrong.

Contributor Author

This is still the same argument; the string is only split by concatenation. I suggest we leave it this way to distinguish it from the next argument.

/**
* Utility to load the pluggable {@link ShuffleService} implementations.
*/
public class ShuffleServiceLoader {
Contributor

Could simply be an enum since it only contains static methods.
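
For readers unfamiliar with the pattern suggested here, a minimal sketch: an enum that declares no constants cannot be instantiated, so it can serve as a holder for static utility methods. The name `SketchLoaderUtils` and its method are hypothetical and not part of the PR.

```java
/** Hypothetical utility holder: an enum without constants has no instances. */
enum SketchLoaderUtils {
    ; // the empty constant list is required before any members

    /** Example static helper; callers never need an instance. */
    static String describe(String className) {
        return "ShuffleService implementation: " + className;
    }
}
```

Compared with a final class and a private constructor, this form needs no extra boilerplate to prevent instantiation.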

ShuffleService.class,
classLoader));
} catch (ClassNotFoundException e) {
return Optional.empty();
Contributor

Why are we swallowing this exception here? It looks as if we recreate it at the call site of lookupEmbeddedShuffleServices if the result is empty. I think it would be better to simply return ShuffleService<?, ?, ?> and to allow this method to throw a ClassNotFoundException.

shuffleServiceClassName)));
}

private static Optional<ShuffleService<?, ?, ?>> lookupEmbeddedShuffleServices(String shuffleServiceClassName) {
Contributor

This method has some code duplication with HighAvailabilityServiceUtils#createCustomHAServices. Maybe we could factor a common method out which calls InstantiationUtil.instantiate and creates a FlinkException in case of a ClassNotFoundException.

Contributor Author

I will move the wrapping of ClassNotFoundException in a FlinkException into InstantiationUtil.instantiate.
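
A rough sketch of the refactoring discussed in this thread, under stated assumptions: a single helper loads a class by name, wraps ClassNotFoundException in a checked exception with a helpful message, and returns the instance directly instead of an Optional. `SketchFlinkException` and `SketchInstantiation` are hypothetical stand-ins; this is not the actual signature of Flink's `InstantiationUtil.instantiate`.

```java
/** Hypothetical checked exception standing in for FlinkException. */
class SketchFlinkException extends Exception {
    SketchFlinkException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical shared helper, so call sites no longer swallow and recreate the error. */
class SketchInstantiation {
    static <T> T instantiate(String className, Class<T> targetType, ClassLoader classLoader)
            throws SketchFlinkException {
        final Class<? extends T> clazz;
        try {
            clazz = Class.forName(className, false, classLoader).asSubclass(targetType);
        } catch (ClassNotFoundException e) {
            // Wrap once, here, and keep the original exception as the cause.
            throw new SketchFlinkException(
                String.format(
                    "Could not instantiate class '%s' of type '%s'. "
                        + "Please make sure that this class is on your class path.",
                    className,
                    targetType.getName()),
                e);
        }
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new SketchFlinkException(
                "Could not create an instance of '" + className + "'.", e);
        }
    }
}
```

A caller would then simply propagate the exception, which removes the duplicated error handling mentioned in the review.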

@azagrebin azagrebin force-pushed the FLINK-12706 branch 2 times, most recently from 9281b24 to 5e0465f Compare June 17, 2019 09:00
@azagrebin
Contributor Author

Thanks for the review @zhijiangW @tillrohrmann, I have addressed the comments.

*/
public class SingleInputGateFactory {
@SuppressWarnings("LoggerInitializedWithForeignClass")
private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);
Contributor

Why not use SingleInputGateFactory.class here?

Contributor Author

My idea was to preserve the structure of the previous logging messages because the factory was a result of refactoring. I can change this if it is not important.

Contributor

Got it, thanks for the explanation!


{% include generated/resource_manager_configuration.html %}

### Shuffle service
Contributor

Use upper case for "Service", for consistency with the other headings.


{% include generated/resource_manager_configuration.html %}

### Shuffle service
Contributor

ditto: Service

import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.util.FlinkException;
import org.junit.Test;
Contributor

An empty line should be added before this import.

configuration.setString(SHUFFLE_SERVICE_FACTORY_CLASS, "org.apache.flink.runtime.shuffle.ShuffleServiceLoaderTest$CustomShuffleServiceFactory");
ShuffleServiceFactory<?, ?, ?> shuffleServiceFactory = ShuffleServiceLoader.loadShuffleServiceFactory(configuration);
assertThat(
"Loaded shuffle service factory is not the default netty implementation",
Contributor

The message should be adjusted.

@zhijiangW
Contributor

@azagrebin thanks for the updates. I only left a few minor comments.

BTW, the commit of [hotfix][network] Rename taskExecutorLocation should be tagged as fixup instead of hotfix. fixup would be squashed when merging, but hotfix would be merged in a separate commit.

@azagrebin
Contributor Author

@zhijiangW
Thanks for the review, I have addressed the comments.
I actually planned [hotfix][network] Rename taskExecutorLocation as a separate commit because it is a somewhat unrelated cleanup.

@zhijiangW
Contributor

Sorry, I thought the renaming issue had been introduced by an earlier commit of this PR.
Yes, if the name already existed before this PR, the rename should be a hotfix here. :)
Thanks for updating, LGTM!

Contributor

@tillrohrmann left a comment

Really good work @azagrebin. LGTM. I will address my last two comments while merging this PR.

LOG.info("Shutting down the network environment and its components.");

// terminate all network connections
//noinspection OverlyBroadCatchBlock
Contributor

Let's revert this.

/**
* Test suite for {@link ShuffleServiceLoader} utility.
*/
public class ShuffleServiceLoaderTest {
Contributor

"extends TestLogger" is missing.

sjwiesman pushed a commit to sjwiesman/flink that referenced this pull request Jun 26, 2019
@azagrebin azagrebin changed the title [FLINK-12706] Introduce ShuffleService interface and its configuration [FLINK-12706] Introduce ShuffleServiceFactory interface and its configuration Aug 7, 2019