remove extra space for string value for postgres datasource (#65)
Nicole00 authored Jan 30, 2022
1 parent f69bb65 commit d003fad
Showing 7 changed files with 54 additions and 54 deletions.
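
The changes are mechanical: every row.get(index).toString that feeds a vertex ID, edge rank, or property value now goes through .trim. The motivation is the PostgreSQL datasource, where fixed-width CHAR(n) columns typically come back space-padded over JDBC, so the padding leaked into IDs and literals. A minimal sketch of why that matters (the values below are illustrative, not from the commit):

    // Hypothetical padded values, as a CHAR(5) column might return them over JDBC.
    val paddedRank = "42   "
    // paddedRank.toLong               // would throw java.lang.NumberFormatException
    val rank = paddedRank.trim.toLong  // 42, the post-commit behavior

    val paddedName = "abc  "
    val literal = "\"" + paddedName.trim + "\""  // "abc" instead of "abc  "
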
@@ -61,28 +61,28 @@ trait Processor extends Serializable {
 
     if (row.isNullAt(index)) return null
 
+    val value = row.get(index).toString.trim
     PropertyType.findByValue(fieldTypeMap(field)) match {
       case PropertyType.STRING | PropertyType.FIXED_STRING => {
-        var value = row.get(index).toString
+        var stringValue = value
         if (value.equals(DEFAULT_EMPTY_VALUE)) {
-          value = ""
+          stringValue = ""
         }
-        val result = NebulaUtils.escapeUtil(value).mkString("\"", "", "\"")
+        val result = NebulaUtils.escapeUtil(stringValue).mkString("\"", "", "\"")
         result
       }
-      case PropertyType.DATE     => "date(\"" + row.get(index) + "\")"
-      case PropertyType.DATETIME => "datetime(\"" + row.get(index) + "\")"
-      case PropertyType.TIME     => "time(\"" + row.get(index) + "\")"
+      case PropertyType.DATE     => "date(\"" + value + "\")"
+      case PropertyType.DATETIME => "datetime(\"" + value + "\")"
+      case PropertyType.TIME     => "time(\"" + value + "\")"
       case PropertyType.TIMESTAMP => {
-        val value = row.get(index).toString
         if (NebulaUtils.isNumic(value)) {
           value
         } else {
-          "timestamp(\"" + row.get(index) + "\")"
+          "timestamp(\"" + value + "\")"
         }
       }
-      case PropertyType.GEOGRAPHY => "ST_GeogFromText(\"" + row.get(index) + "\")"
-      case _                      => row.get(index)
+      case PropertyType.GEOGRAPHY => "ST_GeogFromText(\"" + value + "\")"
+      case _                      => value
     }
   }
 
@@ -98,18 +98,18 @@ trait Processor extends Serializable {
       case PropertyType.UNKNOWN =>
         throw new IllegalArgumentException("date type in nebula is UNKNOWN.")
       case PropertyType.STRING | PropertyType.FIXED_STRING => {
-        val value = row.get(index).toString
+        val value = row.get(index).toString.trim
         if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
       }
-      case PropertyType.BOOL                     => row.get(index).toString.toBoolean
-      case PropertyType.DOUBLE                   => row.get(index).toString.toDouble
-      case PropertyType.FLOAT                    => row.get(index).toString.toFloat
-      case PropertyType.INT8                     => row.get(index).toString.toByte
-      case PropertyType.INT16                    => row.get(index).toString.toShort
-      case PropertyType.INT32                    => row.get(index).toString.toInt
-      case PropertyType.INT64 | PropertyType.VID => row.get(index).toString.toLong
+      case PropertyType.BOOL                     => row.get(index).toString.trim.toBoolean
+      case PropertyType.DOUBLE                   => row.get(index).toString.trim.toDouble
+      case PropertyType.FLOAT                    => row.get(index).toString.trim.toFloat
+      case PropertyType.INT8                     => row.get(index).toString.trim.toByte
+      case PropertyType.INT16                    => row.get(index).toString.trim.toShort
+      case PropertyType.INT32                    => row.get(index).toString.trim.toInt
+      case PropertyType.INT64 | PropertyType.VID => row.get(index).toString.trim.toLong
       case PropertyType.TIME => {
-        val values = row.get(index).toString.split(":")
+        val values = row.get(index).toString.trim.split(":")
         if (values.size < 3) {
           throw new UnsupportedOperationException(
             s"wrong format for time value: ${row.get(index)}, correct format is 12:00:00:0000")
@@ -120,15 +120,15 @@ trait Processor extends Serializable {
         new Time(values(0).toByte, values(1).toByte, sec, microSec)
       }
       case PropertyType.DATE => {
-        val values = row.get(index).toString.split("-")
+        val values = row.get(index).toString.trim.split("-")
         if (values.size < 3) {
           throw new UnsupportedOperationException(
             s"wrong format for date value: ${row.get(index)}, correct format is 2020-01-01")
         }
         new Date(values(0).toShort, values(1).toByte, values(2).toByte)
       }
       case PropertyType.DATETIME => {
-        val rowValue = row.get(index).toString
+        val rowValue = row.get(index).toString.trim
         var dateTimeValue: Array[String] = null
         if (rowValue.contains("T")) {
           dateTimeValue = rowValue.split("T")
@@ -140,7 +140,7 @@
             s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000")
         }
 
-        if (dateTimeValue.size < 2) {
+        if (dateTimeValue.length < 2) {
           throw new UnsupportedOperationException(
             s"wrong format for datetime value: $rowValue, " +
               s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000")
@@ -167,15 +167,15 @@ trait Processor extends Serializable {
             microsec)
       }
       case PropertyType.TIMESTAMP => {
-        val value = row.get(index).toString
+        val value = row.get(index).toString.trim
         if (!NebulaUtils.isNumic(value)) {
           throw new IllegalArgumentException(
             s"timestamp only support long type, your value is ${value}")
         }
-        row.get(index).toString.toLong
+        value.toLong
       }
       case PropertyType.GEOGRAPHY => {
-        val wkt = row.get(index).toString
+        val wkt = row.get(index).toString.trim
         val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt)
         convertJTSGeometryToGeography(jtsGeom)
       }
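
In Processor.scala the trimmed string is now computed once, up front, and reused by every branch of the type match; the STRING/FIXED_STRING branch introduces stringValue so the DEFAULT_EMPTY_VALUE check still compares against the trimmed original before blanking it. A condensed, self-contained sketch of the new shape (DEFAULT_EMPTY_VALUE and escapeUtil below are simplified stand-ins for the project's real constant and NebulaUtils.escapeUtil):

    // Stand-ins, assumed for illustration only.
    val DEFAULT_EMPTY_VALUE = "_NEBULA_EMPTY_VALUE"
    def escapeUtil(s: String): String = s.flatMap {
      case '"'  => "\\\""   // escape embedded quotes
      case '\\' => "\\\\"   // escape backslashes
      case c    => c.toString
    }

    def toStringLiteral(raw: Any): String = {
      val value = raw.toString.trim                    // trim once, up front (the fix)
      val stringValue = if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
      escapeUtil(stringValue).mkString("\"", "", "\"") // quoted literal for the statement
    }

    // toStringLiteral("abc   ") now yields "abc" in quotes, where it used to keep the padding.
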
 
@@ -193,7 +193,7 @@ class EdgeProcessor(spark: SparkSession,
     if (index < 0 || row.isNullAt(index)) {
       printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row")
     }
-    val ranking = row.get(index).toString
+    val ranking = row.get(index).toString.trim
     if (!NebulaUtils.isNumic(ranking)) {
       printChoice(streamFlag,
                   s"Not support non-Numeric type for ranking field.your row data is $row")
@@ -221,7 +221,7 @@ class EdgeProcessor(spark: SparkSession,
     if (index < 0 || row.isNullAt(index)) {
       printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
       None
-    } else Some(row.get(index).toString)
+    } else Some(row.get(index).toString.trim)
   }
 
   val idFlag = fieldValue.isDefined
@@ -267,7 +267,7 @@ class EdgeProcessor(spark: SparkSession,
 
     if (edgeConfig.rankingField.isDefined) {
       val index   = row.schema.fieldIndex(edgeConfig.rankingField.get)
-      val ranking = row.get(index).toString
+      val ranking = row.get(index).toString.trim
       Edge(sourceField, targetField, Some(ranking.toLong), values)
     } else {
       Edge(sourceField, targetField, None, values)
@@ -288,7 +288,7 @@ class EdgeProcessor(spark: SparkSession,
       indexCells(lat, lng).mkString(",")
     } else {
       val index = row.schema.fieldIndex(field)
-      val value = row.get(index).toString
+      val value = row.get(index).toString.trim
       if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
     }
     // process string type vid
@@ -310,11 +310,11 @@ class EdgeProcessor(spark: SparkSession,
     isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING)
 
     val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField)
-    var srcId: String = row.get(srcIndex).toString
+    var srcId: String = row.get(srcIndex).toString.trim
     if (srcId.equals(DEFAULT_EMPTY_VALUE)) { srcId = "" }
 
     val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField)
-    var dstId: String = row.get(dstIndex).toString
+    var dstId: String = row.get(dstIndex).toString.trim
     if (dstId.equals(DEFAULT_EMPTY_VALUE)) { dstId = "" }
 
     if (edgeConfig.sourcePolicy.isDefined) {
@@ -344,7 +344,7 @@ class EdgeProcessor(spark: SparkSession,
 
     val ranking: Long = if (edgeConfig.rankingField.isDefined) {
       val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get)
-      row.get(rankIndex).toString.toLong
+      row.get(rankIndex).toString.trim.toLong
     } else {
       0
     }
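
The EdgeProcessor edits apply the same trim at each point where a row field becomes a source/target ID or a rank, in both the validation and conversion paths. A sketch of the guarded read (Row is Spark SQL's; where the project logs through printChoice, this sketch simply returns None):

    import org.apache.spark.sql.Row

    // Assumed simplification of the id/rank field read after this commit.
    def fieldValue(row: Row, field: String): Option[String] = {
      val index = row.schema.fieldIndex(field)
      if (row.isNullAt(index)) None
      else Some(row.get(index).toString.trim) // trim keeps padded CHAR ids stable
    }
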
 
@@ -176,7 +176,7 @@ class VerticesProcessor(spark: SparkSession,
       return false
     }
 
-    val vertexId = row.get(index).toString
+    val vertexId = row.get(index).toString.trim
     // process int type vid
     if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) {
       printChoice(
@@ -203,7 +203,7 @@ class VerticesProcessor(spark: SparkSession,
                       fieldKeys: List[String],
                       fieldTypeMap: Map[String, Int]): Vertex = {
     val index = row.schema.fieldIndex(tagConfig.vertexField)
-    var vertexId = row.get(index).toString
+    var vertexId = row.get(index).toString.trim
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
@@ -231,7 +231,7 @@ class VerticesProcessor(spark: SparkSession,
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)
 
     val index: Int = row.schema.fieldIndex(tagConfig.vertexField)
-    var vertexId: String = row.get(index).toString
+    var vertexId: String = row.get(index).toString.trim
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
 
@@ -212,7 +212,7 @@ class EdgeProcessor(spark: SparkSession,
     if (index < 0 || row.isNullAt(index)) {
       printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row")
     }
-    val ranking = row.get(index).toString
+    val ranking = row.get(index).toString.trim
     if (!NebulaUtils.isNumic(ranking)) {
       printChoice(streamFlag,
                   s"Not support non-Numeric type for ranking field.your row data is $row")
@@ -240,7 +240,7 @@ class EdgeProcessor(spark: SparkSession,
     if (index < 0 || row.isNullAt(index)) {
       printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
       None
-    } else Some(row.get(index).toString)
+    } else Some(row.get(index).toString.trim)
   }
 
   val idFlag = fieldValue.isDefined
@@ -286,7 +286,7 @@ class EdgeProcessor(spark: SparkSession,
 
     if (edgeConfig.rankingField.isDefined) {
       val index   = row.schema.fieldIndex(edgeConfig.rankingField.get)
-      val ranking = row.get(index).toString
+      val ranking = row.get(index).toString.trim
       Edge(sourceField, targetField, Some(ranking.toLong), values)
     } else {
       Edge(sourceField, targetField, None, values)
@@ -307,7 +307,7 @@ class EdgeProcessor(spark: SparkSession,
       indexCells(lat, lng).mkString(",")
     } else {
       val index = row.schema.fieldIndex(field)
-      val value = row.get(index).toString
+      val value = row.get(index).toString.trim
       if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
     }
     // process string type vid
@@ -329,13 +329,13 @@ class EdgeProcessor(spark: SparkSession,
     isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING)
 
     val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField)
-    var srcId: String = row.get(srcIndex).toString
+    var srcId: String = row.get(srcIndex).toString.trim
     if (srcId.equals(DEFAULT_EMPTY_VALUE)) {
       srcId = ""
     }
 
     val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField)
-    var dstId: String = row.get(dstIndex).toString
+    var dstId: String = row.get(dstIndex).toString.trim
     if (dstId.equals(DEFAULT_EMPTY_VALUE)) {
       dstId = ""
     }
@@ -367,7 +367,7 @@ class EdgeProcessor(spark: SparkSession,
 
     val ranking: Long = if (edgeConfig.rankingField.isDefined) {
       val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get)
-      row.get(rankIndex).toString.toLong
+      row.get(rankIndex).toString.trim.toLong
     } else {
       0
     }
 
@@ -195,7 +195,7 @@ class VerticesProcessor(spark: SparkSession,
       return false
     }
 
-    val vertexId = row.get(index).toString
+    val vertexId = row.get(index).toString.trim
     // process int type vid
     if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) {
       printChoice(
@@ -222,7 +222,7 @@ class VerticesProcessor(spark: SparkSession,
                       fieldKeys: List[String],
                       fieldTypeMap: Map[String, Int]): Vertex = {
     val index = row.schema.fieldIndex(tagConfig.vertexField)
-    var vertexId = row.get(index).toString
+    var vertexId = row.get(index).toString.trim
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
@@ -250,7 +250,7 @@ class VerticesProcessor(spark: SparkSession,
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)
 
     val index: Int = row.schema.fieldIndex(tagConfig.vertexField)
-    var vertexId: String = row.get(index).toString
+    var vertexId: String = row.get(index).toString.trim
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
 
@@ -211,7 +211,7 @@ class EdgeProcessor(spark: SparkSession,
     if (index < 0 || row.isNullAt(index)) {
       printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row")
     }
-    val ranking = row.get(index).toString
+    val ranking = row.get(index).toString.trim
     if (!NebulaUtils.isNumic(ranking)) {
       printChoice(streamFlag,
                   s"Not support non-Numeric type for ranking field.your row data is $row")
@@ -239,7 +239,7 @@ class EdgeProcessor(spark: SparkSession,
     if (index < 0 || row.isNullAt(index)) {
       printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
       None
-    } else Some(row.get(index).toString)
+    } else Some(row.get(index).toString.trim)
   }
 
   val idFlag = fieldValue.isDefined
@@ -285,7 +285,7 @@ class EdgeProcessor(spark: SparkSession,
 
     if (edgeConfig.rankingField.isDefined) {
       val index   = row.schema.fieldIndex(edgeConfig.rankingField.get)
-      val ranking = row.get(index).toString
+      val ranking = row.get(index).toString.trim
       Edge(sourceField, targetField, Some(ranking.toLong), values)
     } else {
       Edge(sourceField, targetField, None, values)
@@ -306,7 +306,7 @@ class EdgeProcessor(spark: SparkSession,
       indexCells(lat, lng).mkString(",")
     } else {
       val index = row.schema.fieldIndex(field)
-      val value = row.get(index).toString
+      val value = row.get(index).toString.trim
       if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
     }
     // process string type vid
@@ -328,13 +328,13 @@ class EdgeProcessor(spark: SparkSession,
     isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING)
 
     val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField)
-    var srcId: String = row.get(srcIndex).toString
+    var srcId: String = row.get(srcIndex).toString.trim
     if (srcId.equals(DEFAULT_EMPTY_VALUE)) {
       srcId = ""
     }
 
     val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField)
-    var dstId: String = row.get(dstIndex).toString
+    var dstId: String = row.get(dstIndex).toString.trim
     if (dstId.equals(DEFAULT_EMPTY_VALUE)) {
       dstId = ""
     }
@@ -366,7 +366,7 @@ class EdgeProcessor(spark: SparkSession,
 
     val ranking: Long = if (edgeConfig.rankingField.isDefined) {
       val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get)
-      row.get(rankIndex).toString.toLong
+      row.get(rankIndex).toString.trim.toLong
     } else {
       0
     }
 
@@ -222,7 +222,7 @@ class VerticesProcessor(spark: SparkSession,
                       fieldKeys: List[String],
                       fieldTypeMap: Map[String, Int]): Vertex = {
     val index = row.schema.fieldIndex(tagConfig.vertexField)
-    var vertexId = row.get(index).toString
+    var vertexId = row.get(index).toString.trim
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
@@ -250,7 +250,7 @@ class VerticesProcessor(spark: SparkSession,
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)
 
     val index: Int = row.schema.fieldIndex(tagConfig.vertexField)
-    var vertexId: String = row.get(index).toString
+    var vertexId: String = row.get(index).toString.trim
     if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
       vertexId = ""
     }
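
Apart from the trims and the Processor.scala refactor noted above, the one other edit is dateTimeValue.size becoming dateTimeValue.length in the datetime branch, an equivalent but more direct call on an Array. A short sketch of that branch after the change (the input value is illustrative):

    // The trim makes a padded datetime parse the same as a clean one.
    val rowValue = " 2020-01-01T12:00:00.0000 ".trim
    val dateTimeValue: Array[String] =
      if (rowValue.contains("T")) rowValue.split("T") else rowValue.split(" ")
    if (dateTimeValue.length < 2)
      throw new UnsupportedOperationException(s"wrong format for datetime value: $rowValue")
    // dateTimeValue(0) == "2020-01-01", dateTimeValue(1) == "12:00:00.0000"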
