[AIRFLOW-2085] Add SparkJdbc operator #3021
Conversation
Codecov Report
    @@            Coverage Diff            @@
    ##           master    #3021     +/-   ##
    =========================================
    - Coverage   73.18%   73.17%    -0.01%
    =========================================
      Files         177      177
      Lines       12412    12412
    =========================================
    - Hits         9084     9083        -1
    - Misses       3328     3329        +1
Continue to review full report at Codecov.
Fokko left a comment:
LGTM in the greater scheme of things; some small changes.
This log should be already available:
https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L2023
We should generalise the operator and not make it specific to PostgreSQL.
Can we condense this a bit?

    reader = spark.read
    reader = set_common_options(reader, url, jdbc_table, user, password, driver)

to

    reader = set_common_options(spark.read, url, jdbc_table, user, password, driver)
Mutable variables make me cry in general, but I don't see a nicer way to do this in Python.
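For reference, a minimal sketch of the helper that makes the condensed call possible (the body is an assumption based on the standard Spark JDBC data source options; the actual implementation lives in the spark_jdbc_script):

```python
def set_common_options(spark_source, url, jdbc_table, user, password, driver):
    # spark_source is a DataFrameReader or DataFrameWriter; format() and
    # option() return the configured object, so it can be returned directly
    # and the caller needs no intermediate mutable variable.
    return (spark_source
            .format('jdbc')
            .option('url', url)
            .option('dbtable', jdbc_table)
            .option('user', user)
            .option('password', password)
            .option('driver', driver))

# The condensed call from the comment above:
reader = set_common_options(spark.read, url, jdbc_table, user, password, driver)
```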
Maybe change this. I guess this is legacy from Sqoop. This part always made me cry, since import and export don't really describe the direction of the data. Now that we have the opportunity to fix this, my suggestion would be to change these to spark_to_sql and sql_to_spark to describe the direction of the flow 👍
I've renamed them now to spark_to_jdbc and jdbc_to_spark to avoid any confusion with spark-sql
Perfect, thanks! 👍
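For clarity, the two renamed commands map onto the plain Spark JDBC data source roughly as follows (a sketch assuming an active SparkSession named spark; the URL, table names, and credentials are placeholders):

```python
url = 'jdbc:postgresql://localhost:5432/mydb'
props = {'user': 'airflow', 'password': 'airflow',
         'driver': 'org.postgresql.Driver'}

# jdbc_to_spark: read a JDBC table into a Spark DataFrame.
df = spark.read.jdbc(url=url, table='source_table', properties=props)

# spark_to_jdbc: write a Spark DataFrame out to a JDBC table.
df.write.jdbc(url=url, table='target_table', mode='append', properties=props)
```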
For now these tests are sufficient for me. Maybe in the future we can try to actually perform a spark to postgres job, and assert the result :)
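Such a test could look roughly like the sketch below (assumptions: pyspark is installed, a Postgres JDBC driver jar is available at the placeholder path, and the Postgres instance from the CI config is reachable on localhost; the table name and credentials are hypothetical):

```python
import unittest

from pyspark.sql import SparkSession


class TestSparkToPostgres(unittest.TestCase):
    def test_round_trip(self):
        spark = (SparkSession.builder
                 .appName('spark-jdbc-integration-test')
                 .config('spark.jars', '/path/to/postgresql.jar')  # placeholder
                 .getOrCreate())
        url = 'jdbc:postgresql://localhost:5432/airflow'
        props = {'user': 'airflow', 'password': 'airflow',
                 'driver': 'org.postgresql.Driver'}
        expected = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'val'])

        # spark_to_jdbc: write the DataFrame to Postgres ...
        expected.write.jdbc(url=url, table='spark_jdbc_test',
                            mode='overwrite', properties=props)
        # ... jdbc_to_spark: read it back and assert the result.
        actual = spark.read.jdbc(url=url, table='spark_jdbc_test',
                                 properties=props)
        self.assertEqual(sorted(actual.collect()), sorted(expected.collect()))
```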
That would be a nice test indeed :)
Why don't you add one, @danielvdende? Postgres is already available; if you launch a stand-alone Spark job it should be possible?
This would also require the Spark binary, which is quite heavy, but I really like the idea
👍 I'll try to get it working. Just to check I've understood you correctly: you mean to use the Postgres instance that is started as part of the Travis CI config? (In that case, we could even consider doing the same for MySQL.)
Try it, but don't overcomplicate. It's more an integration test, and we should only run those once and not for every setup of Travis. So it might be more for the future. @Fokko, can we merge as is?
Yes, let me merge it. I've checked it and it looks good to me.
I really like this operator. This PR introduces a new layer of operators: instead of having low-level operations that just do one thing, we leverage the existing SparkSubmitOperator to add a new layer of logic.
Add the Spark JDBC hook/operator pair. This pair extends the existing SparkSubmitOperator. Also includes the spark_jdbc_script, which is the actual Spark job that is run. Closes apache#3021 from danielvdende/AIRFLOW-2085-add-spark-jdbc-operator
Hi all,

This PR is to add a Spark JDBC hook/operator pair. I've personally started using it to replace Sqoop, and have found it much more stable and reliable. Thought it would be nice to make it available to the community. Curious to hear your thoughts.
JIRA
Description
Add the Spark JDBC hook/operator pair. This pair extends the existing SparkSubmitOperator. Also includes the spark_jdbc_script, which is the actual Spark job that is run.
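For illustration, a task using the new operator might look like this (a sketch: the cmd_type values follow the names agreed in the review thread, while the import path, remaining parameter names, connection ids, and table names are assumptions/placeholders):

```python
from airflow.contrib.operators.spark_jdbc_operator import SparkJDBCOperator

copy_customers = SparkJDBCOperator(
    task_id='jdbc_to_spark_customers',
    cmd_type='jdbc_to_spark',              # direction agreed in the review
    jdbc_conn_id='postgres_default',       # connection for the JDBC side
    spark_conn_id='spark_default',         # connection used for spark-submit
    jdbc_table='public.customers',         # source table on the JDBC side
    metastore_table='staging.customers',   # target table in the metastore
    jdbc_driver='org.postgresql.Driver',
    dag=dag,                               # assumes a surrounding DAG object
)
```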
Tests
Commits
My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
Passes
git diff upstream/master -u -- "*.py" | flake8 --diff