Skip to content

PARQUET-164: Add a counter and increment when parquet memory manager kicks in#120

Closed
dongche wants to merge 10 commits intoapache:masterfrom
dongche:PARQUET-164
Closed

PARQUET-164: Add a counter and increment when parquet memory manager kicks in#120
dongche wants to merge 10 commits intoapache:masterfrom
dongche:PARQUET-164

Conversation

@dongche
Copy link
Contributor

@dongche dongche commented Feb 12, 2015

Add a counter for writers, and increment it when memory manager scaling down row group size.

Hive could use this counter to warn users.

@rdblue
Copy link
Contributor

rdblue commented Feb 12, 2015

@dongche, rather than adding more shims what do you think about adding a callback mechanism? That would allow Hive or other systems to be notified and would be overall less code for both Parquet and Hive to maintain.

@dongche
Copy link
Contributor Author

dongche commented Feb 12, 2015

@rdblue , thanks for your review!

Callback mechanism is a good idea. Not sure MapReduce has api supporting this, it seems only a job completed callback there. Then if we need to implement a callback mechanism by ourselves in Parquet, I think it should be a remote callback, since generating event code in Parquet executes in MapReduce tasks, and it has to notify Hive which run as a MR client. Not sure my idea is ok...
Any suggestions?

This patch use polling. After submitting jobs, Hive/other systems could use MR api to get counters of jobs from JobTracker, and check the "blockDownsize" counter monitored by Parquet memory manager in MR tasks.

@rdblue
Copy link
Contributor

rdblue commented Feb 12, 2015

I think a callback mechanism should be independent of the execution environment. For a given process, you should register callbacks with a memory manager and Parquet shouldn't provide anything else.

To get this working in MR, you'd register the callback at the appropriate time in your OutputFormat. I'd say add the logic to getHiveRecordWriter in this case. This would need to handle ensuring only one counter callback is registered, so the callback API should probably provide named callbacks and a way to query whether a particular callback name is present.

@rdblue
Copy link
Contributor

rdblue commented Feb 12, 2015

Actually, the callback deduplication could easily be done in the MR OutputFormat too, so I could go either way. It wouldn't be a bad thing to be able to check what callbacks are already registered.

@dongche
Copy link
Contributor Author

dongche commented Feb 13, 2015

Really clear! Thanks! I will update the patch later.

One more thinking: It is fine in local mode. But if distributed, say Hive submits a job to write Parquet data, the instance of MemoryManger and the registered instance of Callback are both in a map (or reduce) task process. The HS2 of Hive, which needs callback to warn users, is in another process. In this distributed case, maybe a RPC is needed.

How does this sound? Please correct me if I misunderstand the problem :)

@rdblue
Copy link
Contributor

rdblue commented Feb 14, 2015

The HS2 of Hive, which needs callback to warn users, is in another process. In this distributed case, maybe a RPC is needed.

@brockn can correct me if I'm wrong, but I think the plan was to add the counter support in order to avoid RPC. Hive can monitor the counter as the job runs and give the user feedback as needed.

@dongche
Copy link
Contributor Author

dongche commented Mar 10, 2015

Hi @rdblue, sorry for the late response. I took vacation several days ago, and worked on Hive + Parquet vectorization after vacation...

A new patch is updated, using the callback mechanism instead of add more shims. Could you please help to review it? Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have a generic callback that is used for counters in Hive, rather than having an API here that is focused around counters. Ideally, the Callback would just be a Runnable. Naming should happen when the callback is registered:

final Counter counter = ...;
MemoryManager.registerScaleCallback( "increment-hive-counter", new Runnable() {
    public void run() {
      counter.increment(1);
    }
} );

@dongche
Copy link
Contributor Author

dongche commented Mar 13, 2015

Hi, @rdblue , I like the suggestion of using a Runnable as a generic callback. Thanks! Patch updated.

@rdblue
Copy link
Contributor

