update the count for succeeded and failed records
Nicole00 committed Nov 17, 2023
1 parent db824fb commit 272b5d2
Showing 10 changed files with 68 additions and 136 deletions.
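
Read together, the hunks below make a single accounting change: when a batch is only partially written, EdgeProcessor and VerticesProcessor no longer count the whole batch as failed records. The diff indicates that writeEdges/writeVertices now return the collection of statements that failed (rather than a single nullable statement), so the processors credit the batch size minus the failed statements to recordSuccess and add only the failed statements to recordFailure; the standalone reimport pass in Exchange is dropped at the same time. The following minimal Scala sketch illustrates only that counting pattern; the writeBatch stub and the plain Long counters are illustrative stand-ins, not the project's NebulaGraph writer or Spark accumulators.

    import scala.collection.mutable.ArrayBuffer

    object BatchAccountingSketch {

      // Stand-in for writer.writeEdges / writer.writeVertices: returns the
      // statements that failed to execute; an empty result means the whole
      // batch succeeded.
      def writeBatch(batch: Seq[String]): Seq[String] =
        batch.filter(_.contains("bad")) // hypothetical failure condition

      def main(args: Array[String]): Unit = {
        var batchSuccess, batchFailure, recordSuccess, recordFailure = 0L
        val errorBuffer = ArrayBuffer.empty[String]

        val records = Seq("row-1", "row-2 bad", "row-3", "row-4")
        records.grouped(2).foreach { batch =>
          val failedStatements = writeBatch(batch)
          if (failedStatements.isEmpty) {
            batchSuccess += 1
            recordSuccess += batch.size
          } else {
            errorBuffer.appendAll(failedStatements)
            batchFailure += 1
            // Partial failure: only the statements the writer rejected count
            // as failed records; the rest of the batch is still a success.
            recordSuccess += batch.size - failedStatements.size
            recordFailure += failedStatements.size
          }
        }

        println(s"batchSuccess=$batchSuccess, batchFailure=$batchFailure, " +
          s"recordSuccess=$recordSuccess, recordFailure=$recordFailure")
      }
    }

With the sample rows above, the sketch prints batchSuccess=1, batchFailure=1, recordSuccess=3, recordFailure=1: only the rejected statement is counted as a failed record, while the other record in the same batch still counts as a success.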
File 1 of 10
@@ -49,11 +49,10 @@ class ReloadProcessor(data: DataFrame,
     val startTime = System.currentTimeMillis
     iterator.foreach { row =>
       val ngql = row.getString(0)
-      val recordSize = computeRecordNumber(ngql)
-      val failStatement = writer.writeNgql(row.getString(0))
+      val failStatement = writer.writeNgql(ngql)
       if (failStatement == null) {
         batchSuccess.add(1)
-        recordSuccess.add(recordSize)
+        recordSuccess.add(1)
       } else {
         errorBuffer.append(failStatement)
         batchFailure.add(1)
File 2 of 10
@@ -137,11 +137,10 @@ object Exchange {
       val data = spark.read.text(c.reload)
       val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
       processor.process()
-      LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
-      LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
-      LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
+      LOG.info(s">>>>> recordSuccess.reimport: ${batchSuccess.value}")
+      LOG.info(s">>>>> recordFailure.reimport: ${batchFailure.value}")
       LOG.info(
-        s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
+        s">>>>> exchange reimport job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
           .formatted("%.2f")}s")

       sys.exit(0)
@@ -289,31 +288,6 @@ object Exchange {
       }
     }

-    // reimport for failed tags and edges
-    val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
-    if (failures > 0 && ErrorHandler.existError(errorPath)) {
-      spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
-
-      val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
-      val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-      val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
-      val data = spark.read.text(errorPath)
-      val start = System.currentTimeMillis()
-      val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
-      processor.process()
-      val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
-      LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
-      LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
-      LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
-      LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
-      totalClientBatchSuccess += batchSuccess.value
-      totalClientBatchFailure -= batchSuccess.value
-      totalClientRecordSuccess += recordSuccess.value
-      totalClientRecordFailure -= recordSuccess.value
-      if (totalClientRecordFailure < 0) {
-        totalClientRecordFailure = 0
-      }
-    }
     spark.close()
     val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble
     LOG.info(
File 3 of 10
@@ -67,16 +67,18 @@ class EdgeProcessor(spark: SparkSession,
     writer.prepare()
     // batch write tags
     val startTime = System.currentTimeMillis
-    iterator.grouped(edgeConfig.batch).foreach { edge =>
-      val edges = Edges(nebulaKeys, edge.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy)
-      val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex)
-      if (failStatement == null) {
+    iterator.grouped(edgeConfig.batch).foreach { edgeSet =>
+      val edges =
+        Edges(nebulaKeys, edgeSet.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy)
+      val failStatements = writer.writeEdges(edges, edgeConfig.ignoreIndex)
+      if (failStatements.isEmpty) {
         batchSuccess.add(1)
-        recordSuccess.add(edge.toList.size)
+        recordSuccess.add(edgeSet.size)
       } else {
-        errorBuffer.append(failStatement)
+        errorBuffer.append(failStatements: _*)
         batchFailure.add(1)
-        recordFailure.add(edge.toList.size)
+        recordSuccess.add(edgeSet.size - failStatements.size)
+        recordFailure.add(failStatements.size)
         if (batchFailure.value >= config.errorConfig.errorMaxSize) {
           writeErrorStatement(errorBuffer)
           throw TooManyErrorsException(
File 4 of 10
@@ -74,16 +74,17 @@ class VerticesProcessor(spark: SparkSession,
     writer.prepare()
     // batch write tags
     val startTime = System.currentTimeMillis
-    iterator.grouped(tagConfig.batch).foreach { vertex =>
-      val vertices = Vertices(nebulaKeys, vertex.toList, tagConfig.vertexPolicy)
-      val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex)
-      if (failStatement == null) {
+    iterator.grouped(tagConfig.batch).foreach { vertexSet =>
+      val vertices = Vertices(nebulaKeys, vertexSet.toList, tagConfig.vertexPolicy)
+      val failStatements = writer.writeVertices(vertices, tagConfig.ignoreIndex)
+      if (failStatements.isEmpty) {
         batchSuccess.add(1)
-        recordSuccess.add(vertex.toList.size)
+        recordSuccess.add(vertexSet.size)
       } else {
-        errorBuffer.append(failStatement)
+        errorBuffer.append(failStatements: _*)
         batchFailure.add(1)
-        recordFailure.add(vertex.toList.size)
+        recordSuccess.add(vertexSet.size - failStatements.size)
+        recordFailure.add(failStatements.size)
         if (batchFailure.value >= config.errorConfig.errorMaxSize) {
           writeErrorStatement(errorBuffer)
           throw TooManyErrorsException(
File 5 of 10
@@ -137,11 +137,10 @@ object Exchange {
       val data = spark.read.text(c.reload)
       val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
       processor.process()
-      LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
-      LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
-      LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
+      LOG.info(s">>>>> recordSuccess.reimport: ${batchSuccess.value}")
+      LOG.info(s">>>>> recordFailure.reimport: ${batchFailure.value}")
       LOG.info(
-        s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
+        s">>>>> exchange reimport job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
           .formatted("%.2f")}s")
       sys.exit(0)
     }
@@ -288,30 +287,6 @@ object Exchange {
       }
     }

-    // reimport for failed tags and edges
-    val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
-    if (failures > 0 && ErrorHandler.existError(errorPath)) {
-      spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
-      val start = System.currentTimeMillis()
-      val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
-      val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-      val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
-      val data = spark.read.text(errorPath)
-      val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
-      processor.process()
-      val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
-      LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
-      LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
-      LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
-      LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
-      totalClientBatchSuccess += batchSuccess.value
-      totalClientBatchFailure -= batchSuccess.value
-      totalClientRecordSuccess += recordSuccess.value
-      totalClientRecordFailure -= recordSuccess.value
-      if (totalClientRecordFailure < 0) {
-        totalClientRecordFailure = 0
-      }
-    }
     spark.close()
     val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble
     LOG.info(
File 6 of 10
@@ -43,8 +43,8 @@ class EdgeProcessor(spark: SparkSession,
                     config: Configs,
                     batchSuccess: LongAccumulator,
                     batchFailure: LongAccumulator,
-                    recordSuccess:LongAccumulator,
-                    recordFailure:LongAccumulator)
+                    recordSuccess: LongAccumulator,
+                    recordFailure: LongAccumulator)
     extends Processor {

   @transient
@@ -68,16 +68,18 @@ class EdgeProcessor(spark: SparkSession,
     writer.prepare()
     // batch write tags
     val startTime = System.currentTimeMillis
-    iterator.grouped(edgeConfig.batch).foreach { edge =>
-      val edges = Edges(nebulaKeys, edge.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy)
-      val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex)
-      if (failStatement == null) {
+    iterator.grouped(edgeConfig.batch).foreach { edgeSet =>
+      val edges =
+        Edges(nebulaKeys, edgeSet.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy)
+      val failStatements = writer.writeEdges(edges, edgeConfig.ignoreIndex)
+      if (failStatements.isEmpty) {
         batchSuccess.add(1)
-        recordSuccess.add(edge.toList.size)
+        recordSuccess.add(edgeSet.size)
       } else {
-        errorBuffer.append(failStatement)
+        errorBuffer.append(failStatements: _*)
         batchFailure.add(1)
-        recordFailure.add(edge.toList.size)
+        recordSuccess.add(edgeSet.size - failStatements.size)
+        recordFailure.add(failStatements.size)
         if (batchFailure.value >= config.errorConfig.errorMaxSize) {
           writeErrorStatement(errorBuffer)
           throw TooManyErrorsException(
@@ -427,7 +429,7 @@
     (positiveEdgeKey, reverseEdgeKey, edgeValue)
   }

-  private def writeErrorStatement(errorBuffer:ArrayBuffer[String]): Unit = {
+  private def writeErrorStatement(errorBuffer: ArrayBuffer[String]): Unit = {
     if (errorBuffer.nonEmpty) {
       val appId = SparkEnv.get.blockManager.conf.getAppId
       ErrorHandler.save(
File 7 of 10
@@ -52,8 +52,8 @@ class VerticesProcessor(spark: SparkSession,
                         config: Configs,
                         batchSuccess: LongAccumulator,
                         batchFailure: LongAccumulator,
-                        recordSuccess:LongAccumulator,
-                        recordFailure:LongAccumulator)
+                        recordSuccess: LongAccumulator,
+                        recordFailure: LongAccumulator)
     extends Processor {

   @transient
@@ -76,16 +76,17 @@
     writer.prepare()
     // batch write tags
     val startTime = System.currentTimeMillis
-    iterator.grouped(tagConfig.batch).foreach { vertex =>
-      val vertices = Vertices(nebulaKeys, vertex.toList, tagConfig.vertexPolicy)
-      val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex)
-      if (failStatement == null) {
+    iterator.grouped(tagConfig.batch).foreach { vertexSet =>
+      val vertices = Vertices(nebulaKeys, vertexSet.toList, tagConfig.vertexPolicy)
+      val failStatements = writer.writeVertices(vertices, tagConfig.ignoreIndex)
+      if (failStatements.isEmpty) {
         batchSuccess.add(1)
-        recordSuccess.add(vertex.toList.size)
+        recordSuccess.add(vertexSet.size)
       } else {
-        errorBuffer.append(failStatement)
+        errorBuffer.append(failStatements: _*)
         batchFailure.add(1)
-        recordFailure.add(vertex.toList.size)
+        recordSuccess.add(vertexSet.size - failStatements.size)
+        recordFailure.add(failStatements.size)
         if (batchFailure.value >= config.errorConfig.errorMaxSize) {
           writeErrorStatement(errorBuffer)
           throw TooManyErrorsException(
File 8 of 10
@@ -137,9 +137,8 @@ object Exchange {
       val data = spark.read.text(c.reload)
       val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
       processor.process()
-      LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
-      LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
-      LOG.info(s">>>>> recordSuccess.reload: ${recordSuccess.value}")
+      LOG.info(s">>>>> recordSuccess.reimport: ${batchSuccess.value}")
+      LOG.info(s">>>>> recordFailure.reimport: ${batchFailure.value}")
       LOG.info(
         s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
           .formatted("%.2f")}s")
@@ -287,30 +286,6 @@ object Exchange {
       }
     }

-    // reimport for failed tags and edges
-    val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
-    if (failures > 0 && ErrorHandler.existError(errorPath)) {
-      spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
-      val start = System.currentTimeMillis()
-      val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
-      val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-      val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.reimport")
-      val data = spark.read.text(errorPath)
-      val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure, recordSuccess)
-      processor.process()
-      val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
-      LOG.info(s">>>>> reimport ngql cost time: ${costTime}")
-      LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
-      LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
-      LOG.info(s">>>>> recordSuccess.reimport: ${recordSuccess.value}")
-      totalClientBatchSuccess += batchSuccess.value
-      totalClientBatchFailure -= batchSuccess.value
-      totalClientRecordSuccess += recordSuccess.value
-      totalClientRecordFailure -= recordSuccess.value
-      if (totalClientRecordFailure < 0) {
-        totalClientRecordFailure = 0
-      }
-    }
     spark.close()
     val duration = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f").toDouble
     LOG.info(
File 9 of 10
@@ -43,8 +43,8 @@ class EdgeProcessor(spark: SparkSession,
                     config: Configs,
                     batchSuccess: LongAccumulator,
                     batchFailure: LongAccumulator,
-                    recordSuccess:LongAccumulator,
-                    recordFailure:LongAccumulator)
+                    recordSuccess: LongAccumulator,
+                    recordFailure: LongAccumulator)
     extends Processor {

   @transient
@@ -68,16 +68,18 @@ class EdgeProcessor(spark: SparkSession,
     writer.prepare()
     // batch write tags
     val startTime = System.currentTimeMillis
-    iterator.grouped(edgeConfig.batch).foreach { edge =>
-      val edges = Edges(nebulaKeys, edge.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy)
-      val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex)
-      if (failStatement == null) {
+    iterator.grouped(edgeConfig.batch).foreach { edgeSet =>
+      val edges =
+        Edges(nebulaKeys, edgeSet.toList, edgeConfig.sourcePolicy, edgeConfig.targetPolicy)
+      val failStatements = writer.writeEdges(edges, edgeConfig.ignoreIndex)
+      if (failStatements.isEmpty) {
         batchSuccess.add(1)
-        recordSuccess.add(edge.toList.size)
+        recordSuccess.add(edgeSet.toList.size)
       } else {
-        errorBuffer.append(failStatement)
+        errorBuffer.append(failStatements: _*)
         batchFailure.add(1)
-        recordFailure.add(edge.toList.size)
+        recordSuccess.add(edgeSet.size - failStatements.size)
+        recordFailure.add(failStatements.size)

         if (batchFailure.value >= config.errorConfig.errorMaxSize) {
           writeErrorStatement(errorBuffer)
File 10 of 10
@@ -76,16 +76,17 @@ class VerticesProcessor(spark: SparkSession,
     writer.prepare()
     // batch write tags
     val startTime = System.currentTimeMillis
-    iterator.grouped(tagConfig.batch).foreach { vertex =>
-      val vertices = Vertices(nebulaKeys, vertex.toList, tagConfig.vertexPolicy)
-      val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex)
-      if (failStatement == null) {
+    iterator.grouped(tagConfig.batch).foreach { vertexSet =>
+      val vertices = Vertices(nebulaKeys, vertexSet.toList, tagConfig.vertexPolicy)
+      val failStatements = writer.writeVertices(vertices, tagConfig.ignoreIndex)
+      if (failStatements.isEmpty) {
         batchSuccess.add(1)
-        recordSuccess.add(vertex.toList.size)
+        recordSuccess.add(vertexSet.size)
       } else {
-        errorBuffer.append(failStatement)
+        errorBuffer.append(failStatements: _*)
         batchFailure.add(1)
-        recordFailure.add(vertex.toList.size)
+        recordSuccess.add(vertexSet.size - failStatements.size)
+        recordFailure.add(failStatements.size)
         if (batchFailure.value >= config.errorConfig.errorMaxSize) {
           writeErrorStatement(errorBuffer)
           throw TooManyErrorsException(
