Add pickle protocol option support for Spark #3001
Merged
Description
The spark module can stop working when the Python version on the client differs from the one on the Spark cluster. When no protocol is given, pickle.dump() and pickle.dumps() use a default protocol that varies by Python version. If a Luigi task that inherits from PySparkTask is pickled on the client and then unpickled on the Spark cluster, and the default pickle protocols differ, pickle.load() raises an AttributeError.
The default pickle protocol for each Python version is documented at https://docs.python.org/3/library/pickle.html#pickle-protocols
This change lets users specify the pickle protocol version in the Luigi configuration file to match their Spark cluster environment.
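As a rough illustration of the idea (not the code in this PR), the sketch below reads a protocol number from configuration text and passes it explicitly to pickle.dumps(), so the serialized stream no longer depends on the client's default. The section name `spark`, the option name `pickle-protocol`, and both helper functions are assumptions made for this example; check the merged change for the actual option name.

```python
import configparser
import pickle

# Hypothetical configuration text. The section and option names are
# assumptions for illustration, not necessarily what Luigi uses.
CONFIG_TEXT = """
[spark]
pickle-protocol = 2
"""


def load_pickle_protocol(config_text):
    """Return the configured protocol, or the interpreter default if unset."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    return parser.getint(
        "spark", "pickle-protocol", fallback=pickle.DEFAULT_PROTOCOL
    )


def dump_with_protocol(obj, protocol):
    """Serialize obj with an explicit protocol instead of the default."""
    return pickle.dumps(obj, protocol=protocol)


protocol = load_pickle_protocol(CONFIG_TEXT)
payload = dump_with_protocol({"task": "Sample", "n": 3}, protocol)

# For protocol 2 and later, the stream starts with the PROTO opcode (0x80)
# followed by one byte holding the protocol number.
stream_protocol = payload[1]

# An interpreter that supports this protocol can read the payload back.
restored = pickle.loads(payload)
```

Choosing a protocol that the oldest interpreter involved can read keeps the payload loadable on both sides, at the cost of giving up features of newer protocols.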
Motivation and Context
I was trying to run a sample task that inherits from PySparkTask from a client (Python 3.8 installed) on a Spark cluster (Python 3.6 installed).
Then I got the following error.
This can happen whenever the default pickle protocol differs between client and cluster, for example a client on Python 3 with a cluster on Python 2, or a client on 3.7 with a cluster on 3.8.
In my case I can't change the cluster's Python version, and I think other people are in a similar situation.
Have you tested this? If so, how?
I ran my jobs with this code and it works for me.