Skip to content

Conversation

@azagrebin
Copy link
Contributor

What is the purpose of the change

Extract ShuffleEnvironment interface from NetworkEnvironment.

Brief change log

  • Add page size config also to TaskManagerServicesConfiguration because this general option is also used for MemoryManager independently from NetworkEnvironment. Any other ShuffleEnvironment can potentially use its own page size.
  • Extract ShuffleEnvironment interface from NetworkEnvironment with its public methods.
  • Make NetworkEnvironment its default implementation
  • Rename NetworkEnvironment to NettyShuffleEnvironment.

Verifying this change

Refactoring covered by existing tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 4, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@azagrebin
Copy link
Contributor Author

@flinkbot attention @zhijiangW

@azagrebin azagrebin force-pushed the FLINK-11392 branch 3 times, most recently from d92a811 to e57ce16 Compare June 4, 2019 19:43
long timerServiceShutdownTimeout,
RetryingRegistrationConfiguration retryingRegistrationConfiguration,
Optional<Time> systemResourceMetricsProbingInterval) {
InetAddress taskManagerAddress,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep the previous indentation

MemoryType memoryType,
boolean preAllocateMemory,
float memoryFraction,
int pageSize, long timerServiceShutdownTimeout,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate line for parameters


nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(),
getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration);
ConfigurationParserUtils.getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate line for all the parameters

Configuration config) throws IOException {
return TaskManagerServicesConfiguration.fromConfiguration(
config, MEM_SIZE_PARAM, InetAddress.getLocalHost(), true);
config, InetAddress.getLocalHost());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate line for every parameter or we do not need break line here because it might not be long after removing above parameters.


