diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/Processor.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/Processor.scala index f828848c..c388dd62 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/Processor.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/Processor.scala @@ -61,28 +61,28 @@ trait Processor extends Serializable { if (row.isNullAt(index)) return null + val value = row.get(index).toString.trim PropertyType.findByValue(fieldTypeMap(field)) match { case PropertyType.STRING | PropertyType.FIXED_STRING => { - var value = row.get(index).toString + var stringValue = value if (value.equals(DEFAULT_EMPTY_VALUE)) { - value = "" + stringValue = "" } - val result = NebulaUtils.escapeUtil(value).mkString("\"", "", "\"") + val result = NebulaUtils.escapeUtil(stringValue).mkString("\"", "", "\"") result } - case PropertyType.DATE => "date(\"" + row.get(index) + "\")" - case PropertyType.DATETIME => "datetime(\"" + row.get(index) + "\")" - case PropertyType.TIME => "time(\"" + row.get(index) + "\")" + case PropertyType.DATE => "date(\"" + value + "\")" + case PropertyType.DATETIME => "datetime(\"" + value + "\")" + case PropertyType.TIME => "time(\"" + value + "\")" case PropertyType.TIMESTAMP => { - val value = row.get(index).toString if (NebulaUtils.isNumic(value)) { value } else { - "timestamp(\"" + row.get(index) + "\")" + "timestamp(\"" + value + "\")" } } - case PropertyType.GEOGRAPHY => "ST_GeogFromText(\"" + row.get(index) + "\")" - case _ => row.get(index) + case PropertyType.GEOGRAPHY => "ST_GeogFromText(\"" + value + "\")" + case _ => value } } @@ -98,18 +98,18 @@ trait Processor extends Serializable { case PropertyType.UNKNOWN => throw new IllegalArgumentException("date type in nebula is UNKNOWN.") case PropertyType.STRING | PropertyType.FIXED_STRING => { - val value = row.get(index).toString + val value = row.get(index).toString.trim if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value } - case PropertyType.BOOL => row.get(index).toString.toBoolean - case PropertyType.DOUBLE => row.get(index).toString.toDouble - case PropertyType.FLOAT => row.get(index).toString.toFloat - case PropertyType.INT8 => row.get(index).toString.toByte - case PropertyType.INT16 => row.get(index).toString.toShort - case PropertyType.INT32 => row.get(index).toString.toInt - case PropertyType.INT64 | PropertyType.VID => row.get(index).toString.toLong + case PropertyType.BOOL => row.get(index).toString.trim.toBoolean + case PropertyType.DOUBLE => row.get(index).toString.trim.toDouble + case PropertyType.FLOAT => row.get(index).toString.trim.toFloat + case PropertyType.INT8 => row.get(index).toString.trim.toByte + case PropertyType.INT16 => row.get(index).toString.trim.toShort + case PropertyType.INT32 => row.get(index).toString.trim.toInt + case PropertyType.INT64 | PropertyType.VID => row.get(index).toString.trim.toLong case PropertyType.TIME => { - val values = row.get(index).toString.split(":") + val values = row.get(index).toString.trim.split(":") if (values.size < 3) { throw new UnsupportedOperationException( s"wrong format for time value: ${row.get(index)}, correct format is 12:00:00:0000") @@ -120,7 +120,7 @@ trait Processor extends Serializable { new Time(values(0).toByte, values(1).toByte, sec, microSec) } case PropertyType.DATE => { - val values = row.get(index).toString.split("-") + val values = row.get(index).toString.trim.split("-") if (values.size < 3) { throw new UnsupportedOperationException( s"wrong format for date value: ${row.get(index)}, correct format is 2020-01-01") @@ -128,7 +128,7 @@ trait Processor extends Serializable { new Date(values(0).toShort, values(1).toByte, values(2).toByte) } case PropertyType.DATETIME => { - val rowValue = row.get(index).toString + val rowValue = row.get(index).toString.trim var dateTimeValue: Array[String] = null if (rowValue.contains("T")) { dateTimeValue = rowValue.split("T") @@ -140,7 +140,7 @@ trait Processor extends Serializable { s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000") } - if (dateTimeValue.size < 2) { + if (dateTimeValue.length < 2) { throw new UnsupportedOperationException( s"wrong format for datetime value: $rowValue, " + s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000") @@ -167,15 +167,15 @@ trait Processor extends Serializable { microsec) } case PropertyType.TIMESTAMP => { - val value = row.get(index).toString + val value = row.get(index).toString.trim if (!NebulaUtils.isNumic(value)) { throw new IllegalArgumentException( s"timestamp only support long type, your value is ${value}") } - row.get(index).toString.toLong + value.toLong } case PropertyType.GEOGRAPHY => { - val wkt = row.get(index).toString + val wkt = row.get(index).toString.trim val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt) convertJTSGeometryToGeography(jtsGeom) } diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 7837a330..a51bb77c 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -193,7 +193,7 @@ class EdgeProcessor(spark: SparkSession, if (index < 0 || row.isNullAt(index)) { printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row") } - val ranking = row.get(index).toString + val ranking = row.get(index).toString.trim if (!NebulaUtils.isNumic(ranking)) { printChoice(streamFlag, s"Not support non-Numeric type for ranking field.your row data is $row") @@ -221,7 +221,7 @@ class EdgeProcessor(spark: SparkSession, if (index < 0 || row.isNullAt(index)) { printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row") None - } else Some(row.get(index).toString) + } else Some(row.get(index).toString.trim) } val idFlag = fieldValue.isDefined @@ -267,7 +267,7 @@ class EdgeProcessor(spark: SparkSession, if (edgeConfig.rankingField.isDefined) { val index = row.schema.fieldIndex(edgeConfig.rankingField.get) - val ranking = row.get(index).toString + val ranking = row.get(index).toString.trim Edge(sourceField, targetField, Some(ranking.toLong), values) } else { Edge(sourceField, targetField, None, values) @@ -288,7 +288,7 @@ class EdgeProcessor(spark: SparkSession, indexCells(lat, lng).mkString(",") } else { val index = row.schema.fieldIndex(field) - val value = row.get(index).toString + val value = row.get(index).toString.trim if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value } // process string type vid @@ -310,11 +310,11 @@ class EdgeProcessor(spark: SparkSession, isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING) val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField) - var srcId: String = row.get(srcIndex).toString + var srcId: String = row.get(srcIndex).toString.trim if (srcId.equals(DEFAULT_EMPTY_VALUE)) { srcId = "" } val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField) - var dstId: String = row.get(dstIndex).toString + var dstId: String = row.get(dstIndex).toString.trim if (dstId.equals(DEFAULT_EMPTY_VALUE)) { dstId = "" } if (edgeConfig.sourcePolicy.isDefined) { @@ -344,7 +344,7 @@ class EdgeProcessor(spark: SparkSession, val ranking: Long = if (edgeConfig.rankingField.isDefined) { val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get) - row.get(rankIndex).toString.toLong + row.get(rankIndex).toString.trim.toLong } else { 0 } diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 2bcd7160..24433f19 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -176,7 +176,7 @@ class VerticesProcessor(spark: SparkSession, return false } - val vertexId = row.get(index).toString + val vertexId = row.get(index).toString.trim // process int type vid if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) { printChoice( @@ -203,7 +203,7 @@ class VerticesProcessor(spark: SparkSession, fieldKeys: List[String], fieldTypeMap: Map[String, Int]): Vertex = { val index = row.schema.fieldIndex(tagConfig.vertexField) - var vertexId = row.get(index).toString + var vertexId = row.get(index).toString.trim if (vertexId.equals(DEFAULT_EMPTY_VALUE)) { vertexId = "" } @@ -231,7 +231,7 @@ class VerticesProcessor(spark: SparkSession, isVertexValid(row, tagConfig, false, vidType == VidType.STRING) val index: Int = row.schema.fieldIndex(tagConfig.vertexField) - var vertexId: String = row.get(index).toString + var vertexId: String = row.get(index).toString.trim if (vertexId.equals(DEFAULT_EMPTY_VALUE)) { vertexId = "" } diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 72d54219..bff7ba48 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -212,7 +212,7 @@ class EdgeProcessor(spark: SparkSession, if (index < 0 || row.isNullAt(index)) { printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row") } - val ranking = row.get(index).toString + val ranking = row.get(index).toString.trim if (!NebulaUtils.isNumic(ranking)) { printChoice(streamFlag, s"Not support non-Numeric type for ranking field.your row data is $row") @@ -240,7 +240,7 @@ class EdgeProcessor(spark: SparkSession, if (index < 0 || row.isNullAt(index)) { printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row") None - } else Some(row.get(index).toString) + } else Some(row.get(index).toString.trim) } val idFlag = fieldValue.isDefined @@ -286,7 +286,7 @@ class EdgeProcessor(spark: SparkSession, if (edgeConfig.rankingField.isDefined) { val index = row.schema.fieldIndex(edgeConfig.rankingField.get) - val ranking = row.get(index).toString + val ranking = row.get(index).toString.trim Edge(sourceField, targetField, Some(ranking.toLong), values) } else { Edge(sourceField, targetField, None, values) @@ -307,7 +307,7 @@ class EdgeProcessor(spark: SparkSession, indexCells(lat, lng).mkString(",") } else { val index = row.schema.fieldIndex(field) - val value = row.get(index).toString + val value = row.get(index).toString.trim if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value } // process string type vid @@ -329,13 +329,13 @@ class EdgeProcessor(spark: SparkSession, isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING) val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField) - var srcId: String = row.get(srcIndex).toString + var srcId: String = row.get(srcIndex).toString.trim if (srcId.equals(DEFAULT_EMPTY_VALUE)) { srcId = "" } val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField) - var dstId: String = row.get(dstIndex).toString + var dstId: String = row.get(dstIndex).toString.trim if (dstId.equals(DEFAULT_EMPTY_VALUE)) { dstId = "" } @@ -367,7 +367,7 @@ class EdgeProcessor(spark: SparkSession, val ranking: Long = if (edgeConfig.rankingField.isDefined) { val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get) - row.get(rankIndex).toString.toLong + row.get(rankIndex).toString.trim.toLong } else { 0 } diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 61a134b1..14fba31c 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -195,7 +195,7 @@ class VerticesProcessor(spark: SparkSession, return false } - val vertexId = row.get(index).toString + val vertexId = row.get(index).toString.trim // process int type vid if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) { printChoice( @@ -222,7 +222,7 @@ class VerticesProcessor(spark: SparkSession, fieldKeys: List[String], fieldTypeMap: Map[String, Int]): Vertex = { val index = row.schema.fieldIndex(tagConfig.vertexField) - var vertexId = row.get(index).toString + var vertexId = row.get(index).toString.trim if (vertexId.equals(DEFAULT_EMPTY_VALUE)) { vertexId = "" } @@ -250,7 +250,7 @@ class VerticesProcessor(spark: SparkSession, isVertexValid(row, tagConfig, false, vidType == VidType.STRING) val index: Int = row.schema.fieldIndex(tagConfig.vertexField) - var vertexId: String = row.get(index).toString + var vertexId: String = row.get(index).toString.trim if (vertexId.equals(DEFAULT_EMPTY_VALUE)) { vertexId = "" } diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 96439039..2522ed05 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -211,7 +211,7 @@ class EdgeProcessor(spark: SparkSession, if (index < 0 || row.isNullAt(index)) { printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row") } - val ranking = row.get(index).toString + val ranking = row.get(index).toString.trim if (!NebulaUtils.isNumic(ranking)) { printChoice(streamFlag, s"Not support non-Numeric type for ranking field.your row data is $row") @@ -239,7 +239,7 @@ class EdgeProcessor(spark: SparkSession, if (index < 0 || row.isNullAt(index)) { printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row") None - } else Some(row.get(index).toString) + } else Some(row.get(index).toString.trim) } val idFlag = fieldValue.isDefined @@ -285,7 +285,7 @@ class EdgeProcessor(spark: SparkSession, if (edgeConfig.rankingField.isDefined) { val index = row.schema.fieldIndex(edgeConfig.rankingField.get) - val ranking = row.get(index).toString + val ranking = row.get(index).toString.trim Edge(sourceField, targetField, Some(ranking.toLong), values) } else { Edge(sourceField, targetField, None, values) @@ -306,7 +306,7 @@ class EdgeProcessor(spark: SparkSession, indexCells(lat, lng).mkString(",") } else { val index = row.schema.fieldIndex(field) - val value = row.get(index).toString + val value = row.get(index).toString.trim if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value } // process string type vid @@ -328,13 +328,13 @@ class EdgeProcessor(spark: SparkSession, isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING) val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField) - var srcId: String = row.get(srcIndex).toString + var srcId: String = row.get(srcIndex).toString.trim if (srcId.equals(DEFAULT_EMPTY_VALUE)) { srcId = "" } val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField) - var dstId: String = row.get(dstIndex).toString + var dstId: String = row.get(dstIndex).toString.trim if (dstId.equals(DEFAULT_EMPTY_VALUE)) { dstId = "" } @@ -366,7 +366,7 @@ class EdgeProcessor(spark: SparkSession, val ranking: Long = if (edgeConfig.rankingField.isDefined) { val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get) - row.get(rankIndex).toString.toLong + row.get(rankIndex).toString.trim.toLong } else { 0 } diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 2d262561..3206e7a1 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -222,7 +222,7 @@ class VerticesProcessor(spark: SparkSession, fieldKeys: List[String], fieldTypeMap: Map[String, Int]): Vertex = { val index = row.schema.fieldIndex(tagConfig.vertexField) - var vertexId = row.get(index).toString + var vertexId = row.get(index).toString.trim if (vertexId.equals(DEFAULT_EMPTY_VALUE)) { vertexId = "" } @@ -250,7 +250,7 @@ class VerticesProcessor(spark: SparkSession, isVertexValid(row, tagConfig, false, vidType == VidType.STRING) val index: Int = row.schema.fieldIndex(tagConfig.vertexField) - var vertexId: String = row.get(index).toString + var vertexId: String = row.get(index).toString.trim if (vertexId.equals(DEFAULT_EMPTY_VALUE)) { vertexId = "" }