Skip to content

Conversation

@azagrebin
Copy link
Contributor

@azagrebin azagrebin commented May 20, 2019

What is the purpose of the change

At the moment, partition/gate create methods in NetworkEnvironment have a lot of metrics arguments to maintain original layout for metric groups. This approach is not quite encapsulated and clean for shuffle API. We can have just one parent group for shuffle metrics. The old layout can be still maintained in parallel and deprecated. At the moment we can do it with a couple of casts (if shuffle implementation is NetworkEnvironment) and adding an additional legacy metric registration which can be removed later.

Brief change log

  • Change NetworkEnvironment.createResultPartitionWriters/createInputGates to have only one parent metric group argument.
  • Add InputChannelMetricsWithLegacy to increment input metrics from the legacy group as well
  • Move legacy metric group creation to deprecated NetworkEnvironment.registerLegacyNetworkMetrics
  • clean Task of legacy code but add call to NetworkEnvironment.registerLegacyNetworkMetrics after creation of partitions and gates (later needs instanceOf check for actual ShuffleService)

Verifying this change

simple refactoring, existing tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@azagrebin
Copy link
Contributor Author

The new metric group layout is still to discuss.
We also need to agree on how we deprecate the old groups in docs.

@flinkbot
Copy link
Collaborator

flinkbot commented May 20, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@azagrebin
Copy link
Contributor Author

@flinkbot attention @zentol @zhijiangW

@rmetzger rmetzger requested a review from zentol May 20, 2019 09:38
@azagrebin azagrebin changed the title [FLINK-12555] Introduce an incapsulated metric group layout for shuffle API [FLINK-12555] Introduce an encapsulated metric group layout for shuffle API May 20, 2019
@zentol zentol self-assigned this May 22, 2019
Copy link
Contributor

@zentol zentol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the deprecation in the documentation, could you write down a list of all metrics that are being modified, with the original and modified scope?


