Skip to content

Commit

Permalink
cherry pick master feature (#181)
Browse files Browse the repository at this point in the history
* support job parallelism for both tags and edges (#165)

* do not repartition when source dataframe has the same partition number with write partition config (#173)

* Revert "fix escapes (#152)" (#178)

This reverts commit 1f40692.

* update the numPartitions for maxcompute to Long

* update the repartition for streaming data

* support job parallelism for both tags and edges (#165)

* do not repartition when source dataframe has the same partition number with write partition config (#173)

* update the config for numPartitions
  • Loading branch information
Nicole00 authored Nov 28, 2023
1 parent 2de3eec commit f45040c
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ case class SslConfigEntry(enableGraph: Boolean,
}
}

override def toString: String = s"SslConfigEntry:{enableGraph:$enableGraph, enableMeta:$enableMeta, signType:${signType.toString}}"
override def toString: String =
s"SslConfigEntry:{enableGraph:$enableGraph, enableMeta:$enableMeta, signType:${signType.toString}}"
}

case class CaSignParam(caCrtFilePath: String, crtFilePath: String, keyFilePath: String)
Expand Down Expand Up @@ -902,7 +903,7 @@ object Configs {
case SourceCategory.MAXCOMPUTE => {
val table = config.getString("table")
val partitionSpec = getStringOrNull(config, "partitionSpec")
val numPartitions = getOrElse(config, "numPartitions", "1")
val numPartitions = getStringOrElse(config, "numPartitions", "1")
val sentence = getStringOrNull(config, "sentence")

MaxComputeConfigEntry(
Expand All @@ -919,7 +920,7 @@ object Configs {
)
}
case SourceCategory.CLICKHOUSE => {
val partition: String = getOrElse(config, "numPartition", "1")
val partition: String = getStringOrElse(config, "numPartition", "1")
ClickHouseConfigEntry(
SourceCategory.CLICKHOUSE,
config.getString("url"),
Expand Down Expand Up @@ -1007,6 +1008,22 @@ object Configs {
}
}

/**
* Get the value from config by the path. If the path not exist, return the default value.
*
* @param config The com.vesoft.exchange.common.config.
* @param path The path of the com.vesoft.exchange.common.config.
* @param defaultValue The default value for the path.
* @return
*/
private[this] def getStringOrElse(config: Config, path: String, defaultValue: String): String = {
if (config.hasPath(path)) {
config.getString(path)
} else {
defaultValue
}
}

/**
* Get the String value from config
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ object NebulaUtils {
s = s.replaceAll("\\\\", "\\\\\\\\")
}
if (s.contains("\t")) {
s = s.replaceAll("\t", "\\\t")
s = s.replaceAll("\t", "\\\\t")
}
if (s.contains("\n")) {
s = s.replaceAll("\n", "\\\n")
s = s.replaceAll("\n", "\\\\n")
}
if (s.contains("\"")) {
s = s.replaceAll("\"", "\\\"")
s = s.replaceAll("\"", "\\\\\"")
}
if (s.contains("\'")) {
s = s.replaceAll("\'", "\\\'")
s = s.replaceAll("\'", "\\\\'")
}
if (s.contains("\r")) {
s = s.replaceAll("\r", "\\\r")
s = s.replaceAll("\r", "\\\\r")
}
if (s.contains("\b")) {
s = s.replaceAll("\b", "\\\b")
s = s.replaceAll("\b", "\\\\b")
}
s
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,15 @@ object Exchange {
private[this] def repartition(frame: DataFrame,
partition: Int,
sourceCategory: SourceCategory.Value): DataFrame = {
val currentPart = frame.rdd.partitions.length
if (partition > 0 && currentPart != partition
&& !CheckPointHandler.checkSupportResume(sourceCategory)) {
frame.repartition(partition).toDF
} else {
if (frame.isStreaming || partition <= 0 || CheckPointHandler.checkSupportResume(sourceCategory)) {
frame
} else {
val currentPart = frame.rdd.partitions.length
if (currentPart == partition) {
frame
} else {
frame.repartition(partition).toDF
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ object Exchange {
s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" +
s">>>>>> total SST failure:${totalSstRecordFailure} \n" +
s">>>>>> total SST Success:${totalSstRecordSuccess}")
LOG.info(s">>>>>> exchange import qps: ${(totalClientRecordSuccess/duration).formatted("%.2f")}/s")
LOG.info(
s">>>>>> exchange import qps: ${(totalClientRecordSuccess / duration).formatted("%.2f")}/s")
}

/**
Expand Down Expand Up @@ -436,12 +437,15 @@ object Exchange {
private[this] def repartition(frame: DataFrame,
partition: Int,
sourceCategory: SourceCategory.Value): DataFrame = {
val currentPart = frame.rdd.partitions.length
if (partition > 0 && currentPart != partition
&& !CheckPointHandler.checkSupportResume(sourceCategory)) {
frame.repartition(partition).toDF
} else {
if (frame.isStreaming || partition <= 0 || CheckPointHandler.checkSupportResume(sourceCategory)) {
frame
} else {
val currentPart = frame.rdd.partitions.length
if (currentPart == partition) {
frame
} else {
frame.repartition(partition).toDF
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,12 +435,15 @@ object Exchange {
private[this] def repartition(frame: DataFrame,
partition: Int,
sourceCategory: SourceCategory.Value): DataFrame = {
val currentPart = frame.rdd.partitions.length
if (partition > 0 && currentPart != partition
&& !CheckPointHandler.checkSupportResume(sourceCategory)) {
frame.repartition(partition).toDF
} else {
if (frame.isStreaming || partition <= 0 || CheckPointHandler.checkSupportResume(sourceCategory)) {
frame
} else {
val currentPart = frame.rdd.partitions.length
if (currentPart == partition) {
frame
} else {
frame.repartition(partition).toDF
}
}
}

Expand Down

0 comments on commit f45040c

Please sign in to comment.