Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp committed Sep 13, 2024
1 parent 1da63f3 commit af8adb1
Showing 1 changed file with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,23 @@ class ActivityGenerator()(implicit spark: SparkSession)
mediumRDD: RDD[Medium],
accountRDD: RDD[Account]
): RDD[SignIn] = {
val accountSampleList = accountRDD
.sample(
withReplacement = false,
DatagenParams.accountSignedInFraction,
sampleRandom.nextLong()
)
.collect()
.grouped(accountRDD.partitions.length)
.map(_.toList.asJava)
.toList
.asJava
val accountSampleList = spark.sparkContext.broadcast(
accountRDD
.sample(
withReplacement = false,
DatagenParams.accountSignedInFraction,
sampleRandom.nextLong()
)
.collect()
.toList
)

val signInEvent = new SignInEvent
mediumRDD.mapPartitionsWithIndex((index, mediums) => {
signInEvent
.signIn(
mediums.toList.asJava,
accountSampleList.get(index),
accountSampleList.value.asJava,
index
)
.iterator()
Expand Down

0 comments on commit af8adb1

Please sign in to comment.