rdblue commented Mar 24, 2015

Thanks @dongche! This looks good to me, although we might want to open up some of the MemoryManager methods for inspection in the future, so the callback can inspect the current scale value. That doesn't need to be part of this though.

I'll commit this soon, but I'll leave it open for a little bit so @isnotinvain and @julienledem to have a chance to comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to use a Callable<MemorManager> or Callable<MemoryManagerInfo> or something like that so the callable can be passed things like the scale value etc.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is good for callback to be able to get values of MemoryManager. Using Callable<T> is ok. Besides that, I was thinking public a method like getCurrentScale in MemoryManger, so that cackback could get wanted value in its implementation through MemoryManger instance.

How does this sound? If this way is also ok, shall we work it in a follow on PR?

@isnotinvain
Copy link
Contributor

I'd vote for making those two minor changes in this PR if you can. The callable could be:

public static class MemoryManagerStats {
  public final double scale;
  // in the future we could add more, w/o breaking our API
}

and use a Callable<MemoryManagerStats>

I think the registerScaleCallBack being strict about duplicates is a quick change and will catch mistake earlier rather than later.

@dongche
Copy link
Contributor Author

dongche commented Mar 27, 2015

Thanks for your review! @isnotinvain

Code updated. Could you take a look again? Thanks.

@rdblue
Copy link
Contributor

rdblue commented Mar 28, 2015

@isnotinvain: did you want the MemoryManagerStats to be passed to the Callable or returned by it? I'd say it would be valuable to pass the stats into the callback, but Callable doesn't allow that, only returning something (something that the manager already as access to). I also wasn't sure if it was worth the trouble to create a special callback class with a scale argument, since the MemoryManager is a singleton. The callback can always inspect it without being handed data, though you might want to add accessors for scale and total allocation.

@dongche
Copy link
Contributor Author

dongche commented Apr 13, 2015

Hi @isnotinvain , @rdblue , are there any changes I could do for this patch?

BTW, the Travis CI failed and seems not related with code changes (test passed locally). So I wanted to retry again. Is there a way to trigger Travis test without pushing code change? Thanks. It seems Travis is only triggered when submit or updated code in PR...

@rdblue
Copy link
Contributor

rdblue commented Apr 13, 2015

@dongche, I think you want to revert the change to use Callable instead of Runnable as I mentioned above. Callable doesn't allow you to pass objects, so you have to make the MemoryManager information public. Using a Callable to return it isn't very valuable, it would only be valuable to pass it to the callback, but you'd need to write a callback interface for that and I don't think that is necessary.

@isnotinvain
Copy link
Contributor

Sorry, @rdblue is right about Callable, I forgot it's the return value. Runnable is fine, or an interface for the inverse of Callable would be fine too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

catch RuntimeException not Exception

Exception includes InterruptedException and other things you shouldn't catch by accident.

@dongche
Copy link
Contributor Author

dongche commented Apr 14, 2015

Thanks for the feedback. @rdblue @isnotinvain
Patch updated.

@isnotinvain
Copy link
Contributor

LGTM, +1, though I'm not sure if we're still waiting for the 1.6.0 release before merging to master? @rdblue ?

@rdblue
Copy link
Contributor

rdblue commented Apr 14, 2015

@isnotinvain: yeah, I'd prefer not to merge anything until we do the rename and 1.7.0 branching

Conflicts:
	parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java
@rdblue
Copy link
Contributor

rdblue commented May 15, 2015

@dongche, the package rename has gone in and I'd like to get this merged now. could you rebase it on the current master? Thanks!

@dongche
Copy link
Contributor Author

dongche commented May 18, 2015

Sure. Rebased with current master at https://github.com/apache/parquet-mr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this return an UnmodifiableMap instead of the one used by the memory manager itself?

@dongche
Copy link
Contributor Author

dongche commented May 19, 2015

Address comments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants