Flink 1.19: Run without Hadoop #7369
Conversation
7b88a34 to 4bf96fc (Compare)
ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
    .booleanType()
    .noDefaultValue()
    .defaultValue(false)
This change is required:
py4j.protocol.Py4JJavaError: An error occurred while calling o42.executeSql.
: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configurable
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.iceberg.hadoop.Util.usesHadoopFileIO(Util.java:122)
at org.apache.iceberg.hadoop.Util.mayHaveBlockLocations(Util.java:92)
at org.apache.iceberg.flink.source.SourceUtil.isLocalityEnabled(SourceUtil.java:43)
at org.apache.iceberg.flink.source.FlinkSource$Builder.buildFormat(FlinkSource.java:260)
at org.apache.iceberg.flink.source.FlinkSource$Builder.build(FlinkSource.java:272)
at org.apache.iceberg.flink.source.IcebergTableSource.createDataStream(IcebergTableSource.java:128)
at org.apache.iceberg.flink.source.IcebergTableSource.access$200(IcebergTableSource.java:55)
at org.apache.iceberg.flink.source.IcebergTableSource$1.produceDataStream(IcebergTableSource.java:209)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:163)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:99)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.java:205)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.java:127)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1138)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 67 more
16681f2 to dbe6ecb (Compare)
dbe6ecb to b4d3887 (Compare)
This reverts commit c088745.
857e208 to bf64429 (Compare)
@stevenzwu @pvary do you have time to get some eyes on this one?
private FileIO fileIO(Table table) {
  if (table.io() instanceof HadoopConfigurable) {
    if (HadoopDependency.isHadoopCommonOnClasspath(SerializableTable.class.getClassLoader())
What happens when Hadoop is not on the classpath, but the table's FileIO is HadoopConfigurable?
No configuration will be passed through.
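For context, a minimal sketch of the guarded pattern under discussion, based on the snippet above (the wrapWithSerializableHadoopConf helper is hypothetical and only stands in for the Hadoop-specific handling):

// Hadoop-specific handling only runs when hadoop-common is loadable;
// otherwise the FileIO is returned as-is and no Hadoop Configuration is
// propagated, which is the behavior described above.
private FileIO fileIO(Table table) {
  if (table.io() instanceof HadoopConfigurable
      && HadoopDependency.isHadoopCommonOnClasspath(SerializableTable.class.getClassLoader())) {
    // Only inside this branch is it safe to reference org.apache.hadoop.* types.
    return wrapWithSerializableHadoopConf(table.io());  // hypothetical helper
  }
  return table.io();
}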
        "There should be a hive-site.xml file under the directory %s",
        hiveConfDir);
    newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));

public static Object clusterHadoopConf() {
Would this be better placed in HadoopUtil?
org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
org.apache.hadoop.conf.Configuration hadoopConf =
    (org.apache.hadoop.conf.Configuration) FlinkCatalogFactory.clusterHadoopConf();
So Flink SQL will still need Hadoop on the classpath?
agree with Peter that this seems problematic, as the return value can be null
}

static TableLoader fromHadoopTable(String location, Configuration hadoopConf) {
static TableLoader fromHadoopTable(String location, Object hadoopConf) {
I hate this part of the change.
We have a public API where the type is not defined.
Do we have any better solution for this?
What do you think of the ParquetConfiguration that Parquet-Java did: apache/parquet-java#1141?
It seems like quite a bit of effort to define an IcebergHadoopConfiguration class like parquet-java did.
Another possible option is to add a new overloaded method with a Flink Configuration arg. Flink provides a util method for the conversion:
public class HadoopUtils {
  @SuppressWarnings("deprecation")
  public static Configuration getHadoopConfiguration(
      org.apache.flink.configuration.Configuration flinkConfiguration)
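A rough sketch of what that overload could look like (hypothetical shape; it assumes Flink's org.apache.flink.runtime.util.HadoopUtils is available at the call site, which itself requires Hadoop on the classpath):

// Hypothetical overload: accept a Flink Configuration and convert it to a
// Hadoop Configuration via Flink's utility, so callers never build a Hadoop
// Configuration themselves; the existing Hadoop-typed method stays as-is.
static TableLoader fromHadoopTable(
    String location, org.apache.flink.configuration.Configuration flinkConf) {
  org.apache.hadoop.conf.Configuration hadoopConf =
      org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(flinkConf);
  return fromHadoopTable(location, hadoopConf);
}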
core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
public SerializableConfiguration(Object hadoopConf) {
  this.hadoopConf = (Configuration) hadoopConf;
}
What's the point here? When there's no Hadoop on the classpath, this will blow up no matter what, right?
Additionally, explicit casts are just brittle. This question applies to all the other places where an Object is passed.
In Flink this is solved by protecting Hadoop-specific class usages with isHadoopCommonOnClasspath, and that works like a charm.
I am also interested in the answer to Gabor's question.
Also wondering whether we can get overload ambiguity from the two constructors?
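For reference, a minimal sketch of the two constructor shapes side by side (the Configuration-typed one is assumed to exist alongside the Object-typed one from the diff). There is no compile-time ambiguity: Java picks the most specific applicable overload, so a Configuration-typed argument binds to the first constructor and only Object-typed arguments reach the casting one.

public class SerializableConfiguration {
  private final Configuration hadoopConf;  // org.apache.hadoop.conf.Configuration

  // Assumed existing, strongly typed constructor.
  public SerializableConfiguration(Configuration hadoopConf) {
    this.hadoopConf = hadoopConf;
  }

  // Object-typed constructor from the diff; relies on a runtime cast.
  public SerializableConfiguration(Object hadoopConf) {
    this.hadoopConf = (Configuration) hadoopConf;
  }
}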
ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
    .booleanType()
    .noDefaultValue()
    .defaultValue(false)
This is a backwards-incompatible change. I'm not sure how widely it's used, but we need to do some research and be more vocal about this change if we decide to go ahead with it.
Agree with @pvary that this changes the default behavior, which calls Util.mayHaveBlockLocations(table.io(), table.location()) from the Hadoop module to figure out whether locality is enabled for the hdfs scheme.
Would it work if we add the isHadoopCommonOnClasspath check at the beginning of the Util#mayHaveBlockLocations method in the Hadoop module, and return false if hadoop-common is not on the classpath?
public static boolean mayHaveBlockLocations(FileIO io, String location) {
  if (usesHadoopFileIO(io, location)) {
    InputFile inputFile = io.newInputFile(location);
    if (inputFile instanceof HadoopInputFile) {
      String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
      return LOCALITY_WHITELIST_FS.contains(scheme);
    } else {
      return false;
    }
  }
  return false;
}
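A rough sketch of that suggestion (the classpath check mirrors the HadoopDependency helper used elsewhere in this PR and is an assumption here; whether the guard alone keeps Hadoop classes from being loaded when Util is initialized is exactly the open question):

public static boolean mayHaveBlockLocations(FileIO io, String location) {
  // Proposed guard: bail out before touching any Hadoop types when
  // hadoop-common is not loadable, so locality silently stays disabled.
  if (!HadoopDependency.isHadoopCommonOnClasspath(Util.class.getClassLoader())) {
    return false;
  }
  if (usesHadoopFileIO(io, location)) {
    InputFile inputFile = io.newInputFile(location);
    if (inputFile instanceof HadoopInputFile) {
      String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
      return LOCALITY_WHITELIST_FS.contains(scheme);
    }
  }
  return false;
}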
  return hadoopConf;
}

public Configuration getClone() {
Maybe name this method just config().
getClone can look like a clone of this SerializableConfiguration class.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
@Fokko I wonder why this is closed. I face this issue when submitting jobs to AWS managed Flink.
Same, for some reason Flink requires Hadoop and yet does not come with it? We're using the Kubernetes Operator and would like to avoid customizing the image to fix this bug.
What was the last status for this PR, as in, how much work is left before it can be reviewed?
Allow Flink to run without Hadoop
This PR aims to remove Hadoop's Configuration class from the main code path, so we can also run Flink without having the Hadoop JARs on the Java classpath.
Testing
Testing is still pending. This PR focuses on read operations. For write operations, upstream changes need to be made to Parquet-MR, with the main focus on the ParquetWriter class: https://github.com/apache/parquet-mr/blob/4bf606905924896403d25cd6287399cfe7050ce9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java#L25
Resolves #7332
Resolves #3117