sort files and separate factor with datagen #111

Merged · 4 commits · Sep 29, 2024
24 changes: 24 additions & 0 deletions scripts/run_local.sh
@@ -3,12 +3,19 @@
LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar
OUTPUT_DIR=out

# Note: generate factor tables with --generate-factors

# run locally with the python script
# time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 30 --output-dir ${OUTPUT_DIR}

# run locally with spark-submit command
# **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
# **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
# --conf "spark.memory.offHeap.enabled=true" \
# --conf "spark.memory.offHeap.size=100g" \
# --conf "spark.storage.memoryFraction=0" \
# --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \

time spark-submit --master local[*] \
--class ldbc.finbench.datagen.LdbcDatagen \
--driver-memory 480g \
@@ -24,3 +31,20 @@ time spark-submit --master local[*] \
${LDBC_FINBENCH_DATAGEN_JAR} \
--scale-factor 10 \
--output-dir ${OUTPUT_DIR}

# currently works on SF100
#time spark-submit --master local[*] \
# --class ldbc.finbench.datagen.LdbcDatagen \
# --driver-memory 400g \
# --conf "spark.default.parallelism=800" \
# --conf "spark.shuffle.compress=true" \
# --conf "spark.shuffle.spill.compress=true" \
# --conf "spark.kryoserializer.buffer.max=512m" \
# --conf "spark.driver.maxResultSize=0" \
# --conf "spark.driver.extraJavaOptions=-Xss512m" \
# --conf "spark.executor.extraJavaOptions=-Xss512m -XX:+UseG1GC" \
# --conf "spark.kryo.referenceTracking=false" \
# ${LDBC_FINBENCH_DATAGEN_JAR} \
# --scale-factor 100 \
# --output-dir ${OUTPUT_DIR}
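
# A sketch (not part of this PR): generate only the factor tables by adding the
# --generate-factors flag noted at the top of this script. The driver-memory
# value below is a placeholder, not a tested recommendation.
#time spark-submit --master local[*] \
#  --class ldbc.finbench.datagen.LdbcDatagen \
#  --driver-memory 100g \
#  ${LDBC_FINBENCH_DATAGEN_JAR} \
#  --scale-factor 10 \
#  --output-dir ${OUTPUT_DIR} \
#  --generate-factors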

34 changes: 16 additions & 18 deletions src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala
@@ -1,9 +1,7 @@
package ldbc.finbench.datagen

import ldbc.finbench.datagen.factors.FactorGenerationStage
import ldbc.finbench.datagen.generation.dictionary.Dictionaries
import ldbc.finbench.datagen.generation.GenerationStage
import ldbc.finbench.datagen.transformation.TransformationStage
import ldbc.finbench.datagen.util.{Logging, SparkApp}
import shapeless.lens

@@ -24,7 +22,7 @@ object LdbcDatagen extends SparkApp with Logging {
format: String = "csv",
formatOptions: Map[String, String] = Map.empty,
epochMillis: Boolean = false,
generateFactors: Boolean = true,
generateFactors: Boolean = false,
factorFormat: String = "parquet"
)

@@ -121,22 +119,22 @@ object LdbcDatagen extends SparkApp with Logging {
}

override def run(args: ArgsType): Unit = {
val generationArgs = GenerationStage.Args(
scaleFactor = args.scaleFactor,
outputDir = args.outputDir,
format = args.format,
partitionsOpt = args.numPartitions
)
log.info("[Main] Starting generation stage")
GenerationStage.run(generationArgs)

if (args.generateFactors) {
val factorArgs = FactorGenerationStage.Args(
outputDir = args.outputDir,
format = args.factorFormat
if (!args.generateFactors) {
GenerationStage.run(
GenerationStage.Args(
scaleFactor = args.scaleFactor,
outputDir = args.outputDir,
format = args.format,
partitionsOpt = args.numPartitions
)
)
} else {
FactorGenerationStage.run(
FactorGenerationStage.Args(
outputDir = args.outputDir,
format = args.factorFormat
)
)
log.info("[Main] Starting factoring stage")
// FactorGenerationStage.run(factorArgs)
}
}
}
src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala
@@ -1,5 +1,6 @@
package ldbc.finbench.datagen.factors

import ldbc.finbench.datagen.LdbcDatagen.log
import ldbc.finbench.datagen.util.DatagenStage
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
@@ -10,7 +11,6 @@ import shapeless.lens
import scala.util.matching.Regex

object FactorGenerationStage extends DatagenStage {

@transient lazy val log: Logger = LoggerFactory.getLogger(this.getClass)

case class Args(
@@ -64,14 +64,14 @@ object FactorGenerationStage extends DatagenStage {
run(parsedArgs)
}

// execute factorization process
// TODO: finish all

override def run(args: Args) = {
parameterCuration(args)
factortables(args)
}

def parameterCuration(args: Args)(implicit spark: SparkSession) = {
def factortables(args: Args)(implicit spark: SparkSession) = {
import spark.implicits._
log.info("[Main] Starting factoring stage")

val transferRDD = spark.read
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
@@ -533,6 +533,5 @@ object FactorGenerationStage extends DatagenStage {
.option("delimiter", "|")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save(s"${args.outputDir}/factor_table/upstream_amount")

}
}
src/main/scala/ldbc/finbench/datagen/generation/GenerationStage.scala
@@ -1,5 +1,6 @@
package ldbc.finbench.datagen.generation

import ldbc.finbench.datagen.LdbcDatagen.log
import ldbc.finbench.datagen.config.{ConfigParser, DatagenConfiguration}
import ldbc.finbench.datagen.io.raw.{Csv, Parquet, RawSink}
import ldbc.finbench.datagen.util._
@@ -25,6 +26,7 @@ object GenerationStage extends DatagenStage with Logging {
override type ArgsType = Args

override def run(args: Args): Unit = {
log.info("[Main] Starting generation stage")
// build and initialize the configs
val config = buildConfig(args)
// OPT: It is called in each SparkGenerator in Spark to initialize the context on the executors.
4 changes: 0 additions & 4 deletions tools/DataProfiler/result/db139/profile.log

This file was deleted.

2 changes: 0 additions & 2 deletions tools/DataProfiler/result/db177/edges.log

This file was deleted.

4 changes: 0 additions & 4 deletions tools/DataProfiler/result/db177/profile.log

This file was deleted.

4 changes: 0 additions & 4 deletions tools/DataProfiler/result/db184/profile.log

This file was deleted.

2 changes: 0 additions & 2 deletions tools/DataProfiler/result/transfer/edges.log

This file was deleted.

4 changes: 0 additions & 4 deletions tools/DataProfiler/result/transfer/profile.log

This file was deleted.

64 changes: 60 additions & 4 deletions tools/README.md
@@ -1,6 +1,62 @@
# Tools

Listed here are some tools for graph data processing.
- dataprofiler: a tool for profiling graph data, including degree distribution, etc.
- graphgen: a simple tool/example code to generate power-law distributed graph data.
- paramgen: a Parameter Search tool to generate parameters for queries using TuGraph.
- paramgen:
  - parameter_curation: a tool for generating parameters for FinBench queries
- check_*.py: Python scripts used to check data features such as consistency and distribution
- merge_cluster_output.py: a Python script to merge the output in cluster mode
- statistic.py: a Python script to calculate statistics of the data
- legacy: some legacy tools
  - dataprofiler: a tool for profiling graph data, including degree distribution, etc.
  - graphgen: a simple tool/example code to generate power-law distributed graph data.
  - factorgen: factor table generators implemented in Python


## ParamsGen

`params_gen.py` uses the CREATE_VALIDATION feature to generate parameters.

The specific steps are as follows:

1. Select vertices of type Account, Person, and Loan from the dataset, and generate a parameter file that meets the input specifications for ldbc_finbench_driver.
2. Execute CREATE_VALIDATION to generate validation_params.csv.
3. Select non-empty results from validation_params.csv (a filtering sketch follows the example below).

Example:

```bash
python3 params_gen.py 1 # gen tcr1 params
```
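
A minimal sketch of step 3, assuming `validation_params.csv` is `|`-delimited and that the result fields follow the first column; the column layout and the output file name are assumptions, not part of the driver's documented format.

```python
import csv

# Keep only the rows of validation_params.csv whose result payload is non-empty.
with open("validation_params.csv", newline="") as src, \
        open("validation_params_nonempty.csv", "w", newline="") as dst:
    reader = csv.reader(src, delimiter="|")
    writer = csv.writer(dst, delimiter="|")
    for row in reader:
        # A row qualifies if any field after the first one contains data.
        if len(row) > 1 and any(field.strip() for field in row[1:]):
            writer.writerow(row)
```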

Other notes:

1. The generated start_timestamp and end_timestamp in the current version are fixed values.
2. For tcr4 and tcr10, this method is not efficient enough. Use the following Cypher queries to search for parameters instead:

```Cypher
// tcr4
MATCH
(n1:Account)-[:transfer]->
(n2:Account)-[:transfer]->
(n3:Account)-[:transfer]->(n4:Account)
WHERE
n1.id = n4.id AND n1.id > n2.id AND n2.id > n3.id
WITH
n1.id as n1id,
n2.id as n2id,
n3.id as n3id,
n4.id as n4id
LIMIT 1000
RETURN DISTINCT toString(n1id)+"|"+toString(n2id)

// tcr10
MATCH
(c:Company)<-[:invest]-(p:Person)
WITH
c.id as cid,
count(p.id) as num,
collect(p.id) as person
WHERE num >= 2
RETURN
toString(person[0])+"|"+toString(person[1])
LIMIT 1000
```
13 files renamed without changes.
49 changes: 0 additions & 49 deletions tools/paramgen/README.md

This file was deleted.
