
Fixes map-only jobs to accommodate both an lzo source and sink binary converter #1431

Merged
ulyssepence merged 1 commit into twitter:develop from ulyssepence:map_only_converter
Aug 27, 2015

Conversation

@ulyssepence
Contributor

The LzoGenericScheme was using the same jobconf key for both the source and the sink BinaryConverter, leading to a ClassCastException whenever a map-only job read one type and wrote another using LzoGenericScheme. After the fix, a job like the following, which has no reduce phase, no longer fails:

val sourceA = LzoGenericSource(ScroogeBinaryConverter[ThriftTypeA], classOf[ThriftTypeA], inputPathA)
val sourceB = LzoGenericSource(ScroogeBinaryConverter[ThriftTypeB], classOf[ThriftTypeB], inputPathB)
val sink    = LzoGenericSource(ScroogeBinaryConverter[ThriftTypeC], classOf[ThriftTypeC], outputPath)

val unioned = TypedPipe.from(sourceA) ++ TypedPipe.from(sourceB)

unioned
  .map {
    case ThriftTypeA(i) => ThriftTypeC(i)
    case ThriftTypeB(i) => ThriftTypeC(i)
  }
  .write(sink)

Unfortunately, it is difficult to create a good test because running this in a local hadoop cluster requires GPL compression libraries to be installed.

@ulyssepence changed the title from "Fixes map-only jobs to accommodate both an lzo scrooge source and sink binary converter" to "Fixes map-only jobs to accommodate both an lzo source and sink binary converter" on Aug 26, 2015
@johnynek
Collaborator

What if there are two different source types? Also, are we 100% sure cascading cannot write two sinks in the same job?

I thought cascading had a means to keep keys separated on a per-tap basis. Are we not reimplementing that?

/cc @cwensel

@johnynek
Collaborator

To be more clear: I see how your job has two different source types, but I wonder: if there were a reduce phase, would this job have worked even without the fix?

Also, how can we have code that is okay (with respect to GPL) when testing it requires GPL code? Also, I think Hadoop-lzo is GPL, so the module that depends on it has to be GPL (we can dual-license it, but the lzo stuff is probably GPL).

@ulyssepence
Contributor Author

Oh, I'm so dumb: I didn't realize GPL was GPL and thought it was some compression suite. What I should have said is that it requires the LZO system libraries to be installed.

Mr. @ianoc told me multiple writes in Scalding always lead to multiple jobs, but I can look into it tomorrow.

I don't understand your question about the reducer.

@ianoc
Collaborator

ianoc commented Aug 26, 2015

@johnynek if there were a force-to-disk/reduce phase this would have been OK, which is why .shard gets people who've hit it unblocked. We aren't re-implementing the multi-source/multi-tap stuff here; we actually still require it.

Previously this code used the same key in the job conf for both inputs and outputs, which could cause a collision when one ended up in the global job conf. Hadoop MR formats use separate keys on the input and output sides for the same thing, so here we are just aligning with that; it should support any multi-source/tap scenario the same as the built-in formats, I believe.
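
A minimal sketch of that keying change, with hypothetical class and method names (the two key strings match the review snippet further down):

object ConfigKeys {
  // One key per side of the job, as proposed in the review below.
  val sourceKey = "com.twitter.scalding.lzo.converter.provider.source"
  val sinkKey   = "com.twitter.scalding.lzo.converter.provider.sink"
}

// Hypothetical scheme sketch: the source side and the sink side each record
// their converter provider's class name under a distinct key, so a map-only
// job's single JobConf can carry both without one overwriting the other.
class SketchScheme(sourceProvider: Class[_], sinkProvider: Class[_]) {
  import org.apache.hadoop.mapred.JobConf

  def sourceConfInit(conf: JobConf): Unit =
    conf.set(ConfigKeys.sourceKey, sourceProvider.getName)

  def sinkConfInit(conf: JobConf): Unit =
    conf.set(ConfigKeys.sinkKey, sinkProvider.getName)
}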

That make sense?

@isnotinvain
Contributor

@johnynek our (maybe unconfirmed) understanding is that cascading separates/isolates configs for each source but not for each sink; since there can be only one sink (unlike sources) per MR job, isolated sink configs aren't needed / weren't implemented.

@isnotinvain
Contributor

We can't write an integration test because of how painful it is to install the native hadoop lzo libraries on mac laptops (though we could probably write a test that only runs in Travis). However, we could write a unit test that does some setup and then inspects the configs for the right keys?

Contributor

I don't think you need this object, nor SinkConfigBinaryConverterProvider

just put these vals in:

object ConfigBinaryConverterProvider {
  val sourceKey = "com.twitter.scalding.lzo.converter.provider.source"
  val sinkKey = "com.twitter.scalding.lzo.converter.provider.sink"
}

Contributor

Oh, I see: you need these so you can have a class to put in the conf. Never mind.

@isnotinvain
Contributor

LGTM. Think you can write a unit test, though? If you skip the parts that actually read data, you can just assert that the configs contain the right keys/values.
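
A sketch of what that test could assert, using the key names from the snippet above; the shared "old" key and the provider class names are hypothetical stand-ins, and a real test would call the scheme's conf-init methods rather than setting keys by hand:

import org.apache.hadoop.mapred.JobConf

// Old behavior (hypothetical shared key): the sink's conf-init clobbers the
// source's entry, so reads later pick up the wrong converter and throw a
// ClassCastException.
val shared = "com.twitter.scalding.lzo.converter.provider"
val broken = new JobConf()
broken.set(shared, "SourceProviderA") // hypothetical provider class names
broken.set(shared, "SinkProviderC")
assert(broken.get(shared) == "SinkProviderC") // the source entry is lost

// Fixed behavior: separate source/sink keys, so both entries survive in a
// map-only job's single JobConf.
val fixed = new JobConf()
fixed.set("com.twitter.scalding.lzo.converter.provider.source", "SourceProviderA")
fixed.set("com.twitter.scalding.lzo.converter.provider.sink", "SinkProviderC")
assert(fixed.get("com.twitter.scalding.lzo.converter.provider.source") == "SourceProviderA")
assert(fixed.get("com.twitter.scalding.lzo.converter.provider.sink") == "SinkProviderC")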

@ulyssepence
Contributor Author

In order to test this without running a local hadoop cluster/using the lzo system binary, I believe we would need to strip away all the elements that would make this a good test (per conversation with @ianoc).

@rubanm
Contributor

rubanm commented Aug 27, 2015

LGTM. Given the additional lzo binary setup, adding an e2e test internally for now sgtm.

ulyssepence pushed a commit that referenced this pull request on Aug 27, 2015: Fixes map-only jobs to accommodate both an lzo source and sink binary converter
ulyssepence merged commit 4ebb2d1 into twitter:develop on Aug 27, 2015