Skip to content

Commit

Permalink
add error amount limit (#58)
Browse files Browse the repository at this point in the history
* duration for sst

* add error amount limit
  • Loading branch information
Nicole00 authored Jan 17, 2022
1 parent 2785af0 commit 4741be3
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ trait Processor extends Serializable {
val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt)
convertJTSGeometryToGeography(jtsGeom)
}
case PropertyType.DURATION => {
throw new IllegalArgumentException("do not support data type duration.")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.exchange.common.writer.{GenerateSstFile, NebulaGraphClientWriter, NebulaSSTWriter}
import com.vesoft.exchange.common.VidType
import com.vesoft.nebula.encoder.NebulaCodecImpl
import com.vesoft.nebula.exchange.TooManyErrorsException
import com.vesoft.nebula.meta.EdgeItem
import org.apache.commons.codec.digest.MurmurHash2
import org.apache.log4j.Logger
Expand Down Expand Up @@ -72,6 +73,11 @@ class EdgeProcessor(spark: SparkSession,
} else {
errorBuffer.append(failStatement)
batchFailure.add(1)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.exchange.common.writer.{GenerateSstFile, NebulaGraphClientWriter, NebulaSSTWriter}
import com.vesoft.exchange.common.VidType
import com.vesoft.nebula.encoder.NebulaCodecImpl
import com.vesoft.nebula.exchange.TooManyErrorsException
import com.vesoft.nebula.meta.TagItem
import org.apache.commons.codec.digest.MurmurHash2
import org.apache.log4j.Logger
Expand Down Expand Up @@ -79,6 +80,11 @@ class VerticesProcessor(spark: SparkSession,
} else {
errorBuffer.append(failStatement)
batchFailure.add(1)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.exchange.common.writer.{GenerateSstFile, NebulaGraphClientWriter, NebulaSSTWriter}
import com.vesoft.exchange.common.VidType
import com.vesoft.nebula.encoder.NebulaCodecImpl
import com.vesoft.nebula.exchange.TooManyErrorsException
import com.vesoft.nebula.meta.EdgeItem
import org.apache.commons.codec.digest.MurmurHash2
import org.apache.log4j.Logger
Expand Down Expand Up @@ -73,6 +74,11 @@ class EdgeProcessor(spark: SparkSession,
} else {
errorBuffer.append(failStatement)
batchFailure.add(1)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.exchange.common.writer.{GenerateSstFile, NebulaGraphClientWriter, NebulaSSTWriter}
import com.vesoft.exchange.common.VidType
import com.vesoft.nebula.encoder.NebulaCodecImpl
import com.vesoft.nebula.exchange.TooManyErrorsException
import com.vesoft.nebula.meta.TagItem
import org.apache.commons.codec.digest.MurmurHash2
import org.apache.log4j.Logger
Expand Down Expand Up @@ -81,6 +82,11 @@ class VerticesProcessor(spark: SparkSession,
} else {
errorBuffer.append(failStatement)
batchFailure.add(1)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.exchange.common.writer.{GenerateSstFile, NebulaGraphClientWriter, NebulaSSTWriter}
import com.vesoft.exchange.common.VidType
import com.vesoft.nebula.encoder.NebulaCodecImpl
import com.vesoft.nebula.exchange.TooManyErrorsException
import com.vesoft.nebula.meta.EdgeItem
import org.apache.commons.codec.digest.MurmurHash2
import org.apache.log4j.Logger
Expand Down Expand Up @@ -73,6 +74,11 @@ class EdgeProcessor(spark: SparkSession,
} else {
errorBuffer.append(failStatement)
batchFailure.add(1)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.exchange.common.writer.{GenerateSstFile, NebulaGraphClientWriter, NebulaSSTWriter}
import com.vesoft.exchange.common.VidType
import com.vesoft.nebula.encoder.NebulaCodecImpl
import com.vesoft.nebula.exchange.TooManyErrorsException
import com.vesoft.nebula.meta.TagItem
import org.apache.commons.codec.digest.MurmurHash2
import org.apache.log4j.Logger
Expand Down Expand Up @@ -81,6 +82,11 @@ class VerticesProcessor(spark: SparkSession,
} else {
errorBuffer.append(failStatement)
batchFailure.add(1)
if (batchFailure.value >= config.errorConfig.errorMaxSize) {
throw TooManyErrorsException(
s"There are too many failed batches, batch amount: ${batchFailure.value}, " +
s"your config max error size: ${config.errorConfig.errorMaxSize}")
}
}
}
if (errorBuffer.nonEmpty) {
Expand Down

0 comments on commit 4741be3

Please sign in to comment.