Skip to content

Commit

Permalink
cherry pick fix data structure for nebula datasource (#33)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Nicole00 authored Jan 6, 2022
1 parent 92940e6 commit 0dc354c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ case class NebulaReadConfigEntry(address: String = "",
space: String = "",
labels: List[String] = List(),
weightCols: List[String] = List()) {
+ assert(weightCols.isEmpty || labels.size == weightCols.size,
+ "weightCols must be empty or has the same amount values with labels")
override def toString: String = {
s"NebulaReadConfigEntry: " +
s"{address: $address, space: $space, labels: ${labels.mkString(",")}, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ class NebulaReader(spark: SparkSession, configs: Configs, partitionNum: String)
.withReturnCols(returnCols.toList)
.withPartitionNum(partition)
.build()
- if (dataset == null) {
- dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
- } else {
- dataset = dataset.union(spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF())
+ var df = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
+ if (weights.nonEmpty) {
+ df = df.select("_srcId", "_dstId", weights(i))
  }
+ dataset = if (dataset == null) df else dataset.union(df)
}
dataset
}
Expand Down

0 comments on commit 0dc354c

Please sign in to comment.