Azure Databricks java.lang.ClassNotFoundException: com.twosigma.flint.timeseries.TimeSeriesRDDBuilder #55

Open
josephd000 opened this issue May 26, 2021 · 6 comments

Comments

@josephd000

Error

# Step 2: specify how the Spark dataframe should be interpreted as a time series by Flint
ts_rdd <- fromSDF(sdf, is_sorted = TRUE, time_unit = "SECONDS", time_column = "t")
Error : java.lang.ClassNotFoundException: com.twosigma.flint.timeseries.TimeSeriesRDDBuilder

Expectation

That I can use basic sparklyr.flint functions on Azure Databricks without classpath errors by using install.packages("sparklyr.flint").

Details

I created a "Library" with flint-0.6.0 from Maven and installed it on my cluster, detached and reattached my notebook, and called library(sparklyr.flint) before spark_connect(), but it still can't find the class.

Config

  • Databricks 7.3 ML LTS (1 driver, 1 worker)
  • Spark 3.0.1
  • sparklyr 1.6.2
  • sparklyr.flint 0.2.1

Reproducible code

install.packages("sparklyr")
install.packages("sparklyr.flint")
library(sparklyr)
library(sparklyr.flint)

# Step 0: decide which Spark version to use, how to connect to Spark, etc
# spark_version <- "3.0.0"
Sys.setenv(SPARK_HOME = "~/spark/spark-3.0.1-bin-hadoop3.2")
sc <- spark_connect(method = "databricks")

example_time_series <- data.frame(
  t = c(1, 3, 4, 6, 7, 10, 15, 16, 18, 19),
  v = c(4, -2, NA, 5, NA, 1, -4, 5, NA, 3)
)

# Step 1: import example time series data into a Spark dataframe
sdf <- copy_to(sc, example_time_series, overwrite = TRUE)

# Step 2: specify how the Spark dataframe should be interpreted as a time series by Flint
ts_rdd <- fromSDF(sdf, is_sorted = TRUE, time_unit = "SECONDS", time_column = "t")
Error : java.lang.ClassNotFoundException: com.twosigma.flint.timeseries.TimeSeriesRDDBuilder
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at sparklyr.StreamHandler.handleMethodCall(stream.scala:106)
	at sparklyr.StreamHandler.read(stream.scala:61)
	at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:58)
	at scala.util.control.Breaks.breakable(Breaks.scala:42)
	at sparklyr.BackendHandler.channelRead0(handler.scala:39)
	at sparklyr.BackendHandler.channelRead0(handler.scala:14)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
@yitao-li
Contributor

yitao-li commented May 27, 2021

@josephd000 I think that is a good question for the Databricks folks.

My understanding is that connecting to Spark on a Databricks cluster involves some extra levels of indirection, and that the Databricks runtime has its own jar-loading logic, which is entirely proprietary. So you will need some additional steps to make it work on a Databricks cluster.

Meanwhile, if I do find something simple that makes the Databricks use case work, I'll let you know.

@josephd000
Author

@yitao-li, I went digging through the sparklyr.flint code and found the non-exported function sparklyr.flint:::spark_dependencies(). Running it returned:

sparklyr.flint:::spark_dependencies(spark_version = "3.0.1", scala_version = "2.12")
$jars
NULL

$packages
[1] "org.clapper:grizzled-slf4j_2.12:1.3.4"      "org.sparklyr:sparklyr-flint_3-0_2-12:0.7.0"

$initializer
NULL

$catalog
NULL

$repositories
[1] "https://github.com/org-sparklyr/sparklyr.flint/raw/maven2"

attr(,"class")
[1] "spark_dependency"

I then created those "Libraries" on Databricks by passing in the "packages" and "repositories" where the Databricks Library GUI asks for "Coordinates" and "Repository", respectively. After installing these two "Libraries" on my cluster, I was able to successfully use sparklyr.flint::from_sdf()! :)
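As a rough illustration of the workaround above, the following sketch turns a dependency list shaped like the quoted output into one row per library to create in the Databricks Library GUI. The helper name `databricks_library_table` is hypothetical and not part of sparklyr.flint, and the values are copied from the output above rather than fetched from a live session. It also assumes the same repository is entered for both libraries, which the thread does not state explicitly.

```r
# Values copied from the spark_dependencies() output quoted above.
# With sparklyr.flint installed, the same list could presumably be obtained via
# sparklyr.flint:::spark_dependencies(spark_version = "3.0.1", scala_version = "2.12")
deps <- list(
  packages = c(
    "org.clapper:grizzled-slf4j_2.12:1.3.4",
    "org.sparklyr:sparklyr-flint_3-0_2-12:0.7.0"
  ),
  repositories = "https://github.com/org-sparklyr/sparklyr.flint/raw/maven2"
)

# Hypothetical helper: build one row per Maven library, pairing each
# coordinate with the (assumed shared) repository URL.
databricks_library_table <- function(deps) {
  data.frame(
    Coordinates = deps$packages,
    Repository = rep(deps$repositories[1], length(deps$packages)),
    stringsAsFactors = FALSE
  )
}

libs <- databricks_library_table(deps)
print(libs)
```

Each row corresponds to one "Library" to install on the cluster, with the "Coordinates" and "Repository" fields filled in from the matching columns.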

@yitao-li
Contributor

@josephd000 Good to know! 👍
I guess I can look into whether those things can be streamlined a bit for Databricks clusters. In all other scenarios (e.g., working with an EMR cluster or running Spark in local mode), all dependencies are taken care of automatically based on what sparklyr.flint:::spark_dependencies() returns. I think sparklyr tries to do the same with a Databricks connection as well, but probably installs the jar files to the wrong location somehow.
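For comparison, a minimal sketch of the local-mode case described above, where no manual library setup should be needed. The wrapper name `connect_local_with_flint` is hypothetical, and the sketch assumes sparklyr, sparklyr.flint, and a matching local Spark installation are available. Nothing connects until the function is called.

```r
# Hypothetical wrapper, for illustration only.
connect_local_with_flint <- function(spark_version = "3.0.0") {
  # Attach sparklyr.flint before connecting, as in the original report, so
  # that sparklyr can pick up the package's Spark dependencies for the
  # new connection.
  library(sparklyr)
  library(sparklyr.flint)
  spark_connect(master = "local", version = spark_version)
}

# Usage (requires a local Spark installation of the requested version):
# sc <- connect_local_with_flint()
```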

@kehldaniel

I have the same issue with Spark 3.1.1, Scala 2.12, sparklyr 1.7.1 and sparklyr.flint 0.2.1. I don't think I can install libraries on the cluster, so I hope there will be a smooth solution soon. Thank you for the great-looking package!

@yitao-li
Contributor

@kehldaniel Did you also create a sparklyr connection using

sc <- spark_connect(method = "databricks")

or similar?

@kehldaniel

Yes. After trying hard with my own code, which runs fine on my own laptop, I am running the exact same lines of code as in the original post by josephd000 and get the same error.

3 participants