diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/sink/AnnoyIndexBuildSink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/sink/AnnoyIndexBuildSink.scala index c9a590d6..88daa1bb 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/sink/AnnoyIndexBuildSink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/sink/AnnoyIndexBuildSink.scala @@ -32,9 +32,9 @@ class AnnoyIndexBuildSink(queryName: String, conf: TaskConf) extends Sink(queryN override def write(inputDF: DataFrame): Unit = { val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism) - if (inputDF.isStreaming) throw new IllegalStateException("AnnoyIndexBuildSink can not be run as streaming.") + if (df.isStreaming) throw new IllegalStateException("AnnoyIndexBuildSink can not be run as streaming.") else { - ALSModelProcess.buildAnnoyIndex(conf, inputDF) + ALSModelProcess.buildAnnoyIndex(conf, df) } } }