/**
* Verifies that {@link TaskManagerServicesConfiguration#fromConfiguration(Configuration, long, InetAddress, boolean)}
* Verifies that {@link TaskManagerServicesConfiguration#fromConfiguration(Configuration, InetAddress)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test is only for verifying the generation of NetworkEnvironmentConfiguration. The scope of TaskManagerServicesConfiguration#fromConfiguration covers many things. Also this class name TaskManagerServicesConfigurationTest might also be renamed to NetworkEnvironmentConfigurationTest or something else.

this.isShutdown = false;
}

public static NetworkEnvironment fromConfiguration(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to put this static factory at the end of this class, also for the following create method.

maxJvmHeapMemory,
localTaskManagerCommunication,
taskManagerAddress);
return NetworkEnvironment.create(networkConfig, taskEventPublisher, metricGroup, ioManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done in previous hotfix commit while introducing fromConfiguration firstly.


public static NetworkEnvironment create(
NetworkEnvironmentConfiguration config,
NetworkEnvironmentConfiguration config,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the changes for commit [FLINK-11392][network] Introduce ShuffleEnvironment interface, this indentation should not be changed.

public boolean updatePartitionInfo(
ExecutionAttemptID consumerID,
PartitionInfo partitionInfo) throws IOException, InterruptedException {
ExecutionAttemptID consumerID,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: keep previous indentation

}
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ATM this method is only used for testing, so it seems no reasonable to define an interface method only for testing. If the previous test could be refactored not relying on this method. I ever considered this issue to remove this method.

final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();

final NetworkEnvironment networkEnv = new NetworkEnvironmentBuilder().build();
final ShuffleEnvironment networkEnv = new NetworkEnvironmentBuilder().build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename shuffleEnvironment for networkEnv

Executor executor = mock(Executor.class);

NetworkEnvironment network = new NetworkEnvironmentBuilder().build();
ShuffleEnvironment network = new NetworkEnvironmentBuilder().build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: rename network as shuffleEnvironment

mock(MemoryManager.class),
mock(IOManager.class),
networkEnvironment,
shuffleEnvironment,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation issue here.

taskConfig);

NetworkEnvironment network = new NetworkEnvironmentBuilder().build();
ShuffleEnvironment network = new NetworkEnvironmentBuilder().build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename network as shuffleEnvironment

import java.util.Collection;

/**
* Interface for the implementation of shuffle service locally on task executor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shuffle service -> shuffle environment

/**
* Interface for the implementation of shuffle service locally on task executor.
*
* <p>Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shuffle service -> shuffle environment

* Interface for the implementation of shuffle service locally on task executor.
*
* <p>Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
* The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove next?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then available as well if we do not want to provide a lot of details here

*
* <p>Input/Output interface of local shuffle service is based on memory {@link org.apache.flink.runtime.io.network.buffer.Buffer}s.
* The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data
* and buffers from created here {@link InputGate}s to read it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the buffers are not requested from InputGate by task, and it is done by network thread.
Maybe and the task can read buffers from created here {@link InputGate}s.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think requests has similar meaning in this context because task still gets the buffers from gates (network thread is an internal detail how to give buffers to the Task) but I will rephrase it a bit.

* The task can request next available memory buffers from created here {@link ResultPartitionWriter}s to write shuffle data
* and buffers from created here {@link InputGate}s to read it.
*
* <h2>Service lifecycle management.</h2>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove Service here, only Lifecycle management

*
* <p>The interface implements a factory of result partition writers for the task output: {@code createResultPartitionWriters}.
* The created writers are grouped per task and handed over to the task thread upon its startup.
* The task is responsible for the writers lifecycle from that moment.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writers -> writers'

* <li>{@code releasePartitions(Collection<ResultPartitionID>)} is called outside of the task thread,
* e.g. to manage the local resource lifecycle of external partitions which outlive the task production.</li>
* </ol>
* The partitions, which currently still occupy local resources, can be queried with {@code updatePartitionInfo}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queried -> updated

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually the method is wrong :) should be updatePartitionInfo -> getUnreleasedPartitions

*
* <p>The interface implements a factory for the task input gates: {@code createInputGates}.
* The created gates are grouped per task and handed over to the task thread upon its startup.
* The task is responsible for the gates lifecycle from that moment.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gates -> gates'

public interface ShuffleEnvironment {

/**
* Starts the internal related services upon {@link TaskExecutor}'s startup.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starts -> Start to keep the same form with the following methods.

* Interface for the implementation of shuffle service local environment.
*
* <p>Input/Output interface of local shuffle service environment is based on memory {@link Buffer Buffers}.
* A producer can write shuffle data into the buffers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced that putting subordinate clauses on separate lines increase readability. If it does, then we break this convention later in the docs by putting multiple sentences on the same line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, let's stick to keeping similar line lengths as in other classes.

*
* <p>Input/Output interface of local shuffle service environment is based on memory {@link Buffer Buffers}.
* A producer can write shuffle data into the buffers,
* obtained from the created here {@link ResultPartitionWriter ResultPartitionWriters}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here seems wrong

* <p>Input/Output interface of local shuffle service environment is based on memory {@link Buffer Buffers}.
* A producer can write shuffle data into the buffers,
* obtained from the created here {@link ResultPartitionWriter ResultPartitionWriters}
* and a consumer read the buffers from the created here {@link InputGate InputGates}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read -> reads and here seems wrong

* if the production has failed.</li>
* <li>{@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called
* if the production is done. The actual release can take some time
* if 'the end of consumption' confirmation is being awaited implicitly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, now I got it. You are talking here about the ResultPartition and not the ResultPartitionWriter. This makes sense now.

Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments about the JavaDocs of ShuffleEnvironment.

Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for resolving all of my comments @azagrebin. The changes now look really good. I'll be merging the PR now. While merging I'll address my own last two comments.

this.hostAddress = checkNotNull(hostAddress);
this.eventPublisher = eventPublisher;
this.parentMetricGroup = parentMetricGroup;
this.ioManager = ioManager;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not doing the checkNotNull for the last three parameters?

RetryingRegistrationConfiguration retryingRegistrationConfiguration,
Optional<Time> systemResourceMetricsProbingInterval) {
this.configuration = configuration;
this.resourceID = resourceID;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkNotNull missing for these parameters.

Move <Flink configuration> -> NetworkEnvironmentConfiguration parsing from TaskManagerServicesConfiguration
to NetworkEnviroment.fromConfiguration(flinkConfiguration) factory method and
use it to create NetworkEnviroment in TaskManagerServices.
This way TaskManagerServices does not depend on network internals.

Memory page size is additionally parsed for TaskManagerServicesConfiguration
to decouple it from NetworkEnvironmentConfiguration because
the page size is used also for memory manager outside of NetworkEnviroment.
Theoretically shuffle implementations can have their own page size in future,
different from TaskManagerOptions.MEMORY_SEGMENT_SIZE.
tillrohrmann pushed a commit to azagrebin/flink that referenced this pull request Jun 7, 2019
azagrebin added a commit to azagrebin/flink that referenced this pull request Jun 7, 2019
@azagrebin
Copy link
Contributor Author

Thanks for the review @tillrohrmann, I've pushed the final version with the last changes including yours.

return parentMetricGroup;
}

public IOManager getIoManager() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be getIOManager, upper case for both I and O

* @param localCommunicationOnly True if only local communication is possible.
* Use only in cases where only one task manager runs.
*
* @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, etc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return TaskManagerServicesConfiguration

return memoryType;
}

public long getFreeHeapMemoryWithDefrag() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package private

return freeHeapMemoryWithDefrag;
}

public long getMaxJvmHeapMemory() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: package private

return taskManagerAddress;
}

public boolean isLocalCommunicationOnly() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package private

@zhijiangW
Copy link
Contributor

@azagrebin @tillrohrmann sorry for leaving several nit comments at the point of merging this PR.
My above comments are not critical and might be addressed conveniently future.

azagrebin added a commit to azagrebin/flink that referenced this pull request Jun 10, 2019
@azagrebin
Copy link
Contributor Author

Thanks @zhijiangW , addressed

@tillrohrmann
Copy link
Contributor

Thanks for the review @zhijiangW and the good work @azagrebin. Merging now.

sjwiesman pushed a commit to sjwiesman/flink that referenced this pull request Jun 26, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants