Skip to content

Commit

Permalink
opt withdraw
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp committed Aug 15, 2024
1 parent 4289f69 commit b78e465
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -15,22 +15,17 @@
public class WithdrawEvent implements Serializable {
private final RandomGeneratorFarm randomFarm;
private final Random randIndex;
private final Random amountRandom;
private final Map<String, AtomicLong> multiplicityMap;
private final double probWithdraw;

public WithdrawEvent() {
this.probWithdraw = DatagenParams.accountWithdrawFraction;
randomFarm = new RandomGeneratorFarm();
randIndex = new Random(DatagenParams.defaultSeed);
amountRandom = new Random(DatagenParams.defaultSeed);
multiplicityMap = new ConcurrentHashMap<>();
}

private void resetState(int seed) {
randomFarm.resetRandomGenerators(seed);
randIndex.setSeed(seed);
amountRandom.setSeed(seed);
}

public long getMultiplicityIdAndInc(Account from, Account to) {
Expand All @@ -39,25 +34,24 @@ public long getMultiplicityIdAndInc(Account from, Account to) {
return atomicInt.getAndIncrement();
}

public List<Withdraw> withdraw(List<Account> sources, List<Account> cards, int blockId) {
public List<Withdraw> withdraw(Account from, List<Account> cards, int blockId) {
resetState(blockId);
List<Withdraw> withdraws = new ArrayList<>();
sources.forEach(from -> {
if (!from.getType().equals("debit card")
&& randomFarm.get(RandomGeneratorFarm.Aspect.ACCOUNT_WHETHER_WITHDRAW).nextDouble() < probWithdraw) {
int count = 0;
while (count++ < DatagenParams.maxWithdrawals) {
Account to = cards.get(randIndex.nextInt(cards.size()));
if (cannotWithdraw(from, to)) {
continue;
}
withdraws.add(
Withdraw.createWithdraw(randomFarm.get(RandomGeneratorFarm.Aspect.WITHDRAW_DATE), from, to,
getMultiplicityIdAndInc(from, to), amountRandom.nextDouble()
* DatagenParams.withdrawMaxAmount));

Random amountRand = randomFarm.get(RandomGeneratorFarm.Aspect.WITHDRAW_AMOUNT);
Random dateRand = randomFarm.get(RandomGeneratorFarm.Aspect.WITHDRAW_DATE);

List<Withdraw> withdraws = new LinkedList<>();
if (!from.getType().equals("debit card")) {
int count = 0;
while (count++ < DatagenParams.maxWithdrawals) {
Account to = cards.get(randIndex.nextInt(cards.size()));
if (cannotWithdraw(from, to)) {
continue;
}
withdraws.add(Withdraw.createWithdraw(dateRand, from, to, getMultiplicityIdAndInc(from, to),
amountRand.nextDouble() * DatagenParams.withdrawMaxAmount));
}
});
}
return withdraws;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public enum Aspect {
// edge: withdraw
ACCOUNT_WHETHER_WITHDRAW,
WITHDRAW_DATE,
WITHDRAW_AMOUNT,

// edge: signin
SIGNIN_DATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class ActivityGenerator()(implicit spark: SparkSession)
.asJava

val signInEvent = new SignInEvent
val signRels = mediumRDD.mapPartitionsWithIndex((index, mediums) => {
mediumRDD.mapPartitionsWithIndex((index, mediums) => {
mediums.flatMap(medium => {
signInEvent
.signIn(
Expand All @@ -169,7 +169,6 @@ class ActivityGenerator()(implicit spark: SparkSession)
.asScala
})
})
signRels
}

def personGuaranteeEvent(personRDD: RDD[Person]): RDD[Person] = {
Expand Down Expand Up @@ -257,17 +256,26 @@ class ActivityGenerator()(implicit spark: SparkSession)

// TODO: rewrite it with account centric
def withdrawEvent(accountRDD: RDD[Account]): RDD[Withdraw] = {
val withdrawEvent = new WithdrawEvent
val cards =
accountRDD.filter(_.getType == "debit card").collect().toList.asJava
accountRDD.mapPartitions(sources => {
val withdrawList = withdrawEvent.withdraw(
sources.toList.asJava,
cards,
TaskContext.getPartitionId()
val withdrawEvent = new WithdrawEvent
accountRDD
.sample(
withReplacement = false,
DatagenParams.accountWithdrawFraction,
sampleRandom.nextLong()
)
for { withdraw <- withdrawList.iterator().asScala } yield withdraw
})
.mapPartitionsWithIndex((index, sources) => {
sources.flatMap(source => {
withdrawEvent
.withdraw(
source,
cards,
index
)
.asScala
})
})
}

// TODO: rewrite it with loan centric
Expand Down

0 comments on commit b78e465

Please sign in to comment.