
new hadoop_jvm module for java/scala hadoop jobs. #9

Merged (3 commits, Jan 9, 2013)

Conversation

@jcrobak (Contributor) commented Jan 6, 2013

We use scala mapreduce rather than hadoop streaming, so I'm trying to add support for that in luigi. This is a preliminary draft that I was hoping to get some feedback on.

If you don't feel like this belongs in luigi-proper, then we can keep the code in our own place, but it'd be nice to have some of the refactors I did to make it easier to share code. If you think this makes sense in luigi, maybe it'd make sense to have the streaming-specific code subclass the general purpose (poorly named) JvmHadoopJobTask. Thoughts?

commit message:

  • new JvmHadoopJobTask and extracted some common functionality from
    HadoopJobTask into BaseHadoopJobTask
  • moved jobconf logic into the BaseHadoopJobTask
  • bare-bones JvmHadoopJobRunner and extracted common functionality
    from HadoopJobRunner into static methods (submitting/tracking a
    job).

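The split described in the commit message can be sketched roughly as follows. Plain Python classes stand in for the real luigi base classes here, and the method bodies are illustrative placeholders, not the actual patch:

```python
# Sketch of the class split described above. In the real patch these
# derive from luigi's task classes; plain classes keep it self-contained.

class BaseHadoopJobTask:
    """Functionality shared by streaming and plain-jar jobs."""

    def jobconfs(self):
        # jobconf logic lives here after being moved out of HadoopJobTask
        return ['mapred.job.name=%s' % type(self).__name__]


class HadoopJobTask(BaseHadoopJobTask):
    """The existing streaming task, now one sibling among several."""


class JvmHadoopJobTask(BaseHadoopJobTask):
    """A job backed by a pre-built java/scala jar."""

    def jar(self):
        raise NotImplementedError  # subclasses return the jar path
```

Hive, Pig, or balancer tasks (mentioned later in the thread) would slot in as further siblings of JvmHadoopJobTask under the same base.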
@erikbern (Contributor) commented Jan 6, 2013

Looks good, I'll take a closer look tomorrow. I definitely think it makes a lot of sense to have alternatives to Hadoop Streaming in Luigi proper, thanks a lot for the pull request.

I have some code for running jars, but it's pretty ugly so I didn't want to add it to Luigi yet. I attached it below just to have something to compare to.

Btw: what is the purpose of the NamedHdfsTarget class?

Btw 2: one thing that I would suggest is to make the output target atomic. The code below achieves that in an ugly way. Not sure what's best.

Btw 3: Is there a specific reason you are using inheritance? As opposed to just putting common code in a function or static method.

import subprocess
import sys, os
import luigi, spotify.luigi

class JavaTask(luigi.Task):
    def run(self):
        def fix_path(x, t):
            if isinstance(x, spotify.luigi.HdfsTarget): # input/output
                if x.exists():
                    return x.path
                else:
                    y = spotify.luigi.HdfsTarget(is_tmp=True)
                    t.append((y, x))
                    return y.path
            else:
                return str(x)

        tmp_files = []
        args = [fix_path(x, tmp_files) for x in self.args()]
        jar = self.jar()
        for d in ['.', '/usr/share/spotify-rec-sys-java']:
            f = os.path.join(d, jar)
            if os.path.exists(f):
                jar = f
                break
        else:
            raise Exception('no jar found')
        java_class = self.java_class()

        cmd = ' '.join(['hadoop', 'jar', jar, java_class] + args)

        print cmd
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)

        for line in process.stdout:
            print >> sys.stderr, '---', line.strip()

        if process.wait():  # wait for termination
            raise RuntimeError('java process returned with nonzero status')

        for a, b in tmp_files:
            a.move(b)

    def jar(self):
        return 'rec-sys-java-1.0-SNAPSHOT.jar'
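The tmp-swap trick that fix_path() performs above can be shown in isolation. FakeTarget here is a made-up stand-in for HdfsTarget, invented just for this sketch:

```python
class FakeTarget:
    """Minimal stand-in for HdfsTarget (hypothetical, for illustration)."""

    def __init__(self, path, exists=False):
        self.path = path
        self._exists = exists

    def exists(self):
        return self._exists

    def move(self, other):
        # pretend-rename: the temporary becomes the real target
        other._exists, self._exists = True, False


def fix_paths(args):
    """Stringify args; swap each not-yet-existing target for a temporary."""
    tmp_files, fixed = [], []
    for x in args:
        if isinstance(x, FakeTarget) and not x.exists():
            tmp = FakeTarget(x.path + '-luigi-tmp')  # output: write here first
            tmp_files.append((tmp, x))
            fixed.append(tmp.path)
        elif isinstance(x, FakeTarget):
            fixed.append(x.path)                     # input: already exists
        else:
            fixed.append(str(x))
    return fixed, tmp_files
```

After the subprocess exits successfully, each (tmp, real) pair is moved into place, which is what makes the output (pseudo-)atomic: a failed job leaves only temporaries behind.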

@jcrobak (Contributor, Author) commented Jan 6, 2013

> Looks good, I'll take a closer look tomorrow. I definitely think it makes a lot of sense to have alternatives to Hadoop Streaming in Luigi proper, thanks a lot for the pull request.

Cool, good to hear. We also use pig and hive and I'd want to add support for those if it sounds good.

> I have some code for running jars, but it's pretty ugly so I didn't want to add it to Luigi yet. I attached it below just to have something to compare to.

Nice, thanks for sharing.

> Btw: what is the purpose of the NamedHdfsTarget class?

D'oh, I went through a lot of iterations of the code and forgot to clean some things up before sending, sorry! NamedHdfsTarget is from one of those old iterations and can be removed (the idea was to use it as a named argument to the MR job, e.g. --myInput1 /path/to/input1, but I realized that wasn't very useful)

> Btw 2: one thing that I would suggest is to make the output target atomic. The code below achieves that in an ugly way. Not sure what's best.

It's a bit tricky to do this in a generic way, since many of our jobs just expect a parameter that's an output directory. Some even write to multiple directories. I'll have to think about it some more. FWIW, we tend to use _SUCCESS flags to signify when data is available, since we make heavy use of S3, which doesn't have atomic renames.

> Btw 3: Is there a specific reason you are using inheritance? As opposed to just putting common code in a function or static method.

I think this is the major design decision. I can see it going three ways:

  1. Have a separate Task for streaming and one for regular jars. Have different JobRunners for each to set up args, and have some static methods to do the common things like number of reducers, cachefiles, and jobconfs.
  2. Make a base HadoopJarTask and have the stream job task extend it. Move the logic for constructing args into the Tasks themselves, i.e. HadoopJarTask would just have an args method to be overridden, and StreamingJarTask would override args and set up the args similar to what HadoopJarRunner currently does (the streaming-specific stuff like -input and -output).
  3. Keep the Tasks as simple definitions (as they are now), but have a Meta class for each that knows how to extract args and set up the command line, plus do any of the pre/post-task cleanup (e.g. creating tmp staging dirs and moving output back into place).

My current implementation is a start at 2, but it's not complete... it's kind of a 1&2 hybrid. Just to add some context, a hive invocation is really another hadoop jar command, so I could also see that as a subclass of a base HadoopJarTask. You can also imagine an HDFS Balancer task and a few others.

I come from a java background where inheritance is heavily used, and I'm not super familiar with the approaches you use throughout the codebase, so I'm really eager to hear your thoughts.

@erikbern (Contributor) commented Jan 7, 2013

2013/1/6 Joe Crobak notifications@github.com

>> Looks good, I'll take a closer look tomorrow. I definitely think it makes
>> a lot of sense to have alternatives to Hadoop Streaming in Luigi proper,
>> thanks a lot for the pull request.
>
> Cool, good to hear. We also use pig and hive and I'd want to add support
> for those if it sounds good.

Nice! We also have some code for Pig and Hive, but so far we thought it's
slightly too ugly to merge into Luigi proper :)

>> I have some code for running jars, but it's pretty ugly so I didn't want
>> to add it to Luigi yet. I attached it below just to have something to
>> compare to.
>
> Nice, thanks for sharing.

>> Btw: what is the purpose of the NamedHdfsTarget class?
>
> D'oh, I went through a lot of iterations of the code and forgot to clean
> some things up before sending, sorry! NamedHdfsTarget is from one of those
> old iterations and can be removed (the idea was to use it as a named
> argument to the MR job, e.g. --myInput1 /path/to/input1, but I realized
> that wasn't very useful)

>> Btw 2: one thing that I would suggest is to make the output target atomic.
>> The code below achieves that in an ugly way. Not sure what's best.
>
> It's a bit tricky to do this in a generic way, since many of our jobs just
> expect a parameter that's an output directory. Some even write to multiple
> directories. I'll have to think about it some more. FWIW, we tend to use
> _SUCCESS flags to signify when data is available, since we make heavy use
> of S3, which doesn't have atomic renames.

In my experience it can be very painful to have non-atomic operations. You
end up with things left in a weird state and have to clean things up
manually.

We have been running a bunch of stuff on S3 too. One thing you can do is to
define a new Target class whose exists() method explicitly checks for a
_SUCCESS file. I think we used to do that. We also took it a step further
and added a dummy file called something like
_broken_file_this_means_that_the_job_didnt_complete. It contained broken
data and was removed if the job exited successfully. The purpose was to
make sure subsequent jobs didn't run on partial data.

A design goal for Luigi has been to make things atomic unless it's
impossible, so I think it's good to think a bit about how to achieve it. In
my jar job class I implemented it by swapping out all HdfsTarget objects,
but that's arguably a bit of a hack.

>> Btw 3: Is there a specific reason you are using inheritance? As opposed
>> to just putting common code in a function or static method.
>
> I think this is the major design decision. I can see it going three ways:
>
>   1. Have a separate Task for streaming and one for regular jars. Have
>      different JobRunners for each to set up args, and have some static
>      methods to do the common things like number of reducers, cachefiles,
>      and jobconfs.
>   2. Make a base HadoopJarTask and have the stream job task extend it.
>      Move the logic for constructing args into the Tasks themselves, i.e.
>      HadoopJarTask would just have an args method to be overridden, and
>      StreamingJarTask would override args and set up the args similar to
>      what HadoopJarRunner currently does (the streaming-specific stuff
>      like -input and -output).
>   3. Keep the Tasks as simple definitions (as they are now), but have a
>      Meta class for each that knows how to extract args and set up the
>      command line, plus do any of the pre/post-task cleanup (e.g. creating
>      tmp staging dirs and moving output back into place).
>
> My current implementation is a start at 2, but it's not complete... it's
> kind of a 1&2 hybrid. Just to add some context, a hive invocation is
> really another hadoop jar command, so I could also see that as a subclass
> of a base HadoopJarTask. You can also imagine an HDFS Balancer task and a
> few others.
>
> I come from a java background where inheritance is heavily used, and I'm
> not super familiar with the approaches you use throughout the codebase, so
> I'm really eager to hear your thoughts.

I don't have strong feelings about any of those options. I think
inheritance can be great, but it can lead to less maintainable code down
the road. Everything else equal, I think it's good to have some
justification every time inheritance is chosen over sharing functions &
static methods.

Honestly, I think Luigi in its current state could drop some of the
inheritance. Your patch is pretty small, so we could just refactor stuff
later if necessary.

Btw, when you say meta class, do you mean a bunch of static methods used
like a namespace? I think it's always good to make any static method
explicit by adding the @staticmethod decorator in Python.

I'll take another look at your patch set later today!

@erikbern (Contributor) commented Jan 7, 2013

I talked to Elias Freider (the other maintainer, and usually full of OOP wisdom). He thought your patch looked good, so that's great.

From my perspective the remaining question is whether we can make the task atomic.

@jcrobak (Contributor, Author) commented Jan 7, 2013

Sounds good re. atomic operations. The only caveat is that we need to have the temp output location on the same filesystem as our output (i.e. not /tmp) since you can't mv between filesystems.
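The caveat comes down to the usual rename rule: a rename is only atomic (or possible at all) within a single filesystem, which is why the temporary output has to live next to the final path rather than in /tmp. A minimal local-filesystem illustration (paths and filenames are made up):

```python
import os
import tempfile

def atomic_publish(tmp_path, final_path):
    # os.rename() is atomic on POSIX, but raises OSError (EXDEV) if the
    # two paths are on different filesystems -- the same restriction that
    # makes an HDFS "mv" across filesystems impossible.
    os.rename(tmp_path, final_path)

# write to a sibling temp path, then promote it in one step
workdir = tempfile.mkdtemp()
tmp = os.path.join(workdir, 'part-00000.tmp')
final = os.path.join(workdir, 'part-00000')
with open(tmp, 'w') as f:
    f.write('some job output\n')
atomic_publish(tmp, final)
```

Readers either see no file at `final` or the complete file, never a half-written one.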

Let me put together some cleanups (i.e. remove NamedHdfsTarget and add an example) and update this PR. I think I can incorporate some version of your atomic stuff into this changeset. I'm also thinking of renaming JvmHadoop -> HadoopJar, which follows more closely with what it actually means.

re. meta, I was thinking of a proper class so that you could have inheritance to keep things DRY.

@erikbern (Contributor) commented Jan 7, 2013

2013/1/7 Joe Crobak notifications@github.com

> Sounds good re. atomic operations. The only caveat is that we need to have
> the temp output location on the same filesystem as our output (i.e. not
> /tmp) since you can't mv between filesystems.

The File class in luigi/file.py (or LocalTarget, as it's confusingly also
called) implements atomic operations by just adding a random suffix to the
path. This is in order to do atomic file operations.

> Let me put together some cleanups (i.e. remove NamedHdfsTarget and add an
> example) and update this PR. I think I can incorporate some version of your
> atomic stuff into this changeset. I'm also thinking of renaming JvmHadoop ->
> HadoopJar, which follows more closely with what it actually means.

Yeah, that name makes sense.

It might be hard to figure out a clean way to do atomic file operations.
I'll think a bit about it.

> re. meta, I was thinking of a proper class so that you could have
> inheritance to keep things DRY.

@erikbern (Contributor) commented Jan 7, 2013

Oops, what I meant was: this is in order to keep it in the same file system :)

* renames hadoop_jvm -> hadoop_jar, and JvmHadoop* to HadoopJar*.
* remove NamedHdfsTarget
* New example for terasort/teragen.
@jcrobak (Contributor, Author) commented Jan 7, 2013

Here's an updated PR with some examples for discussion (also hoping for feedback on those -- curious how you guys propagate params like that).

One way I can envision doing the atomic operations is to use something like your code snippet in HadoopJarJobRunner to replace the implementation of job.output() before calling job.args. That seems pretty icky to me, though.

The other option would be to have some other method that parallels job.output() but returns paths with .tmp-random-number appended to them. It'd be up to the user to use job.output() or job.atomicOutput() (or whatever that method would be called), though.
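That second option might look something like this sketch. atomic_output() is the hypothetical method under discussion, not existing luigi API, and output_paths() stands in for the real output():

```python
import random

def tmp_sibling(path):
    """A '.tmp-<random>' sibling on the same filesystem, so that a plain
    rename (not a cross-filesystem copy) can promote it later."""
    return '%s.tmp-%09d' % (path, random.randrange(10 ** 9))


class SomeJarTask:
    """Hypothetical task illustrating the atomic_output() idea."""

    def output_paths(self):
        return ['/data/report']  # stand-in for self.output() paths

    def atomic_output(self):
        # cache the temp paths so repeated calls agree with each other
        if not hasattr(self, '_atomic'):
            self._atomic = [tmp_sibling(p) for p in self.output_paths()]
        return self._atomic
```

The caching matters: args construction and the post-job rename must both see the same temporary paths.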

@erikbern (Contributor) commented Jan 7, 2013

I thought about it a bit more. In the example I attached above:

  • You don't have to touch the internals of HdfsTarget (just do self.output() instead of self.output().path)
  • HdfsTarget is treated specially when returned as part of args(). Depending on how you see it, it's a convenience thing or utter disrespect of OOP principles
  • If you want to make it cleaner, you could in principle have an abstract method on the Target class that takes any Target object and returns a similar one that is temporary. Though I'm not sure if you ever want to return an object in args() that doesn't inherit from HdfsTarget

You could also replace the output() method as you pointed out, but I think the problem still persists because you need to either inspect its type, or add some generic method at the Target level that returns a "tempified" version. Another alternative is to assume that the output() method always returns HdfsTarget types, which might be a reasonable approach.

I think the same thing applies to having an atomic_output() method as well, with the caveat that the code isn't atomic if the user forgets to use atomic_output() instead of output(). Otoh, we wouldn't have to replace output() at runtime, which is a great benefit.

I'll check tomorrow with some coworkers to see what they think about these options.

I probably seem crazy for being so concerned about the atomicity, but I've seen non atomic code leading to really bad problems in the past. Like realizing a month later that we are missing lots of data :)

Btw your example looks great! It's a little nonstandard since it takes raw paths as parameters. I actually never realized you can do that, probably because all our tasks map to specific paths in Hdfs and I even do that for ad hoc jobs. But I like the fact that you do it that way – it shows that there's some flexibility.

One thing that may not be obvious is that there is a convenience method called input() on the Task class. I.e. instead of

return [self.requires().output().path, self.output().path]

you can do

return [self.input().path, self.output().path]

And just to be clear in the jar code example I attached above you can also drop the .path if you want to

return [self.input(), self.output()]

@freider (Contributor) commented Jan 8, 2013

Sorry for being late to the discussion. Awesome to have you here jcrobak!

I might have missed something, but to achieve some sort of atomicity for this task (as well as "improved" atomicity for streaming tasks), I suggest either adding another flag to the HdfsTarget constructor, HdfsTarget(path, with_success_file=False), or subclassing HdfsTarget into something like MapRedTarget; either way, exists() would check for the existence of the _SUCCESS flag file. We would then also have to add something to the run() method to remove any existing output that doesn't contain the _SUCCESS file (indicating a previously failed attempt that hasn't been cleaned up).

What do you think? I might have missed something here.
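The with_success_file idea might look roughly like this sketch. None of this is existing luigi code, and _fs_exists() is a placeholder for a real HDFS/S3 existence check:

```python
class FlaggedHdfsTarget:
    """Sketch of HdfsTarget(path, with_success_file=...)."""

    def __init__(self, path, with_success_file=False):
        self.path = path
        self.with_success_file = with_success_file

    def _fs_exists(self, path):
        raise NotImplementedError  # a real impl would ask HDFS/S3

    def exists(self):
        if self.with_success_file:
            # only count the output as done once the MR job wrote its flag
            return self._fs_exists(self.path + '/_SUCCESS')
        return self._fs_exists(self.path)
```

A directory left behind by a failed attempt then reads as nonexistent, which is what lets run() detect and clean it up.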

As for the inheritance, I think a baseclass for MapReduce jobs that contains overridable common methods, variables and access to hadoop configuration would be nice, and your patch is a good step in that direction. On top of that we could have HadoopJar, HadoopStreaming, HiveJob and PigJob classes that make use of the same common attributes.
As a side note, in the near future I would like to get rid of the JobRunner class and integrate the options and configuration handling from that class into the BaseHadoopJob class instead. What do you think?

@erikbern (Contributor) commented Jan 8, 2013

One problem with having a with_success_file flag is that the default value would be to have non-atomic file operations.

Another issue with _SUCCESS files is that it doesn't integrate well with external tasks that are built somewhere outside Luigi.

Both of these issues can be resolved by flipping it around so that instead we have a _NOT_DONE file that signals that the output is not finished yet.

But another problem remains: where do you commit the transaction? I just started thinking that maybe commit() should be a method on every Target, so that just doing a close() doesn't commit it. All targets returned by the output() method would then be autocommitted if the task finishes successfully.

Doing it this way would also mean that we don't have to deal with any special flag files for LocalTarget and HdfsTarget because we can still support atomic writes by file renames.

The only downside of this would be that if you are doing file operations outside of a task, then you have to remember to explicitly commit() the target once you are done. On the other hand, we could remove the need to call close() on the file handle of a Target object, so for code run within a Task that manually opens/writes files this might even lead to shorter code.

Another advantage is that tasks that write multiple outputs would be almost transactional (except in the rare case where the code fails while looping over the outputs and committing them, but if you want to take it to the extreme I guess you could implement multi-phase locking).

What do you think about this idea? It would be a pretty fundamental change in how the Target object works, but otoh I think the patch wouldn't be very big.
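A rough sketch of this commit() idea (names are illustrative, nothing here is existing luigi API): targets write to a temporary location, and the machinery that runs a task promotes every output once run() returns without raising.

```python
class CommittableTarget:
    """Writes land at a temp path; commit() promotes them."""

    def __init__(self, path):
        self.path = path
        self.tmp_path = path + '-luigi-tmp'
        self.committed = False

    def commit(self):
        # for LocalTarget/HdfsTarget this would be an atomic rename
        self._rename(self.tmp_path, self.path)
        self.committed = True

    def _rename(self, src, dst):
        raise NotImplementedError  # filesystem-specific


def run_and_autocommit(task):
    """Autocommit all outputs if, and only if, run() succeeds."""
    task.run()
    for target in task.output():
        target.commit()
```

If run() raises, no output is ever promoted, which gives the crude multi-output transactionality discussed above.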

@freider (Contributor) commented Jan 8, 2013

What I meant was that the with_success_file flag (or subclass) would only be used for outputs of hadoop jobs where we are sure the jobs themselves create _SUCCESS files (like standard MapReduce jobs).
That way we can still use the non-flagged target for files created in other ways (which would have to be made atomic some other way).
I think it could work quite nicely with _SUCCESS checks, but adding a commit() method on targets could have other advantages. Right now a lot of "transaction close" code lives in the close() methods of targets, which doesn't make as much semantic sense as having it in a commit() method if the file is never "opened" from the python task code...

A problem with a _NOT_DONE-like scheme is that it could be hard to write this flag file atomically for MapReduce jobs, since a job creates its output directory itself rather than from python code (possible race condition). There is also the risk that something goes wrong when writing this file, in which case incomplete data could get processed.

Here's a suggestion that mixes both solutions:

  • We introduce the commit() method on Targets and call this on all outputs at the end of a successful task execution.
  • We create two different HdfsTargets:
    • one which uses _SUCCESS checks in exists() and writes the _SUCCESS file in the commit() method if it doesn't exist. This would be required for any directories created on S3, except single file uploads which should be atomic anyway.
    • one which just checks file existence (and can thus be used for externally created/uploaded files), and uses renames for atomicity in writes using the commit() method (writes would not work on S3)
  • I think two different Target classes with some kind of inheritance would be the cleanest, but using a construction flag would also be an option.

Is this too complicated?

@jcrobak (Contributor, Author) commented Jan 8, 2013

@freider responded as I was typing this up, but it's probably worth my input, too.

re. success flags, I think luigi needs to support both success flags and not -- at least we need it to :). The way I was envisioning this: HdfsTarget would take a flag param (defaulting to None); exists() would check for the flag inside the directory when the param is set, and otherwise just check that the directory exists (this is how Oozie does it). Here are some use cases I have in mind:

  1. Many of our MapReduce jobs output to s3. We could write to a staging directory, but there's no true atomic rename in S3. If I use the hadoop api to rename the directory "foo.tmp" to "foo", hadoop is really moving all the files inside of "foo.tmp" to "foo" rather than doing a directory move. This process can take minutes, so it's not even pseudo-atomic. In this case, _SUCCESS flags are a must.
  2. Some of our datasets generate data for a hive partition by directly writing into that table's directory, but hive doesn't know about the new data until you issue an ALTER TABLE ... ADD PARTITION statement -- the way we've been handling this is to drop a _HIVE flag to signify that the data is available in hive.

Re. the _NOT_DONE flag, I echo @freider here wrt atomicity. It's even more complicated if you consider pig and hive as well as mapreduce.

re. BaseHadoopJob and inheritance -- I agree with @freider's suggestions. I can take some time to reorganize things as part of this PR or do it as a follow-up (I'd slightly prefer the latter since the scope of the changeset seems to be growing larger).

How do you guys feel about limiting the scope of this changeset a bit? I have some local changes to introduce the atomicity stuff that @erikbern and I discussed, and I can add that in (I'm assuming that if we later added flag support, we'd only apply the rename stuff when the Target didn't have a flag). Then we can do the flag handling and the hierarchy as follow-ups?

But if you'd rather I roll them all in together, just let me know.

@erikbern (Contributor) commented Jan 8, 2013

Great discussion. It seems like we're discussing three only slightly related things:

  1. Adding the HadoopJarJob. Seems pretty uncontroversial at this point; I think we should just merge the pull request.
  2. Whether or not to use file flags to check for atomicity, and whether to have two separate target classes. I need to think about this a bit more. IMO, just because S3 doesn't support atomic renames, there's no reason for us not to implement them for HDFS. Same thing regarding having two different target classes -- everything else equal, if we can avoid it then let's try to. So I'm still thinking we should do some renaming scheme, but we could do it as proposed in the next point:
  3. Refactoring the support for transactions by moving it from close() on the file handle to commit() on the Target object. I'll try to put together a patch in the next few days to see if this is a viable approach. Seems like it could lead to cleaner code, some crude support for transactions, etc.

@jcrobak (Contributor, Author) commented Jan 8, 2013

Ok, cool. re. #1, do you want me to push up my fixes for atomic renames before merging? Also, a #4 would be a refactoring of the hadoop job / job runner code that @freider proposed.

@erikbern (Contributor) commented Jan 8, 2013

On 8 Jan 2013, at 16:36, Joe Crobak notifications@github.com wrote:

> Ok, cool. re. #1, Do you want me to push up my fixes for atomic renames
> before merging?

Sure, if you have some code to do it, why not, but it's up to you. Getting atomicity to work well was much harder than I thought, so let's not make it a requirement of this PR :)

> Also, a #4 would be a refactoring of the hadoop job / job runner code
> that @freider proposed.

I think Freider wanted to do that at some point anyway, so probably nothing required for now. But let's see what he thinks.

* atomic file renames.
* more docs.
* cleaned up example args.
erikbern pushed a commit that referenced this pull request Jan 9, 2013
new hadoop_jvm module for java/scala hadoop jobs.
@erikbern erikbern merged commit e20e5e1 into spotify:master Jan 9, 2013