diff --git a/scripts/run.py b/scripts/run.py index abc493a6..1be63a8f 100755 --- a/scripts/run.py +++ b/scripts/run.py @@ -65,6 +65,7 @@ def run_local( **({'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'}), **({'spark.executor.extraJavaOptions': '-XX:+UseG1GC'}), **({'spark.driver.maxResultSize': '5g'}), + # **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug # **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}), **spark_conf } diff --git a/scripts/run_local.sh b/scripts/run_local.sh index c397351a..45823bb7 100644 --- a/scripts/run_local.sh +++ b/scripts/run_local.sh @@ -5,4 +5,4 @@ OUTPUT_DIR=out # For more command line arguments, see the main entry for more information at # src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala -time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 30 --output-dir ${OUTPUT_DIR} \ No newline at end of file +time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 1 --output-dir ${OUTPUT_DIR} \ No newline at end of file diff --git a/src/main/java/ldbc/finbench/datagen/entities/edges/CompanyInvestCompany.java b/src/main/java/ldbc/finbench/datagen/entities/edges/CompanyInvestCompany.java index 69579224..226ef878 100644 --- a/src/main/java/ldbc/finbench/datagen/entities/edges/CompanyInvestCompany.java +++ b/src/main/java/ldbc/finbench/datagen/entities/edges/CompanyInvestCompany.java @@ -28,19 +28,17 @@ public CompanyInvestCompany(Company fromCompany, Company toCompany, this.comment = comment; } - public static CompanyInvestCompany createCompanyInvestCompany(RandomGeneratorFarm farm, - Company fromCompany, Company toCompany) { + public static void createCompanyInvestCompany(RandomGeneratorFarm farm, + Company investor, Company target) { Random dateRandom = farm.get(RandomGeneratorFarm.Aspect.COMPANY_INVEST_DATE); - long creationDate = Dictionaries.dates.randomCompanyToCompanyDate(dateRandom, fromCompany, toCompany); + long creationDate = Dictionaries.dates.randomCompanyToCompanyDate(dateRandom, investor, target); double ratio = farm.get(RandomGeneratorFarm.Aspect.INVEST_RATIO).nextDouble(); String comment = Dictionaries.randomTexts.getUniformDistRandomTextForComments( farm.get(RandomGeneratorFarm.Aspect.COMMON_COMMENT)); CompanyInvestCompany companyInvestCompany = - new CompanyInvestCompany(fromCompany, toCompany, creationDate, 0, ratio, false, comment); - fromCompany.getCompanyInvestCompanies().add(companyInvestCompany); - - return companyInvestCompany; + new CompanyInvestCompany(investor, target, creationDate, 0, ratio, false, comment); + target.getCompanyInvestCompanies().add(companyInvestCompany); } public void scaleRatio(double sum) { diff --git a/src/main/java/ldbc/finbench/datagen/entities/edges/PersonInvestCompany.java b/src/main/java/ldbc/finbench/datagen/entities/edges/PersonInvestCompany.java index 47a9e836..2eee7d42 100644 --- a/src/main/java/ldbc/finbench/datagen/entities/edges/PersonInvestCompany.java +++ b/src/main/java/ldbc/finbench/datagen/entities/edges/PersonInvestCompany.java @@ -29,19 +29,17 @@ public PersonInvestCompany(Person person, Company company, this.comment = comment; } - public static PersonInvestCompany createPersonInvestCompany(RandomGeneratorFarm farm, Person person, - Company company) { + public static void createPersonInvestCompany(RandomGeneratorFarm farm, Person investor, + Company target) { Random dateRandom = farm.get(RandomGeneratorFarm.Aspect.PERSON_INVEST_DATE); - long creationDate = Dictionaries.dates.randomPersonToCompanyDate(dateRandom, person, company); + long creationDate = Dictionaries.dates.randomPersonToCompanyDate(dateRandom, investor, target); double ratio = farm.get(RandomGeneratorFarm.Aspect.INVEST_RATIO).nextDouble(); String comment = Dictionaries.randomTexts.getUniformDistRandomTextForComments( farm.get(RandomGeneratorFarm.Aspect.COMMON_COMMENT)); - PersonInvestCompany personInvestCompany = new PersonInvestCompany(person, company, creationDate, 0, ratio, + PersonInvestCompany personInvestCompany = new PersonInvestCompany(investor, target, creationDate, 0, ratio, false, comment); - person.getPersonInvestCompanies().add(personInvestCompany); - - return personInvestCompany; + target.getPersonInvestCompanies().add(personInvestCompany); } public void scaleRatio(double sum) { diff --git a/src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java b/src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java index 0b85c15d..e112d682 100644 --- a/src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java +++ b/src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java @@ -4,6 +4,7 @@ import java.util.Comparator; import ldbc.finbench.datagen.entities.DynamicActivity; import ldbc.finbench.datagen.entities.nodes.Account; +import ldbc.finbench.datagen.generation.DatagenParams; import ldbc.finbench.datagen.generation.dictionary.Dictionaries; import ldbc.finbench.datagen.util.RandomGeneratorFarm; @@ -31,14 +32,15 @@ public Transfer(Account fromAccount, Account toAccount, double amount, long crea this.isExplicitlyDeleted = isExplicitlyDeleted; } - public static Transfer createTransferAndReturn(RandomGeneratorFarm farm, Account from, Account to, - long multiplicityId, - double amount) { + public static Transfer createTransfer(RandomGeneratorFarm farm, Account from, Account to, + long multiplicityId) { long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate()); long creationDate = Dictionaries.dates.randomAccountToAccountDate(farm.get(RandomGeneratorFarm.Aspect.TRANSFER_DATE), from, to, deleteDate); boolean willDelete = from.isExplicitlyDeleted() && to.isExplicitlyDeleted(); + double amount = + farm.get(RandomGeneratorFarm.Aspect.TRANSFER_AMOUNT).nextDouble() * DatagenParams.transferMaxAmount; Transfer transfer = new Transfer(from, to, amount, creationDate, deleteDate, multiplicityId, willDelete); // Set ordernum @@ -67,9 +69,9 @@ public static Transfer createTransferAndReturn(RandomGeneratorFarm farm, Account return transfer; } - public static Transfer createLoanTransferAndReturn(RandomGeneratorFarm farm, Account from, Account to, - long multiplicityId, - double amount) { + public static Transfer createLoanTransfer(RandomGeneratorFarm farm, Account from, Account to, + long multiplicityId, + double amount) { long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate()); long creationDate = Dictionaries.dates.randomAccountToAccountDate(farm.get(RandomGeneratorFarm.Aspect.LOAN_SUBEVENTS_DATE), diff --git a/src/main/java/ldbc/finbench/datagen/entities/nodes/Company.java b/src/main/java/ldbc/finbench/datagen/entities/nodes/Company.java index cdc9323f..7cdb72b7 100644 --- a/src/main/java/ldbc/finbench/datagen/entities/nodes/Company.java +++ b/src/main/java/ldbc/finbench/datagen/entities/nodes/Company.java @@ -9,6 +9,7 @@ import ldbc.finbench.datagen.entities.edges.CompanyGuaranteeCompany; import ldbc.finbench.datagen.entities.edges.CompanyInvestCompany; import ldbc.finbench.datagen.entities.edges.CompanyOwnAccount; +import ldbc.finbench.datagen.entities.edges.PersonInvestCompany; import ldbc.finbench.datagen.generation.dictionary.Dictionaries; public class Company implements Serializable { @@ -22,6 +23,9 @@ public class Company implements Serializable { private String description; private String url; private final List companyOwnAccounts; + // invested by persons + private final List personInvestCompanies; + // invested by companies private final List companyInvestCompanies; private final LinkedHashSet guaranteeSrc; private final LinkedHashSet guaranteeDst; @@ -29,6 +33,7 @@ public class Company implements Serializable { public Company() { companyOwnAccounts = new LinkedList<>(); + personInvestCompanies = new LinkedList<>(); companyInvestCompanies = new LinkedList<>(); guaranteeSrc = new LinkedHashSet<>(); guaranteeDst = new LinkedHashSet<>(); @@ -74,10 +79,49 @@ public List getCompanyOwnAccounts() { return companyOwnAccounts; } + public List getPersonInvestCompanies() { + return personInvestCompanies; + } + public List getCompanyInvestCompanies() { return companyInvestCompanies; } + public Company scaleInvestmentRatios() { + double sum = 0; + for (PersonInvestCompany pic : personInvestCompanies) { + sum += pic.getRatio(); + } + for (CompanyInvestCompany cic : companyInvestCompanies) { + sum += cic.getRatio(); + } + for (PersonInvestCompany pic : personInvestCompanies) { + pic.scaleRatio(sum); + } + for (CompanyInvestCompany cic : companyInvestCompanies) { + cic.scaleRatio(sum); + } + return this; + } + + public boolean hasInvestedBy(Company company) { + for (CompanyInvestCompany cic : companyInvestCompanies) { + if (cic.getFromCompany().equals(company)) { + return true; + } + } + return false; + } + + public boolean hasInvestedBy(Person person) { + for (PersonInvestCompany pic : personInvestCompanies) { + if (pic.getPerson().equals(person)) { + return true; + } + } + return false; + } + public HashSet getGuaranteeSrc() { return guaranteeSrc; } diff --git a/src/main/java/ldbc/finbench/datagen/generation/dictionary/CommonTextDictionary.java b/src/main/java/ldbc/finbench/datagen/generation/dictionary/CommonTextDictionary.java index 6c66fef0..ef7e84c4 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/dictionary/CommonTextDictionary.java +++ b/src/main/java/ldbc/finbench/datagen/generation/dictionary/CommonTextDictionary.java @@ -16,7 +16,7 @@ public CommonTextDictionary(String filePath, String separator) { try { InputStreamReader inputStreamReader = new InputStreamReader( - Objects.requireNonNull(getClass().getResourceAsStream(filePath)), StandardCharsets.UTF_8); + Objects.requireNonNull(getClass().getResourceAsStream(filePath)), StandardCharsets.UTF_8); BufferedReader dictionary = new BufferedReader(inputStreamReader); String line; long totalNum = 0; diff --git a/src/main/java/ldbc/finbench/datagen/generation/dictionary/PersonNameDictionary.java b/src/main/java/ldbc/finbench/datagen/generation/dictionary/PersonNameDictionary.java index c9a37691..39902a82 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/dictionary/PersonNameDictionary.java +++ b/src/main/java/ldbc/finbench/datagen/generation/dictionary/PersonNameDictionary.java @@ -6,7 +6,6 @@ import java.nio.charset.StandardCharsets; import java.util.Random; import java.util.TreeMap; -import ldbc.finbench.datagen.generation.DatagenParams; public class PersonNameDictionary { private final TreeMap personSurnames; diff --git a/src/main/java/ldbc/finbench/datagen/generation/distribution/TimeDistribution.java b/src/main/java/ldbc/finbench/datagen/generation/distribution/TimeDistribution.java index 766866d5..c25b1575 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/distribution/TimeDistribution.java +++ b/src/main/java/ldbc/finbench/datagen/generation/distribution/TimeDistribution.java @@ -11,7 +11,7 @@ public class TimeDistribution { private Map hourDistribution; private double[] hourProbs; private final double[] hourCumulatives; - + public TimeDistribution(String hourDistributionFile) { loadDistribution(hourDistributionFile); hourCumulatives = new double[hourProbs.length]; diff --git a/src/main/java/ldbc/finbench/datagen/generation/events/CompanyInvestEvent.java b/src/main/java/ldbc/finbench/datagen/generation/events/CompanyInvestEvent.java index 63761581..75e5fbdf 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/events/CompanyInvestEvent.java +++ b/src/main/java/ldbc/finbench/datagen/generation/events/CompanyInvestEvent.java @@ -1,6 +1,7 @@ package ldbc.finbench.datagen.generation.events; import java.io.Serializable; +import java.util.List; import java.util.Random; import ldbc.finbench.datagen.entities.edges.CompanyInvestCompany; import ldbc.finbench.datagen.entities.nodes.Company; @@ -21,7 +22,26 @@ public void resetState(int seed) { randIndex.setSeed(seed); } - public CompanyInvestCompany companyInvest(Company investor, Company invested) { - return CompanyInvestCompany.createCompanyInvestCompany(randomFarm, investor, invested); + public List companyInvestPartition(List investors, List targets) { + Random numInvestorsRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUMS_COMPANY_INVEST); + Random chooseInvestorRand = randomFarm.get(RandomGeneratorFarm.Aspect.CHOOSE_COMPANY_INVESTOR); + for (Company target : targets) { + int numInvestors = numInvestorsRand.nextInt( + DatagenParams.maxInvestors - DatagenParams.minInvestors + 1 + ) + DatagenParams.minInvestors; + for (int i = 0; i < numInvestors; i++) { + int index = chooseInvestorRand.nextInt(investors.size()); + Company investor = investors.get(index); + if (cannotInvest(investor, target)) { + continue; + } + CompanyInvestCompany.createCompanyInvestCompany(randomFarm, investor, target); + } + } + return targets; + } + + public boolean cannotInvest(Company investor, Company target) { + return (investor == target) || investor.hasInvestedBy(target) || target.hasInvestedBy(investor); } } diff --git a/src/main/java/ldbc/finbench/datagen/generation/events/LoanSubEvents.java b/src/main/java/ldbc/finbench/datagen/generation/events/LoanSubEvents.java index ad375c00..a24ac9c7 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/events/LoanSubEvents.java +++ b/src/main/java/ldbc/finbench/datagen/generation/events/LoanSubEvents.java @@ -112,16 +112,16 @@ private void transferSubEvent(Loan loan) { if (actionRandom.nextDouble() < 0.5) { if (!cannotTransfer(account, target)) { transfers.add( - Transfer.createLoanTransferAndReturn(randomFarm, account, target, - getMultiplicityIdAndInc(account, target), - amountRandom.nextDouble() * DatagenParams.transferMaxAmount)); + Transfer.createLoanTransfer(randomFarm, account, target, + getMultiplicityIdAndInc(account, target), + amountRandom.nextDouble() * DatagenParams.transferMaxAmount)); } } else { if (!cannotTransfer(target, account)) { transfers.add( - Transfer.createLoanTransferAndReturn(randomFarm, target, account, - getMultiplicityIdAndInc(target, account), - amountRandom.nextDouble() * DatagenParams.transferMaxAmount)); + Transfer.createLoanTransfer(randomFarm, target, account, + getMultiplicityIdAndInc(target, account), + amountRandom.nextDouble() * DatagenParams.transferMaxAmount)); } } } diff --git a/src/main/java/ldbc/finbench/datagen/generation/events/PersonInvestEvent.java b/src/main/java/ldbc/finbench/datagen/generation/events/PersonInvestEvent.java index 14d51526..297a70f5 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/events/PersonInvestEvent.java +++ b/src/main/java/ldbc/finbench/datagen/generation/events/PersonInvestEvent.java @@ -1,6 +1,7 @@ package ldbc.finbench.datagen.generation.events; import java.io.Serializable; +import java.util.List; import java.util.Random; import ldbc.finbench.datagen.entities.edges.PersonInvestCompany; import ldbc.finbench.datagen.entities.nodes.Company; @@ -22,7 +23,26 @@ public void resetState(int seed) { randIndex.setSeed(seed); } - public PersonInvestCompany personInvest(Person person, Company company) { - return PersonInvestCompany.createPersonInvestCompany(randomFarm, person, company); + public List personInvestPartition(List investors, List targets) { + Random numInvestorsRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUMS_PERSON_INVEST); + Random chooseInvestorRand = randomFarm.get(RandomGeneratorFarm.Aspect.CHOOSE_PERSON_INVESTOR); + for (Company target : targets) { + int numInvestors = numInvestorsRand.nextInt( + DatagenParams.maxInvestors - DatagenParams.minInvestors + 1 + ) + DatagenParams.minInvestors; + for (int i = 0; i < numInvestors; i++) { + int index = chooseInvestorRand.nextInt(investors.size()); + Person investor = investors.get(index); + if (cannotInvest(investor, target)) { + continue; + } + PersonInvestCompany.createPersonInvestCompany(randomFarm, investor, target); + } + } + return targets; + } + + public boolean cannotInvest(Person investor, Company target) { + return target.hasInvestedBy(investor); } } diff --git a/src/main/java/ldbc/finbench/datagen/generation/events/TransferEvent.java b/src/main/java/ldbc/finbench/datagen/generation/events/TransferEvent.java index b7ff8289..a0d02084 100644 --- a/src/main/java/ldbc/finbench/datagen/generation/events/TransferEvent.java +++ b/src/main/java/ldbc/finbench/datagen/generation/events/TransferEvent.java @@ -1,12 +1,8 @@ package ldbc.finbench.datagen.generation.events; import java.io.Serializable; -import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.Random; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import ldbc.finbench.datagen.entities.edges.Transfer; import ldbc.finbench.datagen.entities.nodes.Account; import ldbc.finbench.datagen.generation.DatagenParams; @@ -15,36 +11,23 @@ public class TransferEvent implements Serializable { private final RandomGeneratorFarm randomFarm; - private final Random shuffleRandom; - private final Random amountRandom; private final DegreeDistribution multiplicityDist; private final double partRatio; public TransferEvent() { this.partRatio = 1.0 / DatagenParams.transferShuffleTimes; randomFarm = new RandomGeneratorFarm(); - shuffleRandom = new Random(DatagenParams.defaultSeed); - amountRandom = new Random(DatagenParams.defaultSeed); multiplicityDist = DatagenParams.getTransferMultiplicityDistribution(); multiplicityDist.initialize(); } private void resetState(int seed) { randomFarm.resetRandomGenerators(seed); - shuffleRandom.setSeed(seed); - amountRandom.setSeed(seed); multiplicityDist.reset(seed); } - // Deprecated: OutDegrees is shuffled with InDegrees. It has been moved to the scala layer - private void setOutDegreeWithShuffle(List accounts) { - List degrees = accounts.parallelStream().map(Account::getMaxInDegree).collect(Collectors.toList()); - Collections.shuffle(degrees, shuffleRandom); - IntStream.range(0, accounts.size()).parallel().forEach(i -> accounts.get(i).setMaxOutDegree(degrees.get(i))); - } - - private List getIndexList(int size) { - List indexList = new LinkedList<>(); + private LinkedList getIndexList(int size) { + LinkedList indexList = new LinkedList<>(); for (int i = 0; i < size; i++) { indexList.add(i); } @@ -57,17 +40,14 @@ private List getIndexList(int size) { public List transferPart(List accounts, int blockId) { resetState(blockId); - List availableToAccountIds = getIndexList(accounts.size()); // available transferTo accountIds - // scale to percentage - accounts.forEach( - account -> { - account.setMaxOutDegree((long) Math.ceil(account.getRawMaxOutDegree() * partRatio)); - account.setMaxInDegree((long) Math.ceil(account.getRawMaxInDegree() * partRatio)); - } - ); + for (Account account : accounts) { + account.setMaxOutDegree((long) Math.ceil(account.getRawMaxOutDegree() * partRatio)); + account.setMaxInDegree((long) Math.ceil(account.getRawMaxInDegree() * partRatio)); + } List transfers = new LinkedList<>(); + LinkedList availableToAccountIds = getIndexList(accounts.size()); // available transferTo accountIds for (int i = 0; i < accounts.size(); i++) { Account from = accounts.get(i); @@ -76,16 +56,14 @@ public List transferPart(List accounts, int blockId) { for (int j = 0; j < availableToAccountIds.size(); j++) { int toIndex = availableToAccountIds.get(j); Account to = accounts.get(toIndex); - if (toIndex == i || cannotTransfer(from, to) || !distanceProbOK(j - i)) { + if (toIndex == i || cannotTransfer(from, to)) { skippedCount++; continue; } long numTransfers = Math.min(multiplicityDist.nextDegree(), Math.min(from.getAvailableOutDegree(), to.getAvailableInDegree())); for (int mindex = 0; mindex < numTransfers; mindex++) { - transfers.add(Transfer.createTransferAndReturn(randomFarm, from, to, mindex, - amountRandom.nextDouble() - * DatagenParams.transferMaxAmount)); + transfers.add(Transfer.createTransfer(randomFarm, from, to, mindex)); } if (to.getAvailableInDegree() == 0) { availableToAccountIds.remove(j); @@ -106,6 +84,7 @@ public List transferPart(List accounts, int blockId) { return transfers; } + // distanceProbOK is not applied to avoid rebundant computation private boolean distanceProbOK(int distance) { if (DatagenParams.transferGenerationMode.equals("loose")) { return true; diff --git a/src/main/java/ldbc/finbench/datagen/util/RandomGeneratorFarm.java b/src/main/java/ldbc/finbench/datagen/util/RandomGeneratorFarm.java index 7e2f620d..91079c4b 100644 --- a/src/main/java/ldbc/finbench/datagen/util/RandomGeneratorFarm.java +++ b/src/main/java/ldbc/finbench/datagen/util/RandomGeneratorFarm.java @@ -59,6 +59,7 @@ public enum Aspect { // edge: transfer TRANSFER_DATE, + TRANSFER_AMOUNT, TRANSFER_ORDERNUM, TRANSFER_PAYTYPE, TRANSFER_GOODSTYPE, @@ -82,8 +83,12 @@ public enum Aspect { // edge: person invest PERSON_INVEST_DATE, + NUMS_PERSON_INVEST, + CHOOSE_PERSON_INVESTOR, // edge: company invest COMPANY_INVEST_DATE, + NUMS_COMPANY_INVEST, + CHOOSE_COMPANY_INVESTOR, INVEST_RATIO, // edge: person guarantee diff --git a/src/main/resources/params_default.ini b/src/main/resources/params_default.ini index 15a78310..6a1bba5b 100644 --- a/src/main/resources/params_default.ini +++ b/src/main/resources/params_default.ini @@ -48,5 +48,5 @@ loan.numSubEvents:10 loan.maxLoanInterest:0.1 invest.companyInvestedFraction:1.0 -invest.minInvestors:3 -invest.maxInvestors:10 \ No newline at end of file +invest.minInvestors:1 +invest.maxInvestors:5 \ No newline at end of file diff --git a/src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala b/src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala index 88948cb0..18d74096 100644 --- a/src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala +++ b/src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala @@ -136,7 +136,7 @@ object LdbcDatagen extends SparkApp with Logging { format = args.factorFormat ) log.info("[Main] Starting factoring stage") - FactorGenerationStage.run(factorArgs) +// FactorGenerationStage.run(factorArgs) } } } diff --git a/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala b/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala index 2ff009e5..b1f7cd55 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala @@ -51,10 +51,7 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) ) // simulate person or company invest company event - val investRdd = activityGenerator.investEvent(personRdd, companyRdd) - log.info( - s"[Simulation] invest RDD partitions: ${investRdd.getNumPartitions}" - ) + val companyRddAfterInvest = activityGenerator.investEvent(personRdd, companyRdd) // simulate person guarantee person event and company guarantee company event val personWithAccGua = @@ -78,21 +75,14 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) // Account related activities val accountRdd = - mergeAccounts(personWithAccounts, companyWithAccounts) // merge + mergeAccountsAndShuffleDegrees(personWithAccounts, companyWithAccounts) + val signInRdd = activityGenerator.signInEvent(mediumRdd, accountRdd) + val mergedTransfers = activityGenerator.transferEvent(accountRdd) + val withdrawRdd = activityGenerator.withdrawEvent(accountRdd) log.info( s"[Simulation] Account RDD partitions: ${accountRdd.getNumPartitions}" - ) - val signInRdd = - activityGenerator.signInEvent(mediumRdd, accountRdd) // simulate signIn - val mergedTransfers = - activityGenerator.transferEvent(accountRdd) // simulate transfer - val withdrawRdd = - activityGenerator.withdrawEvent(accountRdd) // simulate withdraw - log.info( - s"[Simulation] signIn RDD partitions: ${signInRdd.getNumPartitions}" - ) - log.info( - s"[Simulation] transfer RDD partitions: ${mergedTransfers.getNumPartitions}, " + + s"[Simulation] signIn RDD partitions: ${signInRdd.getNumPartitions}" + + s"[Simulation] transfer RDD partitions: ${mergedTransfers.getNumPartitions}, " + s"withdraw RDD partitions: ${withdrawRdd.getNumPartitions}" ) @@ -118,19 +108,19 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession) mergedTransfers ), activitySerializer.writeWithdraw(withdrawRdd), - activitySerializer.writeInvest(investRdd), + activitySerializer.writeInvestCompanies(companyRddAfterInvest), activitySerializer.writeLoanActivities( loanRdd, depositsRdd, repaysRdd, loanTrasfersRdd ) - ).flatten + ).flatten Await.result(Future.sequence(allFutures), Duration.Inf) } - private def mergeAccounts( + private def mergeAccountsAndShuffleDegrees( persons: RDD[Person], companies: RDD[Company] ): RDD[Account] = { diff --git a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala index 5e9cc0ac..b010a268 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/generators/ActivityGenerator.scala @@ -14,10 +14,6 @@ import scala.collection.SortedMap class ActivityGenerator()(implicit spark: SparkSession) extends Serializable with Logging { - // TODO: Move the type definitions to the some common place - type EitherPersonOrCompany = Either[Person, Company] - type EitherPersonInvestOrCompanyInvest = - Either[PersonInvestCompany, CompanyInvestCompany] val blockSize: Int = DatagenParams.blockSize val sampleRandom = @@ -82,57 +78,40 @@ class ActivityGenerator()(implicit spark: SparkSession) companyWithAccount } - // TODO: rewrite it with company centric and invide the person investors and company investors def investEvent( personRDD: RDD[Person], companyRDD: RDD[Company] - ): RDD[EitherPersonInvestOrCompanyInvest] = { - val seedRandom = new scala.util.Random(DatagenParams.defaultSeed) - val numInvestorsRandom = new scala.util.Random(DatagenParams.defaultSeed) + ): RDD[Company] = { + val persons = spark.sparkContext.broadcast(personRDD.collect().toList) + val companies = spark.sparkContext.broadcast(companyRDD.collect().toList) + val personInvestEvent = new PersonInvestEvent() val companyInvestEvent = new CompanyInvestEvent() - // Sample some companies to be invested - val investedCompanyRDD = companyRDD.sample( - withReplacement = false, - DatagenParams.companyInvestedFraction, - sampleRandom.nextLong() - ) - // Merge to either - val personEitherRDD: RDD[EitherPersonOrCompany] = - personRDD.map(person => Left(person)) - val companyEitherRDD: RDD[EitherPersonOrCompany] = - companyRDD.map(company => Right(company)) - val mergedEither = personEitherRDD.union(companyEitherRDD).collect().toList - - // TODO: optimize the Spark process when large scale - investedCompanyRDD.flatMap(investedCompany => { - numInvestorsRandom.setSeed(seedRandom.nextInt()) - sampleRandom.setSeed(seedRandom.nextInt()) - personInvestEvent.resetState(seedRandom.nextInt()) - companyInvestEvent.resetState(seedRandom.nextInt()) - - val numInvestors = numInvestorsRandom.nextInt( - DatagenParams.maxInvestors - DatagenParams.minInvestors + 1 - ) + DatagenParams.minInvestors - // Note: check if fraction 0.1 has enough numInvestors to take - val investRels = - sampleRandom.shuffle(mergedEither).take(numInvestors).map { - case Left(person) => - Left(personInvestEvent.personInvest(person, investedCompany)) - case Right(company) => - Right(companyInvestEvent.companyInvest(company, investedCompany)) - } - val ratioSum = investRels.map { - case Left(personInvest) => personInvest.getRatio - case Right(companyInvest) => companyInvest.getRatio - }.sum - investRels.foreach { - case Left(personInvest) => personInvest.scaleRatio(ratioSum) - case Right(companyInvest) => companyInvest.scaleRatio(ratioSum) + companyRDD + .sample( + withReplacement = false, + DatagenParams.companyInvestedFraction, + sampleRandom.nextLong() + ) + .mapPartitionsWithIndex { (index, targets) => + personInvestEvent.resetState(index) + personInvestEvent + .personInvestPartition(persons.value.asJava, targets.toList.asJava) + .iterator() + .asScala } - investRels - }) + .mapPartitionsWithIndex { (index, targets) => + companyInvestEvent.resetState(index) + companyInvestEvent + .companyInvestPartition( + companies.value.asJava, + targets.toList.asJava + ) + .iterator() + .asScala + } + .map(_.scaleInvestmentRatios()) } def signInEvent( @@ -282,17 +261,19 @@ class ActivityGenerator()(implicit spark: SparkSession) .asJava // TODO: optimize the map function with the Java-Scala part. - val afterLoanActions = loanRDD.mapPartitionsWithIndex((index, loans) => { - val loanSubEvents = new LoanSubEvents(accountSampleList.get(index)) - loanSubEvents.afterLoanApplied(loans.toList.asJava, index) - Iterator( - ( - loanSubEvents.getDeposits.asScala, - loanSubEvents.getRepays.asScala, - loanSubEvents.getTransfers.asScala + val afterLoanActions = loanRDD + .mapPartitionsWithIndex((index, loans) => { + val loanSubEvents = new LoanSubEvents(accountSampleList.get(index)) + loanSubEvents.afterLoanApplied(loans.toList.asJava, index) + Iterator( + ( + loanSubEvents.getDeposits.asScala, + loanSubEvents.getRepays.asScala, + loanSubEvents.getTransfers.asScala + ) ) - ) - }).cache() + }) + .cache() ( afterLoanActions.flatMap(_._1), diff --git a/src/main/scala/ldbc/finbench/datagen/generation/serializers/ActivitySerializer.scala b/src/main/scala/ldbc/finbench/datagen/generation/serializers/ActivitySerializer.scala index 9b50ab7f..1cc79c12 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/serializers/ActivitySerializer.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/serializers/ActivitySerializer.scala @@ -10,7 +10,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import scala.collection.JavaConverters._ -import scala.concurrent.{Future, Await} +import scala.concurrent.Future /** generate person and company activities */ @@ -111,12 +111,12 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) } def writeCompanyWithActivities( - self: RDD[Company] + companiesRDD: RDD[Company] )(implicit spark: SparkSession): Seq[Future[Unit]] = { val futures = Seq( SparkUI.jobAsync("Write Company", "Write Company") { - val rawCompanies = self.map { c: Company => + val rawCompanies = companiesRDD.map { c: Company => CompanyRaw( c.getCompanyId, c.getCreationDate, @@ -138,7 +138,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) }, SparkUI .jobAsync("Write Company own account", "Write Company own account") { - val rawCompanyOwnAccount = self.flatMap { c => + val rawCompanyOwnAccount = companiesRDD.flatMap { c => c.getCompanyOwnAccounts.asScala.map { coa => CompanyOwnAccountRaw( c.getCompanyId, @@ -158,7 +158,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) .save((pathPrefix / "companyOwnAccount").toString) }, SparkUI.jobAsync("Write Company guarantee", "Write Company guarantee") { - val rawCompanyGuarantee = self.flatMap { c => + val rawCompanyGuarantee = companiesRDD.flatMap { c => c.getGuaranteeSrc.asScala.map { cgc: CompanyGuaranteeCompany => CompanyGuaranteeCompanyRaw( cgc.getFromCompany.getCompanyId, @@ -177,7 +177,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) .save((pathPrefix / "companyGuarantee").toString) }, SparkUI.jobAsync("Write Company apply loan", "Write Company apply loan") { - val rawCompanyLoan = self.flatMap { c => + val rawCompanyLoan = companiesRDD.flatMap { c => c.getCompanyApplyLoans.asScala.map { cal: CompanyApplyLoan => CompanyApplyLoanRaw( cal.getCompany.getCompanyId, @@ -214,7 +214,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) m.isBlocked, m.getLastLogin, m.getRiskLevel - ) + ) } spark .createDataFrame(rawMedium) @@ -269,7 +269,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) a.getMaxOutDegree, a.isExplicitlyDeleted, a.getOwnerType.toString - ) + ) } spark .createDataFrame(rawAccount) @@ -335,15 +335,13 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) futures } - def writeInvest( - self: RDD[Either[PersonInvestCompany, CompanyInvestCompany]] + def writeInvestCompanies( + investedCompaniesRDD: RDD[Company] )(implicit spark: SparkSession): Seq[Future[Unit]] = { - val futures = Seq( SparkUI.jobAsync("Write person invest", "Write Person Invest") { - val personInvest = self.filter(_.isLeft).map(_.left.get) - spark - .createDataFrame(personInvest.map { pic => + val rawPersonInvestCompany = investedCompaniesRDD.flatMap { c => + c.getPersonInvestCompanies.asScala.map { pic => PersonInvestCompanyRaw( pic.getPerson.getPersonId, pic.getCompany.getCompanyId, @@ -351,16 +349,22 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) pic.getRatio, pic.getComment ) - }) + } + } + log.info( + "[Invest] PersonInvestCompany count: " + rawPersonInvestCompany + .count() + ) + spark + .createDataFrame(rawPersonInvestCompany) .write .format(sink.format.toString) .options(options) .save((pathPrefix / "personInvest").toString) }, SparkUI.jobAsync("Write company invest", "Write Company Invest") { - val companyInvest = self.filter(_.isRight).map(_.right.get) - spark - .createDataFrame(companyInvest.map { cic => + val rawCompanyInvestCompany = investedCompaniesRDD.flatMap { c => + c.getCompanyInvestCompanies.asScala.map { cic => CompanyInvestCompanyRaw( cic.getFromCompany.getCompanyId, cic.getToCompany.getCompanyId, @@ -368,7 +372,14 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) cic.getRatio, cic.getComment ) - }) + } + } + log.info( + "[Invest] CompanyInvestCompany count: " + rawCompanyInvestCompany + .count() + ) + spark + .createDataFrame(rawCompanyInvestCompany) .write .format(sink.format.toString) .options(options) @@ -395,7 +406,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) formattedDouble(l.getBalance), l.getUsage, f"${l.getInterestRate}%.3f" - ) + ) } spark .createDataFrame(rawLoan) @@ -414,7 +425,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) formattedDouble(d.getAmount), d.isExplicitlyDeleted, d.getComment - ) + ) } spark .createDataFrame(rawDeposit) @@ -433,7 +444,7 @@ class ActivitySerializer(sink: RawSink)(implicit spark: SparkSession) formattedDouble(r.getAmount), r.isExplicitlyDeleted, r.getComment - ) + ) } spark .createDataFrame(rawRepay) diff --git a/scripts/check_consistency.py b/tools/check_consistency.py similarity index 100% rename from scripts/check_consistency.py rename to tools/check_consistency.py diff --git a/scripts/check_deletion.py b/tools/check_deletion.py similarity index 100% rename from scripts/check_deletion.py rename to tools/check_deletion.py diff --git a/scripts/check_duplicate.py b/tools/check_duplicate.py similarity index 100% rename from scripts/check_duplicate.py rename to tools/check_duplicate.py diff --git a/scripts/check_transfer.py b/tools/check_transfer.py similarity index 100% rename from scripts/check_transfer.py rename to tools/check_transfer.py diff --git a/scripts/paramgen/factor_table.sh b/tools/paramgen/factor_table.sh similarity index 100% rename from scripts/paramgen/factor_table.sh rename to tools/paramgen/factor_table.sh diff --git a/scripts/paramgen/generate_account.py b/tools/paramgen/generate_account.py similarity index 100% rename from scripts/paramgen/generate_account.py rename to tools/paramgen/generate_account.py diff --git a/scripts/paramgen/loan.py b/tools/paramgen/loan.py similarity index 100% rename from scripts/paramgen/loan.py rename to tools/paramgen/loan.py diff --git a/scripts/paramgen/parameter_curation.py b/tools/paramgen/parameter_curation.py similarity index 100% rename from scripts/paramgen/parameter_curation.py rename to tools/paramgen/parameter_curation.py diff --git a/scripts/paramgen/search_params.py b/tools/paramgen/search_params.py similarity index 100% rename from scripts/paramgen/search_params.py rename to tools/paramgen/search_params.py diff --git a/scripts/paramgen/split_amount.py b/tools/paramgen/split_amount.py similarity index 100% rename from scripts/paramgen/split_amount.py rename to tools/paramgen/split_amount.py diff --git a/scripts/paramgen/time_select.py b/tools/paramgen/time_select.py similarity index 100% rename from scripts/paramgen/time_select.py rename to tools/paramgen/time_select.py diff --git a/scripts/paramgen/time_split.py b/tools/paramgen/time_split.py similarity index 100% rename from scripts/paramgen/time_split.py rename to tools/paramgen/time_split.py diff --git a/scripts/statistic.py b/tools/statistic.py similarity index 100% rename from scripts/statistic.py rename to tools/statistic.py diff --git a/scripts/validate_formula.py b/tools/validate_formula.py similarity index 100% rename from scripts/validate_formula.py rename to tools/validate_formula.py