@Override
public void incNumBytesInLocalCounter(long inc) {
super.incNumBytesInLocalCounter(inc);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's quite easy to encapsulate this logic in a Counter implementation, which would require significantly less overall changes and would no longer introduce a new special case on how metrics are being used.

public static class CounterWrapper implements Counter {
	private final Counter counter1;
	private final Counter counter2;

	private CounterWrapper(Counter counter1, Counter counter2) {
		this.counter1 = counter1;
		this.counter2 = counter2;
	}

	@Override
	public void inc() {
		counter1.inc();
		counter2.inc();
	}

	@Override
	public void inc(long n) {
		counter1.inc(n);
		counter2.inc(n);

	}

	@Override
	public void dec() {
		counter1.dec();
		counter2.dec();
	}

	@Override
	public void dec(long n) {
		counter1.dec(n);
		counter2.dec(n);
	}

	@Override
	public long getCount() {
		// assume that the counters are not accessed directly elsewhere
		return counter1.getCount();
	}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, switched to the counter wrapper

@rmetzger rmetzger requested a review from zentol May 23, 2019 12:48
@azagrebin
Copy link
Contributor Author

Thanks @zentol ! addressed comments


@Override
public long getCount() {
// assume that the counters are not accessed directly elsewhere
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure whether getCounter would be actually used. If used we should keep the return counter as previous structure. That means calling `new InputChannelMetrics(parentGroup, networkGroup) instead?

Copy link
Contributor

@zhijiangW zhijiangW left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for opening this PR @azagrebin !

TBH I also do not like the way of involving in multiple metric related parameters when introducing ShuffleService#createResultPartitionWriters/InputGates, and we also talked about this issue before. My previous thought and way was also providing only a parent metric in creation methods, and make create Network and buffers groups inside createResultPartitionWriters/InputGates . But the concerns were that the metric groups are created more than once which was not consistent with previous way and this is also relied on the internal implementations of metric framework, so we gave up that way then.

But in this PR we still create the Network metric group twice which stills the same as my previous thought. I am not very clear what is the new thoughts behind this?

Actually there are two issues in this PR:

  • One is avoiding multiple metric parameters in creation of partition/gate. And if we think there are no concerns for creating one metric group twice, we could only pass the metrics.getIOMetricGroup() to replace all, and create the buffers and Network groups twice inside creatResultPartitionWriters/InputGates separately. The previous metric structure is no need to change for this motivation.

  • Another is we want to change the previous metric structure for shuffle service, that means make previous buffers group under the Network group. I agree with this point which seems more clearly within the same root parent architecture.

I think it is better to make the first issue as a separate hotfix commit.

@zentol
Copy link
Contributor

zentol commented May 27, 2019

And if we think there are no concerns for creating one metric group twice

There are concerns; it prints a warning to the user as it is not how the API is supposed to be used. If any components need access to a shared group then this group should be created once and passed around as needed.
Things were implemented this way on purpose to prevent components from interfering with each other by accident.

@zhijiangW
Copy link
Contributor

Thanks for the replies @zentol .

But in this PR parentGroup.addGroup("Network") is called twice in NetworkEnvironment#createResultPartitionWriters/InputGates. So we should change to create it still in task class and then pass it into createResultPartitionWriters/InputGates separately?

@azagrebin
Copy link
Contributor Author

True, thanks for noticing it @zhijiangW, it was not an intent. I will change it to create group only once. It can be called e.g. NetworkInput/NetworkOutput.

@azagrebin
Copy link
Contributor Author

azagrebin commented Jun 3, 2019

@zentol
I have updated the docs.
Here are the moved metrics:

Deprecated New
Type Scope Infix Name Type Scope Infix Name
IO Task - numBytesInLocal Default shuffle service Task Shuffle.Netty.Input numBytesInLocal
IO Task - numBytesInLocalPerSecond Default shuffle service Task Shuffle.Netty.Input numBytesInLocalPerSecond
IO Task - numBytesInRemote Default shuffle service Task Shuffle.Netty.Input numBytesInRemote
IO Task - numBytesInRemotePerSecond Default shuffle service Task Shuffle.Netty.Input numBytesInRemotePerSecond
IO Task - numBuffersInLocal Default shuffle service Task Shuffle.Netty.Input numBuffersInLocal
IO Task - numBuffersInLocalPerSecond Default shuffle service Task Shuffle.Netty.Input numBuffersInLocalPerSecond
IO Task - numBuffersInRemote Default shuffle service Task Shuffle.Netty.Input numBuffersInRemote
IO Task - numBuffersInRemotePerSecond Default shuffle service Task Shuffle.Netty.Input numBuffersInRemotePerSecond
Network TaskManager Status.Network AvailableMemorySegments Default shuffle service TaskManager Status.Shuffle.Netty AvailableMemorySegments
Network TaskManager Status.Network TotalMemorySegments Default shuffle service TaskManager Status.Shuffle.Netty TotalMemorySegments
Network Task Network.<Input|Output>.<gate|partition> totalQueueLen Default shuffle service Task Shuffle.Netty.<Input|Output>.<gate|partition> totalQueueLen
Network Task Network.<Input|Output>.<gate|partition> minQueueLen Default shuffle service Task Shuffle.Netty.<Input|Output>.<gate|partition> minQueueLen
Network Task Network.<Input|Output>.<gate|partition> maxQueueLen Default shuffle service Task Shuffle.Netty.<Input|Output>.<gate|partition> maxQueueLen
Network Task Network.<Input|Output>.<gate|partition> avgQueueLen Default shuffle service Task Shuffle.Netty.<Input|Output>.<gate|partition> avgQueueLen
Network Task buffers inputQueueLength Default shuffle service Task Shuffle.Netty.Input.Buffers inputQueueLength
Network Task buffers outputQueueLength Default shuffle service Task Shuffle.Netty.Output.Buffers outputQueueLength
Network Task buffers inPoolUsage Default shuffle service Task Shuffle.Netty.Input.Buffers inPoolUsage
Network Task buffers outPoolUsage Default shuffle service Task Shuffle.Netty.Output.Buffers outPoolUsage

@zentol
Copy link
Contributor

zentol commented Jun 5, 2019

I'd prefer if the infix were Shuffle.Netty.Input/Shuffle.Netty.Output, so far we've followed a hierarchical style similarly to out config options.

The PR also needs a rebase.

<th rowspan="12"><strong>Task</strong></th>
<td>numBytesInLocal</td>
<td>The total number of bytes this task has read from a local source.</td>
<td>The total number of bytes this task has read from a local source. Deprecated: see NettyShuffle.</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove the description entirely and put a nice attention icon before the deprecation notice.

@rmetzger rmetzger requested a review from zentol June 5, 2019 09:50
</tbody>
</table>

### NettyShuffle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do users actually know what NettyShuffle is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True users do not know about NettyShuffle, we will still have only one at the moment.
It could be Default network shuffle then.

@rmetzger rmetzger requested a review from zentol June 5, 2019 09:52
@azagrebin azagrebin force-pushed the FLINK-12555 branch 2 times, most recently from 134fef6 to 97144b2 Compare June 5, 2019 14:12
final MetricGroup networkGroup = metrics.getIOMetricGroup().addGroup("Network");
final MetricGroup outputGroup = networkGroup.addGroup("Output");
final MetricGroup inputGroup = networkGroup.addGroup("Input");
MetricGroup taskShuffleMetricGroup = networkEnvironment.createTaskMetricGroup(metrics.getIOMetricGroup());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean ShuffleEnvironment interface need provide the method createTaskMetricGroup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea was to expose TaskShuffleContext ShuffleEnvironment.createTaskShuffleContext(taskName, execId, parentMetricGroup) in ShuffleEnvironment . TaskShuffleContext would contain taskName, execId and created in/output metric group as well. createPartition/Gate methods would take the TaskShuffleContext. This way ShuffleEnvironment can create its own metric group once and other task common parameters are encapsulated.

// similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
final MetricGroup networkGroup = metricGroup.addGroup("Network");
final MetricGroup outputGroup = networkGroup.addGroup("Output");
final MetricGroup inputGroup = networkGroup.addGroup("Input");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use METRIC_GROUP_OUTPUT and METRIC_GROUP_INPUT instead of Output and Input, also might define a class var for Network.


private static final String METRIC_GROUP_NETWORK = "Network";
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated private static final String METRIC_GROUP_NETWORK_DEPRECATED = "Network";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put @Deprecated in separate line?

private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";

private NettyShuffleMetricFactory() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor seems not necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an indicator that it is a utility stateless class with only static methods which is not supposed to be instantiated.

@zhijiangW
Copy link
Contributor

Thanks for the updates @azagrebin !
I like the way of metric factory to make NetworkEnvironment simple. Only several nit comments.

@azagrebin azagrebin force-pushed the FLINK-12555 branch 2 times, most recently from 083b0e4 to 46901b2 Compare June 6, 2019 11:06
<td>Gauge</td>
</tr>
<tr>
<td rowspan="2">Shuffle.Netty.Output.buffers</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffers is lower-case here, but upper-case above for input


// we will have to check type of shuffle service later whether it is NetworkEnvironment
//noinspection deprecation
networkEnvironment.registerLegacyNetworkMetrics(metrics.getIOMetricGroup(), resultPartitionWriters, gates);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we intend to remove this in the next release It should be fine to use a cast here, so that we don't have to add such a short-lived method to the interface.

@azagrebin
Copy link
Contributor Author

Thanks for the reviews @zentol @zhijiangW, I've addressed comments
This PR is now based on #8608 which I think needs to be merged first.
While rebasing, I introduced a ShuffleIOOwnerContext which is now created by Task using ShuffleEnvironment.createShuffleIOOwnerContext before creating partitions/gates. NettyShuffleEnvironment.createShuffleIOOwnerContext also creates netty metric groups, only once.

final MetricGroup outputGroup = networkGroup.addGroup("Output");
final MetricGroup inputGroup = networkGroup.addGroup("Input");
ShuffleIOOwnerContext taskShuffleContext = shuffleEnvironment
.createShuffleIOOwnerContext(taskNameWithSubtaskAndId, executionId, metrics.getIOMetricGroup());
Copy link
Contributor

@zhijiangW zhijiangW Jun 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The formatting : shuffleEnvironment.createShuffleIOOwnerContext in one line and make every parameter in separate line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we do not break the arg list, the whole function is on the new line, same as chained calls.

.createShuffleIOOwnerContext(taskNameWithSubtaskAndId, executionId, metrics.getIOMetricGroup());

// produced intermediate result partitions
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment.createResultPartitionWriters(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove final for resultPartitionWriters for consistent with other temp vars?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file actually looks like more using finals atm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the vars of taskShuffleContext and following gates are not defined as final. ATM only taskNameWithSubtaskAndId and resultPartitionWriters are final. Or we could make the new added taskShuffleContext as final to keep consistency.


// create the reader and writer structures

final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might remove this local var to use taskNameWithSubtask + " (" + executionId + ')' directly below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep it to not mix concerns in one line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could agree your way to keep it. :)

My previous concern is that taskNameWithSubtaskAndId was used in many places before, so it worths defining a local var. Now it is only used in one place and we could reduce one line to make task constructor short to remove it.
Another small consideration is avoiding final issue as above mentioned.

* <p>This method has to be called only once to avoid duplicated internal metric group registration.
*
* @param ownerName the owner name, used for logs
* @param executionAttemptID execution attempt id of the producer or consumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of the producer or consumer -> of the owner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment we have exec id only for tasks/executions, in this context producer or consumer. I thought about owner as a potentially any shuffle IO component within task/execution, possibly even source/sink

checkNotNull(ownerName),
checkNotNull(executionAttemptID),
parentGroup,
nettyGroup.addGroup(METRIC_GROUP_INPUT),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to put these two addGroup covered by above createShuffleIOOwnerMetricGroup then return tuple2 to reference here because all the metrics related operations should be done inside NettyShuffleMetricFactory

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it, I think the operation is too primitive atm for an extra utility method. Besides, I think that in general tuple2 is very adhoc and not so readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I also do not like the way of tuple.
But we provide the method of createShuffleIOOwnerMetricGroup in factory, and it is only used for adding group simple. So it might bring confused that which addGroup should be done inside factory and which should be done outside. The scope/rule seems not clear.

registerOutputMetrics(
config.isNetworkDetailedMetrics(),
outputMetricGroup,
outputMetricGroup.addGroup(METRIC_GROUP_BUFFERS),
Copy link
Contributor

@zhijiangW zhijiangW Jun 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be better to cover all the metric related details in NettyShuffleMetricFactory. I mean the logic of outputMetricGroup.addGroup(METRIC_GROUP_BUFFERS) here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add overloaded methods for this in NettyShuffleMetricFactory.


MetricGroup networkInputGroup = ownerContext.getInputGroup();
@SuppressWarnings("deprecation")
InputChannelMetrics inputChannelMetrics = new InputChannelMetrics(networkInputGroup, ownerContext.getParentGroup());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could delay the creation of InputChannelMetrics in the SingleInputGateFactory#createInputChannels, then the following would be
SingleInputGate inputGate = singleInputGateFactory.create(ownerContext, icdd, partitionProducerStateProvider) instead.
To do so we could make the current createInputGates simple and make all the things done when required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gate creation loop will create InputChannelMetrics multiple times, I would avoid this because InputChannelMetrics are shared by gates at the moment.

registerInputMetrics(
config.isNetworkDetailedMetrics(),
networkInputGroup,
networkInputGroup.addGroup(METRIC_GROUP_BUFFERS),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: for the issue of networkInputGroup.addGroup(METRIC_GROUP_BUFFERS)

@zhijiangW
Copy link
Contributor

Thanks for the updates @azagrebin .
I left some final nit comments relevant with new introduced context.

@azagrebin azagrebin force-pushed the FLINK-12555 branch 2 times, most recently from b23888c to 8f13b9d Compare June 10, 2019 13:02
@azagrebin
Copy link
Contributor Author

Thanks @zhijiangW , I have addressed comments

* @param parentGroup parent of shuffle specific metric group
* @return context of the shuffle input/output owner used to create partitions or gates belonging to the owner
*/
ShuffleIOOwnerContext createShuffleIOOwnerContext(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be useful the partition lifecycle management if this method would also accept the JobID..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shuffle service does not need it at the moment, would be nice actually to keep it decoupled from the job id, I would suggest we discuss it at the corresponding partition lifecycle PR and add it if needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see this discussion then: #8687 (comment)

* @param resultPartitionDeploymentDescriptors descriptors of the partition, produced by the owner
* @return collection of the {@link ResultPartitionWriter ResultPartitionWriters}
*/
Collection<P> createResultPartitionWriters(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you considered moving this method and createInputGates into the ShuffleIOOwnerContext, and only pass this into the task instead of the entire ShuffleEnvironment? The task only requires access to these 2 methods, while any other users of the ShuffleEnvironment doesn't need these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting idea, we can consider it as a follow-up issue.

@azagrebin
Copy link
Contributor Author

Rebased on #8680

@zentol zentol merged commit 3c558d8 into apache:master Jun 24, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants